kafka发送消息(kafka发送消息中文乱码)
本篇文章给大家谈谈kafka发送消息,以及kafka发送消息中文乱码对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、SpringBoot集成Kafka,实现简单的收发消息
- 2、Kafka消息送达语义详解
- 3、kafka原理分析
- 4、Kafka系列之(4)——Kafka Producer流程解析
- 5、如何保证kafka生产者发送消息的可靠性
- 6、kafka批量发送消息实验
SpringBoot集成Kafka,实现简单的收发消息
在kafka的 config 目录下找到 server.properties 配置文件
把 listeners 和蚂让 advertised.listeners 两处配置的注释去掉,可以根据需要配置连接的服务器 外网IP 和 端口号 ,我这里演示选择的是本地 localhost 和默认端口 9092
KafkaTemplate 这个类包装了个生产者 Producer ,来提供方便的发送数据到 kafka 的主题 topic 里面。
send() 方法的源码, KafkaTemplate 类中还重载了很多 send() 方法,有需要可以看看源码
通过 KafkaTemplate 模板类发送数据。
kafkaTemplate.send(String topic, K key, V data) ,第一个入参是主题,第二个入参是发送的对象,第三个入参是发送的数据。通过 @KafkaListener 注解配置用户监听 topics
bootstrap-servers :kafka服务器地址(可以多个)
consumer.group-id :指定一个默认的组名
不指定的话会报
1. earliest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费
2. latest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据
3. none : topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset ,则抛出异常
这个属性也是必须配置的,不然也是会报错的
在使用Kafka发送接收消息时,生产者 producer 端需要序列化,消费者 consumer 端需要反序列化,由于网络传输过来的是 byte[] ,只有反序列化后才能得到生产者发送的真实的消息内容。这样消息才能进行网络传输
consumer.key-deserializer 和 consumer.value-deserializer 是消费者 key/value 反序列化
producer.key-deserializer 和 producer.value-deserializer 是生产者 key/value 序列化
StringDeserializer 是内置的字符串反序列化历物亏方式
StringSerializer 是内置的字符串序列化方式
在 org.apache.kafka.common.serialization 源码包中还提供了多种类型的序列化和反序列化方式
要自定义序列化肢神方式,需要实现接口 Serializer
要自定义反序列化方式,需要实现接口 Deserializer
详细可以参考
这是 Kafka 的消费者 Consumer 的配置信息,每个消费者都会输出该配置信息
访问 ,就可以看到控制台打印消息了
[img]Kafka消息送达语义详解
消息送达语义 是消息系统中一个常见的问题,主要包含三种语义:
下面我们分别从发送者和消费者的角度来阐述这三种消息送达语义。
从Producer的角度来看, At most once意味着Producer发送完一条消息后,不会确认消息是否成功送达 。这样从Producer的角度来看,消息仅仅被发送一次,也就存在者丢失的可能性。
从Producer的角度来看, At least once意味着Producer发送完一条消息后,会确认消息是否发送成功。如果Producer没有收到Broker的ack确认消息,那么会不断重试发送消息 。这样就意味着消息可能被发送宽岁厅不止一次,也就存在这消息重复的可能性。
从Producer的角度来看, Exactly once意味着Producer消息的发送是幂等的 。这意味着不论消息重发多少遍,最终Broker上记录的只有一条不重复的数据。
Kafka默认的Producer消息送达语义就是At least once,这意味着我们不用做任何配置就能够实现At least once消息语义。
我们可以通过配置Producer的以下配置项来实现At most once语义:
Exactly once是Kafka从版本0.11之后提供的高级特性。我们可以通过配置Producer的以下配置项来实现Exactly once语义:
Kafka本身支持At least once消息送达语义,因此实现消息发送的幂等关键是要实现Broker端消息的去重。为了实现消息发送的幂等性,Kafka引入了两个新的概念:
Broker端在缓存中保存了这Sequence Numbler,对于接收的每条消息,如果其序号比Broker缓存中序号大于1则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。但是,只能保证单个Producer对于同一个Topic, Partition的Exactly Once语义。不能保证同一个Producer一个topic不同的partion幂等。
从Consumer的角度来看, At most once意味着Consumer对雀隐一条消息最多消费一次,因此有可能存在消息消费失败依旧提交offset的情况 。考虑下面的情况:Consumer首先读取消息,然后提交offset,最后处理这条消息。在处理消息时,Consumer宕机了,此时offset已经提交,下一次读取消息时读到的是下一条消息了,这就是At most once消费。
从Consumer的角度来看, At least once意味着Consumer对一条消息可能消费多次 。考虑下面的情况:Consumer首先读取消息,然后处理这条消息,最后提交offset。在处理消慎隐息时成功后,Consumer宕机了,此时offset还未提交,下一次读取消息时依旧是这条消息,那么处理消息的逻辑又将被执行一遍,这就是At least once消费。
从Consumer的角度来看, Exactly once意味着消息的消费处理逻辑和offset的提交是原子性的,即消息消费成功后offset改变,消息消费失败offset也能回滚 。
通过手动提交offset,就可以实现Consumer At least once语义。
通过自动提交offset,并且将定时提交时间间隔设置的很小,就可以实现Consumer At most once语义。
一个常见的Exactly once的的使用场景是:当我们订阅了一个topic,然后往另一个topic里写入数据时,我们希望这两个操作是原子性的,即如果写入消息失败,那么我们希望读取消息的offset可以回滚。
此时可以通过Kafka的Transaction特性来实现。Kafka是在版本0.11之后开始提供事务特性的。我们可以将Consumer读取数据和Producer写入数据放进一个同一个事务中,在事务没有成功结束前,所有的这个事务中包含的消息都被标记为uncommitted。只有事务执行成功后,所有的消息才会被标记为committed。
我们知道,offset信息是以消息的方式存储在Broker的__consumer_offsets topic中的。因此在事务开始后,Consumer读取消息后,所有的offset消息都是uncommitted状态。所有的Producer写入的消息也都是uncommitted状态。
而Consumer可以通过配置 isolation.level 来决定uncommitted状态的message是否对Consumer可见。 isolation.level 拥有两个可选值: read_committed 和 read_uncommitted 。默认值为 read_uncommitted 。
当我们将 isolation.level 配置为 read_committed 后,那么所有事务未提交的数据就都对Consumer不可见了,也就实现了Kafka的事务语义。
kafka原理分析
作为一款典型的消息中间件产品,kafka系统仍然由producer、broker、consumer三部分组成。kafka涉及的几个常用概念和组件简派薯单介绍如下:
当consumer group的状态发生变化(如有consumer故障、增减consumer成员等)或consumer group消费的topic状态发生变化(如增加了partition,消费的topic发生变化),kafka集群会自动调整和重新分配consumer消费的partition,这个过程就叫做rebalance(再平衡)。
__consumer_offsets是kafka集群自己维护的一个特殊的topic,它里面存储的是每个consumer group已经消费了每个topic partition的offset。__consumer_offsets中offset消息的key由group id,topic name,partition id组成,格式为 {topic name}-${partition id},value值就是consumer提交的已消费的topic partition offset值。__consumer_offsets的分区数和副本数分别由offsets.topic.num.partitions(默认值为50)和offsets.topic.replication.factor(默认值为1)参数配置。我们通过公式 hash(group id) % offsets.topic.num.partitions 就可以计算出指定consumer group的已提交offset存储的partition。由于consumer group提交的offset消息只有最后一条消息有意义,所以__consumer_offsets是一个compact topic,kafka集群会周期性的对__consumer_offsets执行compact操作,只保留最新的一次提交offset。
group coordinator运行在kafka某个broker上,负责consumer group内所有的consumer成员管理、所有的消费的topic的partition的消费关系分配、offset管理、触发rebalance等功能。group coordinator管理partition分配时,会指定consumer group内某个consumer作为group leader执行具体的partition分配任务。存储某个consumer group已提交offset的__consumer_offsets partition leader副本所在的broker就是该consumer group的协调器运行的broker。
跟大多数分布式系统一样,集群有一个master角色管理整个集群,协调集群中各个成员的行为。kafka集群中的controller就相当于其它分布式系统的master,用来负责集群topic的分区分配,分区leader选举以及维护集群的所有partition的ISR等集群协调功能。集群中哪个borker是controller也是通过一致性协议选举产生的,2.8版本之前通腔销过zookeeper进行选主,2.8版本后通过kafka raft协议进行选举。如果controller崩溃,集群会重新选举一个broker作为新的controller,并增加controller epoch值(相当于zookeeper ZAB协议的epoch,raft协议的term值)
当kafka集群新建了topic或为一个topic新增了partition,controller需要为这些新增加的partition分配到具体的broker上,并把分配结果记录下来,供producer和consumer查询获取。
因为只有partition的leader副本才会处理producer和consumer的读写请求,而partition的其他follower副本需要从相应的leader副本同步消息,为了尽量保证集群中所有broker的负载是均衡的,controller在进行集群全局partition副本伍羡游分配时需要使partition的分布情况是如下这样的:
在默认情况下,kafka采用轮询(round-robin)的方式分配partition副本。由于partition leader副本承担的流量比follower副本大,kafka会先分配所有topic的partition leader副本,使所有partition leader副本全局尽量平衡,然后再分配各个partition的follower副本。partition第一个follower副本的位置是相应leader副本的下一个可用broker,后面的副本位置依此类推。
举例来说,假设我们有两个topic,每个topic有两个partition,每个partition有两个副本,这些副本分别标记为1-1-1,1-1-2,1-2-1,1-2-2,2-1-1,2-1-2,2-2-1,2-2-2(编码格式为topic-partition-replia,编号均从1开始,第一个replica是leader replica,其他的是follower replica)。共有四个broker,编号是1-4。我们先对broker按broker id进行排序,然后分配leader副本,最后分配foller副本。
1)没有配置broker.rack的情况
现将副本1-1-1分配到broker 1,然后1-2-1分配到broker 2,依此类推,2-2-1会分配到broker 4。partition 1-1的leader副本分配在broker 1上,那么下一个可用节点是broker 2,所以将副本1-1-2分配到broker 2上。同理,partition 1-2的leader副本分配在broker 2上,那么下一个可用节点是broker 3,所以将副本1-1-2分配到broker 3上。依此类推分配其他的副本分片。最后分配的结果如下图所示:
2)配置了broker.rack的情况
假设配置了两个rack,broker 1和broker 2属于Rack 1,broker 3和broker 4属于Rack 2。我们对rack和rack内的broker分别排序。然后先将副本1-1-1分配到Rack 1的broker 1,然后将副本1-2-1分配到下一个Rack的第一个broker,即Rack 2的broker 3。其他的parttition leader副本依此类推。然后分配follower副本,partition 1-1的leader副本1-1-1分配在Rack 1的broker上,下一个可用的broker是Rack 2的broker 3,所以分配到broker 3上,其他依此类推。最后分配的结果如下图所示:
kafka除了按照集群情况自动分配副本,也提供了reassign工具人工分配和迁移副本到指定broker,这样用户可以根据集群实际的状态和各partition的流量情况分配副本
kafka集群controller的一项功能是在partition的副本中选择一个副本作为leader副本。在topic的partition创建时,controller首先分配的副本就是leader副本,这个副本又叫做preference leader副本。
当leader副本所在broker失效时(宕机或网络分区等),controller需要为在该broker上的有leader副本的所有partition重新选择一个leader,选择方法就是在该partition的ISR中选择第一个副本作为新的leader副本。但是,如果ISR成员只有一个,就是失效的leader自身,其余的副本都落后于leader怎么办?kafka提供了一个unclean.leader.election配置参数,它的默认值为true。当unclean.leader.election值为true时,controller还是会在非ISR副本中选择一个作为leader,但是这时候使用者需要承担数据丢失和数据不一致的风险。当unclean.leader.election值为false时,则不会选择新的leader,该partition处于不可用状态,只能恢复失效的leader使partition重新变为可用。
当preference leader失效后,controller重新选择一个新的leader,但是preference leader又恢复了,而且同步上了新的leader,是ISR的成员,这时候preference leader仍然会成为实际的leader,原先的新leader变为follower。因为在partition leader初始分配时,使按照集群副本均衡规则进行分配的,这样做可以让集群尽量保持平衡。
为了保证topic的高可用,topic的partition往往有多个副本,所有的follower副本像普通的consumer一样不断地从相应的leader副本pull消息。每个partition的leader副本会维护一个ISR列表存储到集群信息库里,follower副本成为ISR成员或者说与leader是同步的,需要满足以下条件:
1)follower副本处于活跃状态,与zookeeper(2.8之前版本)或kafka raft master之间的心跳正常
2)follower副本最近replica.lag.time.max.ms(默认是10秒)时间内从leader同步过最新消息。需要注意的是,一定要拉取到最新消息,如果最近replica.lag.time.max.ms时间内拉取过消息,但不是最新的,比如落后follower在追赶leader过程中,也不会成为ISR。
follower在同步leader过程中,follower和leader都会维护几个参数,来表示他们之间的同步情况。leader和follower都会为自己的消息队列维护LEO(Last End Offset)和HW(High Watermark)。leader还会为每一个follower维护一个LEO。LEO表示leader或follower队列写入的最后一条消息的offset。HW表示的offset对应的消息写入了所有的ISR。当leader发现所有follower的LEO的最小值大于HW时,则会增加HW值到这个最小值LEO。follower拉取leader的消息时,同时能获取到leader维护的HW值,如果follower发现自己维护的HW值小于leader发送过来的HW值,也会增加本地的HW值到leader的HW值。这样我们可以得到一个不等式: follower HW = leader HW = follower LEO = leader LEO 。HW对应的log又叫做committed log,consumer消费partititon的消息时,只能消费到offset值小于或等于HW值的消息的,由于这个原因,kafka系统又称为分布式committed log消息系统。
kafka的消息内容存储在log.dirs参数配置的目录下。kafka每个partition的数据存放在本地磁盘log.dirs目录下的一个单独的目录下,目录命名规范为 ${topicName}-${partitionId} ,每个partition由多个LogSegment组成,每个LogSegment由一个数据文件(命名规范为: {baseOffset}.index)和一个时间戳索引文件(命名规范为:${baseOffset}.timeindex)组成,文件名的baseOffset就是相应LogSegment中第一条消息的offset。.index文件存储的是消息的offset到该消息在相应.log文件中的偏移,便于快速在.log文件中快速找到指定offset的消息。.index是一个稀疏索引,每隔一定间隔大小的offset才会建立相应的索引(比如每间隔10条消息建立一个索引)。.timeindex也是一个稀疏索引文件,这样可以根据消息的时间找到对应的消息。
可以考虑将消息日志存放到多个磁盘中,这样多个磁盘可以并发访问,增加消息读写的吞吐量。这种情况下,log.dirs配置的是一个目录列表,kafka会根据每个目录下partition的数量,将新分配的partition放到partition数最少的目录下。如果我们新增了一个磁盘,你会发现新分配的partition都出现在新增的磁盘上。
kafka提供了两个参数log.segment.bytes和log.segment.ms来控制LogSegment文件的大小。log.segment.bytes默认值是1GB,当LogSegment大小达到log.segment.bytes规定的阈值时,kafka会关闭当前LogSegment,生成一个新的LogSegment供消息写入,当前供消息写入的LogSegment称为活跃(Active)LogSegment。log.segment.ms表示最大多长时间会生成一个新的LogSegment,log.segment.ms没有默认值。当这两个参数都配置了值,kafka看哪个阈值先达到,触发生成新的LogSegment。
kafka还提供了log.retention.ms和log.retention.bytes两个参数来控制消息的保留时间。当消息的时间超过了log.retention.ms配置的阈值(默认是168小时,也就是一周),则会被认为是过期的,会被kafka自动删除。或者是partition的总的消息大小超过了log.retention.bytes配置的阈值时,最老的消息也会被kafka自动删除,使相应partition保留的总消息大小维持在log.retention.bytes阈值以下。这个地方需要注意的是,kafka并不是以消息为粒度进行删除的,而是以LogSegment为粒度删除的。也就是说,只有当一个LogSegment的最后一条消息的时间超过log.retention.ms阈值时,该LogSegment才会被删除。这两个参数都配置了值时,也是只要有一个先达到阈值,就会执行相应的删除策略
当我们使用KafkaProducer向kafka发送消息时非常简单,只要构造一个包含消息key、value、接收topic信息的ProducerRecord对象就可以通过KafkaProducer的send()向kafka发送消息了,而且是线程安全的。KafkaProducer支持通过三种消息发送方式
KafkaProducer客户端虽然使用简单,但是一条消息从客户端到topic partition的日志文件,中间需要经历许多的处理过程。KafkaProducer的内部结构如下所示:
从图中可以看出,消息的发送涉及两类线程,一类是调用KafkaProducer.send()方法的应用程序线程,因为KafkaProducer.send()是多线程安全的,所以这样的线程可以有多个;另一类是与kafka集群通信,实际将消息发送给kafka集群的Sender线程,当我们创建一个KafkaProducer实例时,会创建一个Sender线程,通过该KafkaProducer实例发送的所有消息最终通过该Sender线程发送出去。RecordAccumulator则是一个消息队列,是应用程序线程与Sender线程之间消息传递的桥梁。当我们调用KafkaProducer.send()方法时,消息并没有直接发送出去,只是写入了RecordAccumulator中相应的队列中,最终需要Sender线程在适当的时机将消息从RecordAccumulator队列取出来发送给kafka集群。
消息的发送过程如下:
在使用KafkaConsumer实例消费kafka消息时,有一个特性我们要特别注意,就是KafkaConsumer不是多线程安全的,KafkaConsumer方法都在调用KafkaConsumer的应用程序线程中运行(除了consumer向kafka集群发送的心跳,心跳在一个专门的单独线程中发送),所以我们调用KafkaConsumer的所有方法均需要保证在同一个线程中调用,除了KafkaConsumer.wakeup()方法,它设计用来通过其它线程向consumer线程发送信号,从而终止consumer执行。
跟producer一样,consumer要与kafka集群通信,消费kafka消息,首先需要获取消费的topic partition leader replica所在的broker地址等信息,这些信息可以通过向kafka集群任意broker发送Metadata请求消息获取。
我们知道,一个consumer group有多个consumer,一个topic有多个partition,而且topic的partition在同一时刻只能被consumer group内的一个consumer消费,那么consumer在消费partition消息前需要先确定消费topic的哪个partition。partition的分配通过group coordinator来实现。基本过程如下:
我们可以通过实现接口org.apache.kafka.clients.consumer.internals.PartitionAssignor自定义partition分配策略,但是kafka已经提供了三种分配策略可以直接使用。
partition分配完后,每个consumer知道了自己消费的topic partition,通过metadata请求可以获取相应partition的leader副本所在的broker信息,然后就可以向broker poll消息了。但是consumer从哪个offset开始poll消息?所以consumer在第一次向broker发送FetchRequest poll消息之前需要向Group Coordinator发送OffsetFetchRequest获取消费消息的起始位置。Group Coordinator会通过key {topic}-${partition}查询 __consumer_offsets topic中是否有offset的有效记录,如果存在,则将consumer所属consumer group最近已提交的offset返回给consumer。如果没有(可能是该partition是第一次分配给该consumer group消费,也可能是该partition长时间没有被该consumer group消费),则根据consumer配置参数auto.offset.reset值确定consumer消费的其实offset。如果auto.offset.reset值为latest,表示从partition的末尾开始消费,如果值为earliest,则从partition的起始位置开始消费。当然,consumer也可以随时通过KafkaConsumer.seek()方法人工设置消费的起始offset。
kafka broker在收到FetchRequest请求后,会使用请求中topic partition的offset查一个skiplist表(该表的节点key值是该partition每个LogSegment中第一条消息的offset值)确定消息所属的LogSegment,然后继续查LogSegment的稀疏索引表(存储在.index文件中),确定offset对应的消息在LogSegment文件中的位置。为了提升消息消费的效率,consumer通过参数fetch.min.bytes和max.partition.fetch.bytes告诉broker每次拉取的消息总的最小值和每个partition的最大值(consumer一次会拉取多个partition的消息)。当kafka中消息较少时,为了让broker及时将消息返回给consumer,consumer通过参数fetch.max.wait.ms告诉broker即使消息大小没有达到fetch.min.bytes值,在收到请求后最多等待fetch.max.wait.ms时间后,也将当前消息返回给consumer。fetch.min.bytes默认值为1MB,待fetch.max.wait.ms默认值为500ms。
为了提升消息的传输效率,kafka采用零拷贝技术让内核通过DMA把磁盘中的消息读出来直接发送到网络上。因为kafka写入消息时将消息写入内存中就返回了,如果consumer跟上了producer的写入速度,拉取消息时不需要读磁盘,直接从内存获取消息发送出去就可以了。
为了避免发生再平衡后,consumer重复拉取消息,consumer需要将已经消费完的消息的offset提交给group coordinator。这样发生再平衡后,consumer可以从上次已提交offset出继续拉取消息。
kafka提供了多种offset提交方式
partition offset提交和管理对kafka消息系统效率来说非常关键,它直接影响了再平衡后consumer是否会重复拉取消息以及重复拉取消息的数量。如果offset提交的比较频繁,会增加consumer和kafka broker的消息处理负载,降低消息处理效率;如果offset提交的间隔比较大,再平衡后重复拉取的消息就会比较多。还有比较重要的一点是,kafka只是简单的记录每次提交的offset值,把最后一次提交的offset值作为最新的已提交offset值,作为再平衡后消息的起始offset,而什么时候提交offset,每次提交的offset值具体是多少,kafka几乎不关心(这个offset对应的消息应该存储在kafka中,否则是无效的offset),所以应用程序可以先提交3000,然后提交2000,再平衡后从2000处开始消费,决定权完全在consumer这边。
kafka中的topic partition与consumer group中的consumer的消费关系其实是一种配对关系,当配对双方发生了变化时,kafka会进行再平衡,也就是重新确定这种配对关系,以提升系统效率、高可用性和伸缩性。当然,再平衡也会带来一些负面效果,比如在再平衡期间,consumer不能消费kafka消息,相当于这段时间内系统是不可用的。再平衡后,往往会出现消息的重复拉取和消费的现象。
触发再平衡的条件包括:
需要注意的是,kafka集群broker的增减或者topic partition leader重新选主这类集群状态的变化并不会触发在平衡
有两种情况与日常应用开发比较关系比较密切:
consumer在调用subscribe()方法时,支持传入一个ConsumerRebalanceListener监听器,ConsumerRebalanceListener提供了两个方法,onPartitionRevoked()方法在consumer停止消费之后,再平衡开始之前被执行。可以发现,这个地方是提交offset的好时机。onPartitonAssigned()方法则会在重新进行partition分配好了之后,但是新的consumer还未消费之前被执行。
我们在提到kafka时,首先想到的是它的吞吐量非常大,这也是很多人选择kafka作为消息传输组件的重要原因。
以下是保证kafka吞吐量大的一些设计考虑:
但是kafka是不是总是这么快?我们同时需要看到kafka为了追求快舍弃了一些特性:
所以,kafka在消息独立、允许少量消息丢失或重复、不关心消息顺序的场景下可以保证非常高的吞吐量,但是在需要考虑消息事务、严格保证消息顺序等场景下producer和consumer端需要进行复杂的考虑和处理,可能会比较大的降低kafka的吞吐量,例如对可靠性和保序要求比较高的控制类消息需要非常谨慎的权衡是否适合使用kafka。
我们通过producer向kafka集群发送消息,总是期望消息能被consumer成功消费到。最不能忍的是producer收到了kafka集群消息写入的正常响应,但是consumer仍然没有消费到消息。
kafka提供了一些机制来保证消息的可靠传递,但是有一些因素需要仔细权衡考虑,这些因素往往会影响kafka的吞吐量,需要在可靠性与吞吐量之间求得平衡:
kafka只保证partition消息顺序,不保证topic级别的顺序,而且保证的是partition写入顺序与读取顺序一致,不是业务端到端的保序。
如果对保序要求比较高,topic需要只设置一个partition。这时可以把参数max.in.flight.requests.per.connection设置为1,而retries设置为大于1的数。这样即使发生了可恢复型错误,仍然能保证消息顺序,但是如果发生不可恢复错误,应用层进行重试的话,就无法保序了。也可以采用同步发送的方式,但是这样也极大的降低了吞吐量。如果消息携带了表示顺序的字段,可以在接收端对消息进行重新排序以保证最终的有序。
Kafka系列之(4)——Kafka Producer流程解析
Kafka 0.9版本正式使用Java版本的producer替换了原Scala版本的producer。
注:ProducerRecord允许用户在创建消息对象的时候就直接指定要发送的分区,这样producer后续发送该消息时可以直接发送到指定分区,而不用先通过Partitioner计算目标分区了。另外,我们还可以直接指定消息的时间戳——但一定要慎重使用这个功能,因为它有可能会令时间戳索引机制失效。
流程描述:
用户首先构建待发送的消息对象ProducerRecord,然后调用KafkaProducer#send方法进行发送。KafkaProducer接收到消息后首先对其进行序列化,然后结合本地缓存的元数据信息一起发送给partitioner去确定目标分区,最后追加写入到内存中的消息缓冲池(accumulator)。此时KafkaProducer#send方法成功返回。同时,KafkaProducer中还有一个专门的耐乎Sender IO线程负责将缓冲池中的消息分批次发送给对应的broker,完成真正的消息发送逻辑。
新版本的producer从设计上来说具有以下几个特点:
总共创建两个线程:执行KafkaPrducer#send逻辑的线程——我们称之为“用户主线程”;执行发送逻辑的IO线程——我们称之为“Sender线程”。
不同于Scala老版本的producer,新版本producer完全异步发送消息,并提供了回调机制(callback)供用户判断消息是否成功发送。
batching机制——“分批发送“机制。每个批次(batch)中包含了若干个PRODUCE请求,因此具有更高的吞吐量。
更加合理的默认分区策略:对于无key消息而言,Scala版本分区策略是一段时间内(默认是10分钟)将消息发往固定的目标分区,这容易造成消息分布的不均匀,而新版本的producer采用轮询的方式均匀地将消息分发到不同的分区。
底层统一使用基于Selector的网络客户端实现,结合Java提供的Future实昌毕悉现完整地提供了更加健壮和优雅的生命周期管理。
关键参数
batch.size 我把它列在了首位,因为该参数对于调优producer至关重要。之前提到过新版producer采用分批发送机制,该参数即控制一个batch的大小。默认是16KB
acks 关乎到消息持久数铅性(durability)的一个参数。高吞吐量和高持久性很多时候是相矛盾的,需要先明确我们的目标是什么? 高吞吐量?高持久性?亦或是中等?因此该参数也有对应的三个取值:0, -1和1
linger.ms 减少网络IO,节省带宽之用。原理就是把原本需要多次发送的小batch,通过引入延时的方式合并成大batch发送,减少了网络传输的压力,从而提升吞吐量。当然,也会引入延时
compression.type producer 所使用的压缩器,目前支持gzip, snappy和lz4。压缩是在用户主线程完成的,通常都需要花费大量的CPU时间,但对于减少网络IO来说确实利器。生产环境中可以结合压力测试进行适当配置
max.in.flight.requests.per.connection 关乎消息乱序的一个配置参数。它指定了Sender线程在单个Socket连接上能够发送未应答PRODUCE请求的最大请求数。适当增加此值通常会增大吞吐量,从而整体上提升producer的性能。不过笔者始终觉得其效果不如调节batch.size来得明显,所以请谨慎使用。另外如果开启了重试机制,配置该参数大于1可能造成消息发送的乱序(先发送A,然后发送B,但B却先行被broker接收)
retries 重试机制,对于瞬时失败的消息发送,开启重试后KafkaProducer会尝试再次发送消息。对于有强烈无消息丢失需求的用户来说,开启重试机制是必选项。
当用户调用KafkaProducer.send(ProducerRecord, Callback)时Kafka内部流程分析:
这是KafkaProducer#send逻辑的第一步,即为待发送消息进行序列化并计算目标分区,如下图所示:
如上图所示,一条所属topic是"test",消息体是"message"的消息被序列化之后结合KafkaProducer缓存的元数据(比如该topic分区数信息等)共同传给后面的Partitioner实现类进行目标分区的计算。
producer创建时会创建一个默认32MB(由buffer.memory参数指定)的accumulator缓冲区,专门保存待发送的消息。除了之前在“关键参数”段落中提到的linger.ms和batch.size等参数之外,该数据结构中还包含了一个特别重要的集合信息:消息批次信息(batches)。该集合本质上是一个HashMap,里面分别保存了每个topic分区下的batch队列,即前面说的批次是按照topic分区进行分组的。这样发往不同分区的消息保存在对应分区下的batch队列中。举个简单的例子,假设消息M1, M2被发送到test的0分区但属于不同的batch,M3分送到test的1分区,那么batches中包含的信息就是:{"test-0" - [batch1, batch2], "test-1" - [batch3]}。
单个topic分区下的batch队列中保存的是若干个消息批次。每个batch中最重要的3个组件包括:
compressor: 负责执行追加写入操作
batch缓冲区:由batch.size参数控制,消息被真正追加写入到的地方
thunks:保存消息回调逻辑的集合
这一步的目的就是将待发送的消息写入消息缓冲池中,具体流程如下图所示:
这一步执行完毕之后理论上讲KafkaProducer.send方法就执行完毕了,用户主线程所做的事情就是等待Sender线程发送消息并执行返回结果了。
此时,该Sender线程登场了。严格来说,Sender线程自KafkaProducer创建后就一直都在运行着 。它的工作流程基本上是这样的:
不断轮询缓冲区寻找 已做好发送准备的分区 ;
将轮询获得的各个batch按照目标分区所在的leader broker进行分组;
将分组后的batch通过底层创建的 Socket连接 发送给各个broker;
等待服务器端发送response回来。
为了说明上的方便,我还是基于图的方式来解释Sender线程的工作原理:
上图中Sender线程会发送PRODUCE请求给对应的broker,broker处理完毕之后发送对应的PRODUCE response。一旦Sender线程接收到response将依次(按照消息发送顺序)调用batch中的回调方法,如下图所示:
refer:
如何保证kafka生产者发送消息的可靠性
继一年前的 kafka介绍的学习总结 ,生产者Producers按照主题topic把消息发给kafka集群的主分区,其他分区从主分区同步该消息。
具体来看kafka的分布特性性:kafka消息的分区分布在Kafka集群的某些服务老丛孝器上,每个分区都有一个服务器充当leader,有0个或多个充当follower。leader处理分区的所有读请求和写请求,同时follower被动的从leader同步数据。假如leader异常了,其他follower会自动的选出一个新leader。每个服务器有可能充当某些分区的leader,同时也充当其他分区的follower,因此集群负载得到了很好的平衡和实现容错功能。
Kafka默认的副本因子是3,即每个分区只有1个leader副本和2个follower副本。
由于Kafka是一个分布式系统,follower必然会存在郑腔与leader不能实时同步的风险,那么follower副本在什么条件下才算与Leader同步?ISR同步副本机制解决这个侍稿问题。
In-sync replica(ISR)称之为同步副本,ISR中的副本都是与Leader进行同步的副本。ISR中是什么副本呢?首先可以明确的是:Leader副本总是存在于ISR中,而follower副本是否在ISR中,取决于该follower副本是否与leader副本保持了“同步”。
Kafka的broker服务端有一个参数replica.lag.time.max.ms, 该参数表示follower副本滞后与Leader副本的最长时间间隔,默认是10秒。只要follower副本落后于leader副本的时间间隔不超过10秒,就认为该follower副本与leader副本是同步的,即使follower副本落后于Leader副本几条消息,只要在10秒之内赶上Leader副本,就不会被踢出局。 如果follower副本被踢出ISR列表, 等到该副本追上了leader副本的进度,该副本会被再次加入到ISR列表中,所以ISR是一个动态列表,并不是静态不变的。
acks参数主要决定了kafka集群leader分区副本接收消息成功就响应成功还是fellower分区从leader同步成功才响应成功,这个参数对于消息是否丢失起着重要作用:
1) acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。由于不需要等到服务器的响应,可以以网络支持的最大速度发送消息,从而达到很高的吞吐量。
2) acks=1,只要集群的leader分区副本接收到了消息,就会向生产者响应成功。一旦消息无法写入leader分区副本(比如网络原因、leader节点崩溃),生产者会收到一个错误响应,为了避免数据丢失,生产者会重新发送消息。这种方式的吞吐量取决于使用的是异步发送还是同步发送。
3) acks =all,只要ISR同步副本数大于等于最小同步副本数min.insync.replicas( 提醒:ISR是动态的 )收到消息时,生产者才会接收到服务器的成功响应。这种模式是最高级别的,也是最安全的,可以确保不止一个Broker接收到了消息,该模式的延迟会很高。
当acks=all时,只要ISR同步副本中有主备副本都同步了才会响应成功给生产者。其实这里面存在一个问题:ISR同步副本是动态的,有可能仅仅含有一个leader副本(相当于acks=1),也有可能的全部副本(这个也没必要,拜占庭将军场景只要保证一半以上的副本正常同步)。需要一个参数决定至少有几个副本需要同步成功才能响应成功给生产者。
为了解决这个问题,Kafka的Broker端提供了一个参数**min.insync.replicas**,该参数控制着至少被写入的副本数,该值默认值为1,生产环境中可以根据部署的是单节点还是多节点,多节点要能够满足拜占庭将军场景,我们以3节点场景为例。
3节点场景1: 当min.insync.replicas=2且acks=all时,如果ISR列表只有[1,2],3被踢出ISR列表,只要保证2个副本同步了,生产者就会收到成功响应。
3节点场景2 :当min.insync.replicas=2时,如果ISR列表只有[1,2],3被踢出ISR列表。当acks=all时,则响应失败(需要生产者重新发消息直到响应成功);当acks=0或者acks=1时成功写入数据。
ps:该场景下acks=all,kafka集群一旦同步失败就直接响应失败嘛?还是有超时时长?生产者已经将消息发送到leader分区,kafka(响应失败)对这个消息如何处理?入集群持久化嘛?后续重试发送的消息如何处理?
3节点场景3 :如果min.insync.replicas=2且acks=all,此时ISR列表为[1,2,3],只要2副本同步成功还是等到所有的副本都同步了,才会向生产者发送成功响应?因为min.insync.replicas=2只是一个最低限制,同步副本少于该配置值,则会抛异常,而acks=all,是需要保证所有的ISR列表的副本都同步了才可以发送成功响应。
1) 要想系统的可靠性,从来不是一方能决定的, kafka生产者发送消息的可靠性 主要由 kafka服务端 的动态同步副本列表ISR和最小同步副本数min.insync.replicas以及 生产者 参数ack=all。
2) kafka服务端的最小同步副本数min.insync.replicas由部署的集群和节点个数来决定,满足拜占庭将军场景(节点个数一半以上即可)。
3) 生产者的 副本个数 由部署的集群和节点个数来决定,满足拜占庭将军场景(节点个数一半以上即可)。如果是单副本的话,本文讨论的就没意义了。
4) kafka单节点场景,kafka服务端的动态同步副本列表ISR和最小同步副本数min.insync.replicas均为1,生产者的副本数为1(如果大于1估计会失败),ack=all和ack=1的效果一样。
5) kafka 3节点场景,kafka服务端的动态同步副本列表ISR为3个,最小同步副本数min.insync.replicas均为2,生产者的副本数为2和ack=all。
6) kafka 奇数n节点场景,kafka服务端的动态同步副本列表ISR为n个,最小同步副本数min.insync.replicas均为(n+1)/2,生产者的副本数为(n+1)/2和ack=all。
Kafka生产者ack机制剖析
kafka官网给出的kafka生产者配置参数
kafka批量发送消息实验
批量发送:
配置
生产者
设置批量发送森枯且设置时延,当某此档洞个topic的消息超过batch-size,会把accumulator的消息全部发出去,即其他topic的也跟着一起发出去。
dosend方法详蠢清解:
关于kafka发送消息和kafka发送消息中文乱码的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。