kafkaconsumer(kafkaconsumer is not safe for)

本篇文章给大家谈谈kafkaconsumer,以及kafkaconsumer is not safe for对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Kafka 源码解析之 Consumer 两种 commit 机制和 partition 分配机制

先看下两种不同的 commit 机制,一种是同步 commit,一种是异步 commit,既然其作用都是 offset commit,应该不难猜到它们底层使用接口都是一样的

同步 commit

同步 commit 的实现方式,client.poll() 方法会阻塞直到这个request 完成或超时才会返回。

异步 commit

而对于异步的 commit,最后调用的都是 doCommitOffsetsAsync 方法,其具体实现如下:

在异步 commit 中,可以添加相应的回调函数,如果 request 处理成功或处理失败,ConsumerCoordinator 会通过 invokeCompletedOffsetCommitCallbacks() 方法唤醒相应的回调函数。

关键区别在于future是否会get,同步提交就是future会get.

consumer 提供的两种不同 partition 分配策略,可以通过 partition.assignment.strategy 参数进行配置,默认情况下使用的是 org.apache.kafka.clients.consumer.RangeAssignor,Kafka 中提供另一种 partition 的分配策略 org.apache.kafka.clients.consumer.RoundRobinAssignor

用户可以自定义相应的 partition 分配机制,只需要继承这个 AbstractPartitionAssignor 抽象类即可。

AbstractPartitionAssignor

AbstractPartitionAssignor 有一个抽象方法,如下所示:

assign() 这个方法,有两个参数:

RangeAssignor 和 RoundRobinAssignor 通过这个方法 assign() 的实现,来进行相应的 partition 分配。

直接看一下这个方法的实现:

假设 topic 的 partition 数为 numPartitionsForTopic,group 中订阅这个 topic 的 member 数为 consumersForTopic.size(),首先需要算出两个值:

分配的陪巧规则是:对于剩下的那些 partition 分配到前 consumersWithExtraPartition 个 consumer 上,也就是前 consumersWithExtraPartition 个 consumer 获得 topic-partition 列表会比后面多一个。

在上述的程序中,举了一个例子,假设有一个 topic 有 7 个 partition,group 有5个 consumer,这个5个 consumer 都订阅这个 topic,那么 range 的分配方式如下:

而如果 group 中有 consumer 没有订阅这个 topic,那么这个 consumer 将不会参与分配。下面再举个例子,将有两个 topic,一个 partition 有5个,一个孝银 partition 有7个,group 有5个 consumer,但是只有前3个订阅第一个 topic,而另一个 topic 是所有 consumer 都订阅了,那么其分配结果如下:

这个是 roundrobin 的实现,其实现方法如下:

roundrobin 的实现原则,简单来说就是:列出所有 topic-partition 和列出所有的 consumer member,然后开始分配,一轮之后继续下一轮,假设有有一个 topic,它有7个 partition,group 有3个 consumer 都订阅了这个 topic,那么其分配方式为:

对于多个 topic 的订阅,将有两个 topic,一个 partition 有5个,一个 partition 有7个,group 有5个 consumer,但是芦慎键只有前3个订阅第一个 topic,而另一个 topic 是所有 consumer 都订阅了,那么其分配结果如下:

roundrobin 分配方式与 range 的分配方式还是略有不同。

consumer(KafkaConsumer)

(一)消费者和消费者组

1、消费者:订阅并消费kafka消息,从属于消费者组

2、消费者组:一个群组里的消费者订阅的是同一个主题,每个消费者接受主题一部分分区的消息。

注:同一个消费者可以消费不同的partition,但是同一个partition不能被不同消费者消清毁笑费。

(二)消费者群组和分区再均衡

1、再均衡:分区的消费所有权从一个消费者转移到另一个消费者称为再均衡,为消费者组带来了答含高可用性和可伸缩性。

注:分区何时重新分配:加入消费者或者消费者崩溃等

2、如何判断消费者崩溃:消费者通过向群组协调器(某broker,不同群组可以有不同的群组协调器)发送心跳(一般在拉取消息或者提交偏移量的时候)表示自己仍旧存活,如果长时间不发送心跳则协调器认为期死亡并进行再均衡。

注:在0.10.1版本中,心跳行为不再和获取消息和提交偏移余薯量绑定在一起,有一个单独的心跳线程。

3、分配分区:消费者加入消费者组是,会像群组协调器发送请求,第一个加入的成为“群主”。群主从协调器那里获取成员列表,并负责给每一个消费者分配分区。完毕之后,将分配结果发送给协调器,协调器再将消息发送给所有的消费者,每个消费者只能看到自己的分配信息。只有群主知道所有的消费信息。

(三)参数配置

1、bootstrap.server:host:port

2、key.serializer:键序列化器

3、value.serializer:值序列化器

注:以上为必须设置的

4、group.id:从属的消费者组

5、fetch.min.bytes:消费者从服务器获取记录的最小字节数。

6、fetch.max.wait.ms:消费者等待消费消息的最大时间

7、max.partition.fetch.bytes:服务器从每个分区返回给消费者的最大字节数(需要比broker的设置max.message.size属性配置大,否则有些消息无法消费)

8、session.timeout.ms:指定该消费者在被认为死亡之前可以与服务器断开连接的时间,默认3秒

9、heartbeat.interval.ms:制定了poll方法向协调器发送心跳的频率。

注:一般9是8的三分之一

10、auto.offset.reset:消费者在读取一个没有偏移量分区或者无效偏移量分区的情况下如何处理(latest:从最新记录开始读取,earliest:从最早的记录开始读取)

11.、enable.auth.commit:消费者是否自动提交偏移量,默认为true

12、auto.commit.interval.ms:自动提交偏移量的时间间隔

13、partition.assignment.strategy:分区分配给消费者的策略:

(1)range:会把主题若干个连续分区分配给消费者

(2)roundRobin:会把主题的所有分区逐个分配给消费者

14、client.id:任意字符串,broker用来区分客户端发来的消息

15:max.poll.records:控制poll方法返回的最大记录数

16:receive.buffer.bytes/send.buffer.bytes:tcp缓冲池读写大小

(四)订阅主题

consumer.subscribe(list)

(五)轮训(消费者API的核心)

1、轮训作用: 只要消费者订阅了主题,轮训就会处理所有的细节(群组协调、分区再均衡、发送心跳、获取数据)

(1)获取数据

(2)第一次执行poll时,负责查找协调器,然后加入群组,接受分配的分区

(3)心跳的发送

(4)再均衡也是在轮训期间进行的

2、方法:poll(),消费者缓冲区没有数据时会发生阻塞,可以传一个阻塞时间,避免无限等待。0表示立即返回。

3、关闭:close(),网络连接随之关闭,立即触发再均衡。

4、线程安全:无法让一个线程运行多个消费者,也无法让多个线程公用一个消费者。

(六)提交和偏移量

1、提交:更新分区当前位置的操作

2、如何提交:消费者往一个特殊主题(_consumer_offset)发送消息,消息中包含每个分区中的偏移量。

3、偏移量:分区数据被消费的位置。

4、偏移量作用:当发生再均衡时,消费者可能会分配到不一样的分区,为了继续工作,消费者需要读取到每个分区最后一次提交的偏移量,然后从偏移量的地方继续处理。

5、提交偏移量的方式

(1)自动提交:经过一个时间间隔,提交上一次poll方法返回的偏移量。每次轮训都会检测是否应该提交偏移量。缺陷:可能导致重复消费

(2)手动提交:commitSysn()提交迁移量,最简单也最可靠,提交由poll方法返回的最新偏移量。缺点:忘了提交可能会丢数据,再均衡可能会重复消费

(3)异步提交:同步提交在提交过程中必须阻塞

(4)同步异步提交组合

(5)提交特定的偏移量

(七)再均衡监听器

(八)从特定偏移量读取数据(seek)

1、从分区开始:seekToBegining

2、从分区结束:seekToEnd

3、ConsumerRebalanceListener和seek结合使用

(九)如何退出

1、前言:wakeup方法是唯一安全退出轮训的方法,从poll方法中退出并抛出wakeupException异常。如果没有碰上轮训,则在下一次poll调用时抛出。

2、退出轮训

(1)另一个线程调用consumer.wakeup方法

(2)如果循环在主线程里可以在ShutdownHook里面调用该方法

3、退出之前调用close方法:告知协调器自己要离开,出发再均衡,不必等到超时。

(十)独立消费者(assign为自己分配分区)

[img]

kafka consumer offset机制

kafka消息在分区中是按序一条一条存储的,假如分区中有10条消息,位移就是0-9,

consumer消费了5条消息,那么offset就是5,指向了下一条要消费桐宏或的记录,consumer

需要向kafka汇报自己的位移数据,因为consumer是能够消费多个分区的,所以offset

的粒度是分区,consumer需要为分配给他的各分区分别提交offset信息。

从用户的角度来说,位移提交分为自动提交和手动提交,在consumer的角度来说,位移

分为同步提交和异步提交。

kafka内部有个topic叫 ‘__consumer_offsets’,offset提交就是往这个topic发送一条消息,

消息格式是key value形式,key是由 groupId、主题名、分区号组成,消息体是位移值

及用户自定义数据和时间戳等。还有2种特殊的格式,一种是用于保存 Consumer Group 

信息的消息,用于注册group,另一种是 用于删除 Group 过期位移和删除 Group 的消息。

当kafka集群种第一台consumer启动时,便会创建__consumer_offsets主题,默认50个

分区和3个副本。

当提交方式是自动提交时,就算是当前consumer的offset已经不更新,kafka还是会自动

定期的往__consumer_offsets发送位移消息,所以得对位移主题的消息做定期删除,

假如对于同一个key有2条A和B,A早于B发送,那么A就是属于过期消息。

compact有点类似jvm gc的标记-整理,把过期消息删掉,把剩下的消息排列在一起

Kafka 提供了专门的后台线程定期地巡检待Compact 的主题,看看是否存在满足条件的

可删除数据,这个线程叫Log Cleaner,当我们发现位移主题日志过多的时候,可以

检查一下是否是这个线程挂了导致的

enable.auto.commit 默认即是true,

auto.commit.interval.ms 默认是5秒,表示kafka每5秒自动提交一次位移信息。

自动提交会有消息重复消费的问题,因为他是间隔时间提交一次,假如在间隔期间,

发生了Rebalance ,在Rebalance 之后所有的消费者必须从当前最新的offset开始

继续消费,那么上一次自动提交到Rebalance 的这段时间消费的数据的位移并没有

提交,所以会重复消费,即时我们通过减少 auto.commit.interval.ms 的值来提高提交频率,

那也仅仅是缩小了重复消费的时间窗口,所以我们看看能不能通过手动提交来避免重复消费。

commitSync()是consumer的同步api,手动提交的好处自然是我们可以控制提交的时机

和频率,由于是同步api,是会阻塞至broker返回结果才会结束这个阻塞状态,对于系统

而言,自然不想发生这种不是由于资源的限制导致的阻塞。

commitAsync()是consumer的异步api,commitAsync()不会阻塞,因此不会影响consumer的

tps,但是他的问题在于他无法重试,因为是异步提交,当因为网络或者系统资源阻塞

导致提交失败,那么他重试的时候,在这期间,consumer可能已经消费好多条消息

并且提交了,所以此时的重试提交的offset已然不是最新值绝戚了并没有意义,我们可以通过

异步和同局伍步提交相结合,我们使用同步提交来规避因为网络问题或者broker端的gc导致的

这种瞬时的提交失败,进而通过重试机制而提交offset,使用异步提交来规避提交时的阻塞

前面的commitSync()和commitAsync(),都是consumer poll消息,把这些消息消费完,

再提交最新的offset,如果poll的消息很多呢?消费时间较长,假如中间系统宕机,岂不是

得从头再来一遍,所以kafka提供分段提交的api

commitSync(MapTopicPartition, OffsetAndMetadata) 

 commitAsync(MapTopicPartition, OffsetAndMetadata)

假设我们poll了一秒钟的数据,有5000条,我们可以通过计数器,累计到100条,

便通过分段提交api向kafka提交一次offset。

Kafka Consumer各种提交方式

普通的API

publicstaticvoidCommonDemo(){finalProperties properties =newProperties() {{            put("bootstrap.servers","localhost:9092");            put("group.id","test");            put("enable.auto.commit","true");            put("auto.commit.interval.ms","1000");            put("session.timeout.ms","30000");            put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");            put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");            put("auto.offset.reset","earliest");        }};              KafkaConsumer consumer =newKafkaConsumer(properties);        consumer.subscribe(Arrays.asList("test"));while(true) {            ConsumerRecords records = consumer.poll(100);try{                Thread.sleep(200);            }catch(InterruptedException e) {                e.printStackTrace();            }for(ConsumerRecord record : records) {                System.out.printf("topic = %s,partition = %d, offset = %d, key = %s, value = %s"+System.lineSeparator(),record.topic(),                record.partition(),record.offset(), record.key(), record.value());            }        }    }

以上是一个纤槐非常常见的简单消费者实例,但是,这样,真的,没问题吗?

以上会存在以下的问题:

//自动提交,会有问题

1.默认会5秒提交一次offset,但是中间停止的话会造成重复消费

2.新添加进消费者组的时候,会再均衡,默认从上次消费提交的地举没方开始,消息重复

3.自动提交,虽然提交了偏移量,但并不知道,哪些消息被处理了,是否处理成功,偏移量是否提交成功

针对以上的内容,做出修改

同步提交

publicstaticvoidSyncDemo(){finalProperties properties =newProperties() {{ 正竖纳           put("bootstrap.servers","localhost:9092");            put("group.id","test");            put("enable.auto.commit","false");            put("auto.commit.interval.ms","1000");            put("session.timeout.ms","30000");            put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");            put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");            put("auto.offset.reset","earliest");        }};        KafkaConsumer consumer =newKafkaConsumer(properties);        consumer.subscribe(Arrays.asList("test"));while(true) {            ConsumerRecords records = consumer.poll(100);for(ConsumerRecord record:records){                System.out.printf("topic = %s, partition = %s, offset"+" = %d, costomer = %s, country = %s \n",record.topic(),                        record.partition(),record.offset(),record.key(),record.value());            }try{                                consumer.commitSync();            }catch(Exception e){                e.printStackTrace();            }        }    }

以上是同步提交,但是,会存在一些问题,同步提交,会阻塞,直到有返回结果,性能会差一些。

异步提交

publicstaticvoidAsyncDemo(){finalProperties properties =newProperties() {{            put("bootstrap.servers","localhost:9092");            put("group.id","test");            put("enable.auto.commit","false");            put("auto.commit.interval.ms","1000");            put("session.timeout.ms","30000");            put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");            put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");            put("auto.offset.reset","earliest");        }};        KafkaConsumer consumer =newKafkaConsumer(properties);        consumer.subscribe(Arrays.asList("test"));while(true) {            ConsumerRecords records = consumer.poll(100);for(ConsumerRecord record:records){                System.out.printf("topic = %s, partition = %s, offset"+" = %d, costomer = %s, country = %s \n",record.topic(),                        record.partition(),record.offset(),record.key(),record.value());            }try{//consumer.commitAsync();//发送提交请求,提交失败就纪录下来consumer.commitAsync(newOffsetCommitCallback() {@OverridepublicvoidonComplete(Map map, Exception e){if(e !=null){                          e.printStackTrace();                        }                    }                });            }catch(Exception e){                e.printStackTrace();            }        }    }

异步提交的特性:与同步提交不同的是,遇到错误,commitSync会一直重试,但是commitAsync不会,原因,很简单,如果异步提交还重试,会存在一个问题,a提交2000的偏移量,网络问题,一直重试,但下一个3000的提交成功,这时候,2000的ok了,就会造成消息重复。

异步提交和同步提交组合的方式

publicstaticvoidSyncAndAsyncDemo(){finalProperties properties =newProperties() {{            put("bootstrap.servers","localhost:9092");            put("group.id","test");            put("enable.auto.commit","false");            put("auto.commit.interval.ms","1000");            put("session.timeout.ms","30000");            put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");            put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");            put("auto.offset.reset","earliest");        }};        KafkaConsumer consumer =newKafkaConsumer(properties);        consumer.subscribe(Arrays.asList("test"));try{while(true) {                ConsumerRecords records = consumer.poll(100);for(ConsumerRecord record:records){                    System.out.printf("topic = %s, partition = %s, offset"+" = %d, costomer = %s, country = %s \n",record.topic(),                            record.partition(),record.offset(),record.key(),record.value());                }                consumer.commitAsync();            }        }catch(Exception e){            e.printStackTrace();        }finally{try{                consumer.commitSync();            }catch(Exception e){                consumer.close();            }        }    }

同步提交和异步提交使用组合的方式进行提交,但,这还是会存在一些问题。 因为提交都是批量提交的,但是有可能在批量处理没完成,偏移量没完成的时候,出错了

自指定提交

publicstaticvoidPersonalDemo(){        Map currentOffsets =newHashMap();intcount =0;finalProperties properties =newProperties() {{            put("bootstrap.servers","localhost:9092");            put("group.id","test");            put("enable.auto.commit","false");            put("auto.commit.interval.ms","1000");            put("session.timeout.ms","30000");            put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");            put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");            put("auto.offset.reset","earliest");        }};        KafkaConsumer consumer =newKafkaConsumer(properties);        consumer.subscribe(Arrays.asList("test"));while(true) {            ConsumerRecords records = consumer.poll(100);for(ConsumerRecord record:records){                System.out.printf("topic = %s, partition = %s, offset"+" = %d, costomer = %s, country = %s \n",record.topic(),                        record.partition(),record.offset(),record.key(),record.value());                currentOffsets.put(newTopicPartition(record.topic(),record.partition()),newOffsetAndMetadata(record.offset()+1,"no meta"));if(count %1000==0){                    consumer.commitAsync();                }                count++;            }        }    }

指定每1000条提交一次offset。

再均衡监听器

publicstaticvoidRebalanceListenDemo(){        Map currentOffsets =newHashMap();finalProperties properties =newProperties() {{            put("bootstrap.servers","localhost:9092");            put("group.id","test");            put("enable.auto.commit","false");            put("auto.commit.interval.ms","1000");            put("session.timeout.ms","30000");            put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");            put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");            put("auto.offset.reset","earliest");        }};        KafkaConsumer consumer =newKafkaConsumer(properties);classHandleRebalaceimplementsConsumerRebalanceListener{@OverridepublicvoidonPartitionsRevoked(Collection collection){            }@OverridepublicvoidonPartitionsAssigned(Collection collection){                System.out.println("partition rebalance offset is "+currentOffsets);                consumer.commitSync(currentOffsets);            }        }try{            consumer.subscribe(Arrays.asList("test"),newHandleRebalace());while(true) {                ConsumerRecords records = consumer.poll(100);for(ConsumerRecord record:records){                    System.out.printf("topic = %s, partition = %s, offset"+" = %d, costomer = %s, country = %s \n",record.topic(),                            record.partition(),record.offset(),record.key(),record.value());                    currentOffsets.put(newTopicPartition(record.topic(),record.partition()),newOffsetAndMetadata(record.offset()+1,"no meta"));                }                consumer.commitAsync(currentOffsets,null);            }        }catch(Exception e){            e.printStackTrace();        }finally{try{                consumer.commitSync(currentOffsets);            }catch(Exception e){                e.printStackTrace();            }finally{                consumer.close();            }        }    }

关于kafkaconsumer和kafkaconsumer is not safe for的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表