kafka重复消费(kafka重复消费问题解决)

本篇文章给大家谈谈kafka重复消费,以及kafka重复消费问题解决对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Kafka消费异常处理

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

两次poll()的时间间隔大此乎枣森拆于配置的session.timeout.ms,根本原因是处理时间太长,大于设定的session.timeout.ms。如果长顷散时间不调用poll()方法,集群会认为该消费者已经挂掉了,就不会让它提交偏移量了,这样就会造成重复消费数据。

Assuming we are talking about Kafka 0.10.1.0 or upwards where each consumer instance employs two threads to function. One is user thread from which poll is called; the other is heartbeat thread that specially takes care of heartbeat things.

session.timeout.ms is for heartbeat thread. If coordinator fails to get any heartbeat from a consumer before this time interval elapsed, it marks consumer as failed and triggers a new round of rebalance.

max.poll.interval.ms is for user thread. If message processing logic is too heavy to cost larger than this time interval, coordinator explicitly have the consumer leave the group and also triggers a new round of rebalance.

heartbeat.interval.ms is used to have other healthy consumers aware of the rebalance much faster. If coordinator triggers a rebalance, other consumers will only know of this by receiving the heartbeat response with REBALANCE_IN_PROGRESS exception encapsulated. Quicker the heartbeat request is sent, faster the consumer knows it needs to rejoin the group.

Suggested values:

session.timeout.ms : a relatively low value, 10 seconds for instance.

max.poll.interval.ms : based on your processing requirements

heartbeat.interval.ms : a relatively low value, better 1/3 of the session.timeout.ms

修改配置参数,调大间隔,调小一次处理的最大任务数量

使用多线程并行处理

[img]

kafka重复消费的原因

如果自动提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理,

假设我们采用了自动提交,且提交时碧滚态间间隔为5s,在最近一次提交之后的3s发生了再均衡,再均衡之后,悔源消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了3s,所以在这3s内到达的消息会被重复处理。可以通过修改提交时间间隔来更频繁地提交偏移量,减小可备芦能出现重复悄息的时间窗,不过这种情况是无也完全避免的。

kafka重复消费的问题

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

造成的问题:假如consumer.properties配置中max.poll.records=40 (一次最多拉取40条数据) session.timeout.ms=30000 (会话时间)

假设kafka此时一次拉取了40条数据,但在处理弊键第31条的时候抛出了如上的异常,就会导致,本次offset不会提交,完了这40条消息都会在接下来的某刻被再次消费,这其中就包含了其实已经消费了的30条数据

原因:the poll loop is spending too much time message processing, the time between subsequent calls to poll() was longer than the configured session.timeout.ms,好吧其实是一个意思!

意思就是说poll下来数据后,处理这些数据的时间比 session.timeout.ms配置的时间肢卜悄要长,从而导致the group has already rebalanced

解决办法是最后一句话:You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

即要不增大 session.timeout.ms,要不减小max.poll.records ,至于具体配置为多少,得看你处理一条消息花费多长时间 x,需要满足 x乘以max.poll.records session.timeout.ms

另一种解决思路:

解决此类重复消费的方式:将能够唯一标识消息的信息存储在其他系统,比如redis,什么能够历渣唯一标识消息呢?就是consumergroup+topic+partition+offset,更准确的应该是consumergroup+" "+topic+" "+partition+"_"+offset组成的key,value可以是处理时间存放在redis中,每次处理kafka消息时先从redis中根据key获取value,如果value为空,则表明该消息是第一次被消费,不为空则表示时已经被消费过的消息;

参考:

Kafka的重复、丢数据及顺序消费等问题

①、kafka的顺序消息仅仅是通过partitionKey,将丛芹某类消息写入橡郑运同一个partition,一个partition只能对应一个消费线程,以保证数据有序。

②、除了发送消息需要指定partitionKey外,producer和consumer实例化无区别。

③、kafka broker宕机,kafka会有自选择,所以宕机不会减少partition数量,也就不会影响partitionKey的sharding。

acks设置为0:broker接收消息立即返回,还没写入磁盘,容易丢失数据

acks设置为1:等待broker的ack,如果leader落盘了就返回ack,如果follower同步完成前leader挂了就会丢失未同步的数据(follower选举)

acks设置为-1:等待所有leader和follower都落盘后返回ack,如果follower已同步,但是broker返回ack前leader挂了,则会重复发送消息。

consumer自动提交offset,但其实未处理好消息,容易丢数据。可以选择手动提交,处理完后再提交offset

0.9版本的kafka改进了coordinator的设计,提出了group coordinator——每个consumer group都会被分配一个这样的coordinator用于组管理和位移管理。这个group coordinator比原来承担了更多的责任,比如组成员管理、位移提交保护机制等。当新版本consumer group的第一个consumer启动的时候,它会去和kafka server确定谁是它们组的coordinator。之后该group内的所有成员都会和该coordinator进行协调通信。显而易见,这种coordinator设计不再需要zookeeper了,性能上可以得到很大的提升。

每个 Group 都会选择一个 Coordinator 来完成自己组内各 Partition 的 Offset 信息,选择的规则如下: 1. 计算 Group 对应在 __consumer_offsets 上的 Partition 2. 根据对应的Partition寻找该Partition的leader所对应的Broker,该Broker上的Group Coordinator即就是该Group的Coordinator

numPartitionsPerConsumer=counsumer/partitions——》5/3=1,每个消费者至少被分配一个partition

consumersWithExtraPartition=counsumer%partitions——》5%3=2

i=0,start=0,length=2;

i=1,start=2,length=2;

i=2,start=4,length=1;

如果是4个partitions和3个consumer

i=0,start=0,length=2;

i=1,start=2,length=1;

i=2,start=3,length=1;

for(每一个TopicPartition)

以RoundRobin的方式选择一个订阅了这个Topic的Consumer,将这个TopicPartition分派给这个梁梁Consumer end

“sticky”这个单词可以翻译为“粘性的”,Kafka从0.11.x版本开始引入这种分配策略,它主要有两个目的:

为什么会重复消费:第一种可能是生产者重复发送消息。第二种可能是消费者手动提交时挂掉了,导致消费了数据但是没有提交offset。

为什么会丢失数据:第一种可能是ack非-1的情况下,follower未同步完全,leader挂了。第二种可能是消费者自动提交,但其实还没完成消费。

怎么保证生产者消息不重复,0.11后,生产者会生成pid,和一个sequence number,通过pid sequence number brokerid作为key,如果在partition中已经存在,则只持久化一条。且Producer重启可以通过TransactionID拿到原来的pid,所以可以跨会话的保持一致

保证顺序消费:需要保证顺序的消息发到同一个partition中,consumer会自己根据顺序消费

0.9.0.0 版本之前判断副本之间是否同步,主要是靠参数 replica.lag.max.messages 决定的,即允许 follower 副本落后 leader 副本的消息数量,超过这个数量后,follower 会被踢出 ISR。

replica.lag.max.messages 也很难在生产上给出一个合理值,如果给的小,会导致 follower 频繁被踢出 ISR,如果给的大,broker 发生宕机导致 leader 变更时,肯能会发生日志截断,导致消息严重丢失的问题。

在 0.9.0.0 版本之后,Kafka 给出了一个更好的解决方案,去除了 replica.lag.max.messages,,用 replica.lag.time.max.ms 参数来代替,该参数的意思指的是允许 follower 副本不同步消息的最大时间值,即只要在 replica.lag.time.max.ms 时间内 follower 有同步消息,即认为该 follower 处于 ISR 中,这就很好地避免了在某个瞬间生产者一下子发送大量消息到 leader 副本导致该分区 ISR 频繁收缩与扩张的问题了。

Kafka集群中多个broker,有一个会被选举为controller leader,负责管理整个集群中分区和副本的状态,比如partition的leader 副本故障,由controller 负责为该partition重新选举新的leader 副本;当检测到ISR列表发生变化,有controller通知集群中所有broker更新其MetadataCache信息;或者增加某个topic分区的时候也会由controller管理分区的重新分配工作

实际上,Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器。

故障转移

当 Broker 0 宕机后,ZooKeeper 通过 Watch 机制感知到并删除了 /controller 临时节点。之后,所有存活的 Broker 开始竞选新的控制器身份。Broker 3 最终赢得了选举,成功地在 ZooKeeper 上重建了 /controller 节点。之后,Broker 3 会从 ZooKeeper 中读取集群元数据信息,并初始化到自己的缓存中。

flink1.12版本中使用了flinksql,固定了groupid。但核做并是因为重复上了两个相同任务之后,发现数据消费改迹重复。

下图sink中创建两个相同任务,会消费相胡脊同数据。

两个任务同时处理,并没有在一个consume group里,所以不会共同消费。

关于kafka重复消费和kafka重复消费问题解决的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表