kafka事务(kafka事务是怎么实现的)

本篇文章给大家谈谈kafka事务,以及kafka事务是怎么实现的对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

RabbitMQ,RocketMQ,Kafka 事务性,消息丢失和重复发送处理策略

我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之前圆型需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用,同时网络环境也是不稳定的,造成了我们多个机器之间的数据同步问题,这就是典型的分布式事务问题。

在分布式事务中事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。分布式事务就是要保证不同节点之间的数据一致性。

1、2PC(二阶段提交)方案 - 强一致性

2、3PC(三阶段提交)方案

3、TCC (Try-Confirm-Cancel)事务 - 最终一致性

4、Saga事务 - 最终一致性

5、本地消息表 - 最终一致性

6、MQ事务 - 最终一致性

消息的生产方,除了维护自己的业务逻辑之外,同时需要维护一个消息表。这个消息表里面记录的就是需要同步到别的服务的信息,当然这个消息表,每个消息都有一个状态值,来标识这个消息有没有被成功处理。

发送放的业务逻辑以及消息表中数据的插入将在一个事务中完成,这样避免了业务处理成功 + 事务消息发送失败,或业务处理失败 + 事务消息发送成功,这个问题。

举个栗子:

我们假定目前有两个服务,订单服务,购物车服务,用户在购物车中对几个商品进行合并下单,之后需要情况购物车中刚刚已经下单的商品信息。

1、消息的生产方也就是订单服务,完成了自己的逻辑(对商品进行下单操作)然后把这个消息通过 mq 发送到需要进行数据同步的其他服务中,也就是我们栗子中的购物车服务。

2、其他服务(购物车服务)会监听这个队列;

1、如果收到这个消息,并且数据同步执行成功了,当然这也是一个本地事务,就通过 mq 回复消息的生产方(订单服务)消息已经处理了,然后生产方就能标识本次事务已经结束。如果是一个业务上的错误,就回复消息的生产方,需要进行数据回滚了。

2、很久没收到这个消息,这种情况是不会发生的,消息的发送方会有一个定时的任务,会定时重试发送消息表中还没有处理的消息;

3、消息的生产方(订单服务)如果收到消息回执;

1、成功的话就修改本次消息已经处理完,也就是本次分布式事务的同步已经完成;

2、如果消息的结果是执行失败,同时在本地回滚本次事务,标识消息已经处理完成;

3、如果消息丢失,也就是回执消息没有收到,这种情况也不太会发生,消息的发送方(订单服务)会有一个定时的任务,定时重试发送消息表中还没有处理的消息,下游的服务需要做幂等,可能会收到多次重复的消息,如果一个回复消息生产方中的某个回执信息丢失了,后面持续收到生产方的 mq 消息,然后再次回复消息的生产方回执信息,这样总能保证发送者能成功收到回执,消息的生产方在接收回执消息的时候也要做到幂等性。

这里有两个很重要的操作:

1、服务器处理消息需要是幂等的,消息的生产方和接收方都需要做到幂等性;

2、发送放需要添加一个定时器来遍历重推未处理的消息,避免消息丢失,橘慧猜造成的事务执行断裂。

该方案的优缺点

优点:

1、在设计层面上实现了消息数据的可靠性,不依赖消息中间件,弱化了对 mq 特性的依赖。

2、简单,易于实现。

缺点:

主要是需要和业务数据绑定到一起,耦合性比较高,使用相同的数据库,会占用业务数据库的一些资源。

下面分析下几种消息队列对事务的支持

RocketMQ 中的事务,它解决的问题是,确保执行本地事务和发消息这两个操作,要么都成功,要么都失败。并且碧悄,RocketMQ 增加了一个事务反查的机制,来尽量提高事务执行的成功率和数据一致性。

主要是两个方面,正常的事务提交和事务消息补偿

正常的事务提交

1、发送消息(half消息),这个 half 消息和普通消息的区别,在事务提交 之前,对于消费者来说,这个消息是不可见的。

2、MQ SERVER写入信息,并且返回响应的结果;

3、根据MQ SERVER响应的结果,决定是否执行本地事务,如果MQ SERVER写入信息成功执行本地事务,否则不执行;

如果MQ SERVER没有收到 Commit 或者 Rollback 的消息,这种情况就需要进行补偿流程了

补偿流程

1、MQ SERVER如果没有收到来自消息发送方的 Commit 或者 Rollback 消息,就会向消息发送端也就是我们的服务器发起一次查询,查询当前消息的状态;

2、消息发送方收到对应的查询请求,查询事务的状态,然后把状态重新推送给MQ SERVER,MQ SERVER就能之后后续的流程了。

相比于本地消息表来处理分布式事务,MQ 事务是把原本应该在本地消息表中处理的逻辑放到了 MQ 中来完成。

Kafka 中的事务解决问题,确保在一个事务中发送的多条信息,要么都成功,要么都失败。也就是保证对多个分区写入操作的原子性。

通过配合 Kafka 的幂等机制来实现 Kafka 的 Exactly Once,满足了读取-处理-写入这种模式的应用程序。当然 Kafka 中的事务主要也是来处理这种模式的。

什么是读取-处理-写入模式呢?

栗如:在流计算中,用 Kafka 作为数据源,并且将计算结果保存到 Kafka 这种场景下,数据从 Kafka 的某个主题中消费,在计算集群中计算,再把计算结果保存在 Kafka 的其他主题中。这个过程中,要保证每条消息只被处理一次,这样才能保证最终结果的成功。Kafka 事务的原子性就保证了,读取和写入的原子性,两者要不一起成功,要不就一起失败回滚。

这里来分析下 Kafka 的事务是如何实现的

它的实现原理和 RocketMQ 的事务是差不多的,都是基于两阶段提交来实现的,在实现上可能更麻烦

先来介绍下事务协调者,为了解决分布式事务问题,Kafka 引入了事务协调者这个角色,负责在服务端协调整个事务。这个协调者并不是一个独立的进程,而是 Broker 进程的一部分,协调者和分区一样通过选举来保证自身的可用性。

Kafka 集群中也有一个特殊的用于记录事务日志的主题,里面记录的都是事务的日志。同时会有多个协调者的存在,每个协调者负责管理和使用事务日志中的几个分区。这样能够并行的执行事务,提高性能。

下面看下具体的流程

事务的提交

1、协调者设置事务的状态为PrepareCommit,写入到事务日志中;

2、协调者在每个分区中写入事务结束的标识,然后客户端就能把之前过滤的未提交的事务消息放行给消费端进行消费了;

事务的回滚

1、协调者设置事务的状态为PrepareAbort,写入到事务日志中;

2、协调者在每个分区中写入事务回滚的标识,然后之前未提交的事务消息就能被丢弃了;

这里引用一下【消息队列高手课中的图片】

RabbitMQ 中事务解决的问题是确保生产者的消息到达MQ SERVER,这和其他 MQ 事务还是有点差别的,这里也不展开讨论了。

先来分析下一条消息在 MQ 中流转所经历的阶段。

生产阶段 :生产者产生消息,通过网络发送到 Broker 端。

存储阶段 :Broker 拿到消息,需要进行落盘,如果是集群版的 MQ 还需要同步数据到其他节点。

消费阶段 :消费者在 Broker 端拉数据,通过网络传输到达消费者端。

发生网络丢包、网络故障等这些会导致消息的丢失

在生产者发送消息之前,通过channel.txSelect开启一个事务,接着发送消息, 如果消息投递 server 失败,进行事务回滚channel.txRollback,然后重新发送, 如果 server 收到消息,就提交事务channel.txCommit

不过使用事务性能不好,这是同步操作,一条消息发送之后会使发送端阻塞,以等待RabbitMQ Server的回应,之后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大降低。

使用确认机制,生产者将信道设置成 confirm 确认模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一 deliveryTag 和 multiple 参数),这就使得生产者知晓消息已经正确到达了目的地了。

multiple 为 true 表示的是批量的消息确认,为 true 的时候,表示小于等于返回的 deliveryTag 的消息 id 都已经确认了,为 false 表示的是消息 id 为返回的 deliveryTag 的消息,已经确认了。

确认机制有三种类型

1、同步确认

2、批量确认

3、异步确认

同步模式的效率很低,因为每一条消息度都需要等待确认好之后,才能处理下一条;

批量确认模式相比同步模式效率是很高,不过有个致命的缺陷,一旦回复确认失败,当前确认批次的消息会全部重新发送,导致消息重复发送;

异步模式就是个很好的选择了,不会有同步模式的阻塞问题,同时效率也很高,是个不错的选择。

Kafaka 中引入了一个 broker。 broker 会对生产者和消费者进行消息的确认,生产者发送消息到 broker,如果没有收到 broker 的确认就可以选择继续发送。

只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。

只要正确处理 Broker 的确认响应,就可以避免消息的丢失。

RocketMQ 提供了3种发送消息方式,分别是:

同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。

异步发送:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。

Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer 只负责把请求发出去,而不处理响应结果。

在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。

防止在存储阶段消息额丢失,可以做持久化,防止异常情况(重启,关闭,宕机)。。。

RabbitMQ 持久化中有三部分:

消息的持久化,在投递时指定 delivery_mode=2(1是非持久化),消息的持久化,需要配合队列的持久,只设置消息的持久化,重启之后队列消失,继而消息也会丢失。所以如果只设置消息持久化而不设置队列的持久化意义不大。

对于持久化,如果所有的消息都设置持久化,会影响写入的性能,所以可以选择对可靠性要求比较高的消息进行持久化处理。

不过消息持久化并不能百分之百避免消息的丢失

比如数据在落盘的过程中宕机了,消息还没及时同步到内存中,这也是会丢数据的,这种问题可以通过引入镜像队列来解决。

镜像队列的作用:引入镜像队列,可已将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能够自动切换到镜像中的另一个节点上来保证服务的可用性。(更细节的这里不展开讨论了)

操作系统本身有一层缓存,叫做 Page Cache,当往磁盘文件写入的时候,系统会先将数据流写入缓存中。

Kafka 收到消息后也会先存储在也缓存中(Page Cache)中,之后由操作系统根据自己的策略进行刷盘或者通过 fsync 命令强制刷盘。如果系统挂掉,在 PageCache 中的数据就会丢失。也就是对应的 Broker 中的数据就会丢失了。

处理思路

1、控制竞选分区 leader 的 Broker。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。

2、控制消息能够被写入到多个副本中才能提交,这样避免上面的问题1。

1、将刷盘方式改成同步刷盘;

2、对于多个节点的 Broker,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。

消费阶段就很简单了,如果在网络传输中丢失,这个消息之后还会持续的推送给消费者,在消费阶段我们只需要控制在业务逻辑处理完成之后再去进行消费确认就行了。

总结:对于消息的丢失,也可以借助于本地消息表的思路,消息产生的时候进行消息的落盘,长时间未处理的消息,使用定时重推到队列中。

消息在 MQ 中的传递,大致可以归类为下面三种:

1、At most once: 至多一次。消息在传递时,最多会被送达一次。是不安全的,可能会丢数据。

2、At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。

3、Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。

大部分消息队列满足的都是At least once,也就是可以允许重复的消息出现。

我们消费者需要满足幂等性,通常有下面几种处理方案

1、利用数据库的唯一性

根据业务情况,选定业务中能够判定唯一的值作为数据库的唯一键,新建一个流水表,然后执行业务操作和流水表数据的插入放在同一事务中,如果流水表数据已经存在,那么就执行失败,借此保证幂等性。也可先查询流水表的数据,没有数据然后执行业务,插入流水表数据。不过需要注意,数据库读写延迟的情况。

2、数据库的更新增加前置条件

3、给消息带上唯一ID

每条消息加上唯一ID,利用方法1中通过增加流水表,借助数据库的唯一性来处理重复消息的消费。

kafka原理分析

作为一款典型的消息中间件产品,kafka系统仍然由producer、broker、consumer三部分组成。kafka涉及的几个常用概念和组件简派薯单介绍如下:

当consumer group的状态发生变化(如有consumer故障、增减consumer成员等)或consumer group消费的topic状态发生变化(如增加了partition,消费的topic发生变化),kafka集群会自动调整和重新分配consumer消费的partition,这个过程就叫做rebalance(再平衡)。

__consumer_offsets是kafka集群自己维护的一个特殊的topic,它里面存储的是每个consumer group已经消费了每个topic partition的offset。__consumer_offsets中offset消息的key由group id,topic name,partition id组成,格式为 {topic name}-${partition id},value值就是consumer提交的已消费的topic partition offset值。__consumer_offsets的分区数和副本数分别由offsets.topic.num.partitions(默认值为50)和offsets.topic.replication.factor(默认值为1)参数配置。我们通过公式 hash(group id) % offsets.topic.num.partitions 就可以计算出指定consumer group的已提交offset存储的partition。由于consumer group提交的offset消息只有最后一条消息有意义,所以__consumer_offsets是一个compact topic,kafka集群会周期性的对__consumer_offsets执行compact操作,只保留最新的一次提交offset。

group coordinator运行在kafka某个broker上,负责consumer group内所有的consumer成员管理、所有的消费的topic的partition的消费关系分配、offset管理、触发rebalance等功能。group coordinator管理partition分配时,会指定consumer group内某个consumer作为group leader执行具体的partition分配任务。存储某个consumer group已提交offset的__consumer_offsets partition leader副本所在的broker就是该consumer group的协调器运行的broker。

跟大多数分布式系统一样,集群有一个master角色管理整个集群,协调集群中各个成员的行为。kafka集群中的controller就相当于其它分布式系统的master,用来负责集群topic的分区分配,分区leader选举以及维护集群的所有partition的ISR等集群协调功能。集群中哪个borker是controller也是通过一致性协议选举产生的,2.8版本之前通腔销过zookeeper进行选主,2.8版本后通过kafka raft协议进行选举。如果controller崩溃,集群会重新选举一个broker作为新的controller,并增加controller epoch值(相当于zookeeper ZAB协议的epoch,raft协议的term值)

当kafka集群新建了topic或为一个topic新增了partition,controller需要为这些新增加的partition分配到具体的broker上,并把分配结果记录下来,供producer和consumer查询获取。

因为只有partition的leader副本才会处理producer和consumer的读写请求,而partition的其他follower副本需要从相应的leader副本同步消息,为了尽量保证集群中所有broker的负载是均衡的,controller在进行集群全局partition副本伍羡游分配时需要使partition的分布情况是如下这样的:

在默认情况下,kafka采用轮询(round-robin)的方式分配partition副本。由于partition leader副本承担的流量比follower副本大,kafka会先分配所有topic的partition leader副本,使所有partition leader副本全局尽量平衡,然后再分配各个partition的follower副本。partition第一个follower副本的位置是相应leader副本的下一个可用broker,后面的副本位置依此类推。

举例来说,假设我们有两个topic,每个topic有两个partition,每个partition有两个副本,这些副本分别标记为1-1-1,1-1-2,1-2-1,1-2-2,2-1-1,2-1-2,2-2-1,2-2-2(编码格式为topic-partition-replia,编号均从1开始,第一个replica是leader replica,其他的是follower replica)。共有四个broker,编号是1-4。我们先对broker按broker id进行排序,然后分配leader副本,最后分配foller副本。

1)没有配置broker.rack的情况

现将副本1-1-1分配到broker 1,然后1-2-1分配到broker 2,依此类推,2-2-1会分配到broker 4。partition 1-1的leader副本分配在broker 1上,那么下一个可用节点是broker 2,所以将副本1-1-2分配到broker 2上。同理,partition 1-2的leader副本分配在broker 2上,那么下一个可用节点是broker 3,所以将副本1-1-2分配到broker 3上。依此类推分配其他的副本分片。最后分配的结果如下图所示:

2)配置了broker.rack的情况

假设配置了两个rack,broker 1和broker 2属于Rack 1,broker 3和broker 4属于Rack 2。我们对rack和rack内的broker分别排序。然后先将副本1-1-1分配到Rack 1的broker 1,然后将副本1-2-1分配到下一个Rack的第一个broker,即Rack 2的broker 3。其他的parttition leader副本依此类推。然后分配follower副本,partition 1-1的leader副本1-1-1分配在Rack 1的broker上,下一个可用的broker是Rack 2的broker 3,所以分配到broker 3上,其他依此类推。最后分配的结果如下图所示:

kafka除了按照集群情况自动分配副本,也提供了reassign工具人工分配和迁移副本到指定broker,这样用户可以根据集群实际的状态和各partition的流量情况分配副本

kafka集群controller的一项功能是在partition的副本中选择一个副本作为leader副本。在topic的partition创建时,controller首先分配的副本就是leader副本,这个副本又叫做preference leader副本。

当leader副本所在broker失效时(宕机或网络分区等),controller需要为在该broker上的有leader副本的所有partition重新选择一个leader,选择方法就是在该partition的ISR中选择第一个副本作为新的leader副本。但是,如果ISR成员只有一个,就是失效的leader自身,其余的副本都落后于leader怎么办?kafka提供了一个unclean.leader.election配置参数,它的默认值为true。当unclean.leader.election值为true时,controller还是会在非ISR副本中选择一个作为leader,但是这时候使用者需要承担数据丢失和数据不一致的风险。当unclean.leader.election值为false时,则不会选择新的leader,该partition处于不可用状态,只能恢复失效的leader使partition重新变为可用。

当preference leader失效后,controller重新选择一个新的leader,但是preference leader又恢复了,而且同步上了新的leader,是ISR的成员,这时候preference leader仍然会成为实际的leader,原先的新leader变为follower。因为在partition leader初始分配时,使按照集群副本均衡规则进行分配的,这样做可以让集群尽量保持平衡。

为了保证topic的高可用,topic的partition往往有多个副本,所有的follower副本像普通的consumer一样不断地从相应的leader副本pull消息。每个partition的leader副本会维护一个ISR列表存储到集群信息库里,follower副本成为ISR成员或者说与leader是同步的,需要满足以下条件:

1)follower副本处于活跃状态,与zookeeper(2.8之前版本)或kafka raft master之间的心跳正常

2)follower副本最近replica.lag.time.max.ms(默认是10秒)时间内从leader同步过最新消息。需要注意的是,一定要拉取到最新消息,如果最近replica.lag.time.max.ms时间内拉取过消息,但不是最新的,比如落后follower在追赶leader过程中,也不会成为ISR。

follower在同步leader过程中,follower和leader都会维护几个参数,来表示他们之间的同步情况。leader和follower都会为自己的消息队列维护LEO(Last End Offset)和HW(High Watermark)。leader还会为每一个follower维护一个LEO。LEO表示leader或follower队列写入的最后一条消息的offset。HW表示的offset对应的消息写入了所有的ISR。当leader发现所有follower的LEO的最小值大于HW时,则会增加HW值到这个最小值LEO。follower拉取leader的消息时,同时能获取到leader维护的HW值,如果follower发现自己维护的HW值小于leader发送过来的HW值,也会增加本地的HW值到leader的HW值。这样我们可以得到一个不等式: follower HW = leader HW = follower LEO = leader LEO 。HW对应的log又叫做committed log,consumer消费partititon的消息时,只能消费到offset值小于或等于HW值的消息的,由于这个原因,kafka系统又称为分布式committed log消息系统。

kafka的消息内容存储在log.dirs参数配置的目录下。kafka每个partition的数据存放在本地磁盘log.dirs目录下的一个单独的目录下,目录命名规范为 ${topicName}-${partitionId} ,每个partition由多个LogSegment组成,每个LogSegment由一个数据文件(命名规范为: {baseOffset}.index)和一个时间戳索引文件(命名规范为:${baseOffset}.timeindex)组成,文件名的baseOffset就是相应LogSegment中第一条消息的offset。.index文件存储的是消息的offset到该消息在相应.log文件中的偏移,便于快速在.log文件中快速找到指定offset的消息。.index是一个稀疏索引,每隔一定间隔大小的offset才会建立相应的索引(比如每间隔10条消息建立一个索引)。.timeindex也是一个稀疏索引文件,这样可以根据消息的时间找到对应的消息。

可以考虑将消息日志存放到多个磁盘中,这样多个磁盘可以并发访问,增加消息读写的吞吐量。这种情况下,log.dirs配置的是一个目录列表,kafka会根据每个目录下partition的数量,将新分配的partition放到partition数最少的目录下。如果我们新增了一个磁盘,你会发现新分配的partition都出现在新增的磁盘上。

kafka提供了两个参数log.segment.bytes和log.segment.ms来控制LogSegment文件的大小。log.segment.bytes默认值是1GB,当LogSegment大小达到log.segment.bytes规定的阈值时,kafka会关闭当前LogSegment,生成一个新的LogSegment供消息写入,当前供消息写入的LogSegment称为活跃(Active)LogSegment。log.segment.ms表示最大多长时间会生成一个新的LogSegment,log.segment.ms没有默认值。当这两个参数都配置了值,kafka看哪个阈值先达到,触发生成新的LogSegment。

kafka还提供了log.retention.ms和log.retention.bytes两个参数来控制消息的保留时间。当消息的时间超过了log.retention.ms配置的阈值(默认是168小时,也就是一周),则会被认为是过期的,会被kafka自动删除。或者是partition的总的消息大小超过了log.retention.bytes配置的阈值时,最老的消息也会被kafka自动删除,使相应partition保留的总消息大小维持在log.retention.bytes阈值以下。这个地方需要注意的是,kafka并不是以消息为粒度进行删除的,而是以LogSegment为粒度删除的。也就是说,只有当一个LogSegment的最后一条消息的时间超过log.retention.ms阈值时,该LogSegment才会被删除。这两个参数都配置了值时,也是只要有一个先达到阈值,就会执行相应的删除策略

当我们使用KafkaProducer向kafka发送消息时非常简单,只要构造一个包含消息key、value、接收topic信息的ProducerRecord对象就可以通过KafkaProducer的send()向kafka发送消息了,而且是线程安全的。KafkaProducer支持通过三种消息发送方式

KafkaProducer客户端虽然使用简单,但是一条消息从客户端到topic partition的日志文件,中间需要经历许多的处理过程。KafkaProducer的内部结构如下所示:

从图中可以看出,消息的发送涉及两类线程,一类是调用KafkaProducer.send()方法的应用程序线程,因为KafkaProducer.send()是多线程安全的,所以这样的线程可以有多个;另一类是与kafka集群通信,实际将消息发送给kafka集群的Sender线程,当我们创建一个KafkaProducer实例时,会创建一个Sender线程,通过该KafkaProducer实例发送的所有消息最终通过该Sender线程发送出去。RecordAccumulator则是一个消息队列,是应用程序线程与Sender线程之间消息传递的桥梁。当我们调用KafkaProducer.send()方法时,消息并没有直接发送出去,只是写入了RecordAccumulator中相应的队列中,最终需要Sender线程在适当的时机将消息从RecordAccumulator队列取出来发送给kafka集群。

消息的发送过程如下:

在使用KafkaConsumer实例消费kafka消息时,有一个特性我们要特别注意,就是KafkaConsumer不是多线程安全的,KafkaConsumer方法都在调用KafkaConsumer的应用程序线程中运行(除了consumer向kafka集群发送的心跳,心跳在一个专门的单独线程中发送),所以我们调用KafkaConsumer的所有方法均需要保证在同一个线程中调用,除了KafkaConsumer.wakeup()方法,它设计用来通过其它线程向consumer线程发送信号,从而终止consumer执行。

跟producer一样,consumer要与kafka集群通信,消费kafka消息,首先需要获取消费的topic partition leader replica所在的broker地址等信息,这些信息可以通过向kafka集群任意broker发送Metadata请求消息获取。

我们知道,一个consumer group有多个consumer,一个topic有多个partition,而且topic的partition在同一时刻只能被consumer group内的一个consumer消费,那么consumer在消费partition消息前需要先确定消费topic的哪个partition。partition的分配通过group coordinator来实现。基本过程如下:

我们可以通过实现接口org.apache.kafka.clients.consumer.internals.PartitionAssignor自定义partition分配策略,但是kafka已经提供了三种分配策略可以直接使用。

partition分配完后,每个consumer知道了自己消费的topic partition,通过metadata请求可以获取相应partition的leader副本所在的broker信息,然后就可以向broker poll消息了。但是consumer从哪个offset开始poll消息?所以consumer在第一次向broker发送FetchRequest poll消息之前需要向Group Coordinator发送OffsetFetchRequest获取消费消息的起始位置。Group Coordinator会通过key {topic}-${partition}查询 __consumer_offsets topic中是否有offset的有效记录,如果存在,则将consumer所属consumer group最近已提交的offset返回给consumer。如果没有(可能是该partition是第一次分配给该consumer group消费,也可能是该partition长时间没有被该consumer group消费),则根据consumer配置参数auto.offset.reset值确定consumer消费的其实offset。如果auto.offset.reset值为latest,表示从partition的末尾开始消费,如果值为earliest,则从partition的起始位置开始消费。当然,consumer也可以随时通过KafkaConsumer.seek()方法人工设置消费的起始offset。

kafka broker在收到FetchRequest请求后,会使用请求中topic partition的offset查一个skiplist表(该表的节点key值是该partition每个LogSegment中第一条消息的offset值)确定消息所属的LogSegment,然后继续查LogSegment的稀疏索引表(存储在.index文件中),确定offset对应的消息在LogSegment文件中的位置。为了提升消息消费的效率,consumer通过参数fetch.min.bytes和max.partition.fetch.bytes告诉broker每次拉取的消息总的最小值和每个partition的最大值(consumer一次会拉取多个partition的消息)。当kafka中消息较少时,为了让broker及时将消息返回给consumer,consumer通过参数fetch.max.wait.ms告诉broker即使消息大小没有达到fetch.min.bytes值,在收到请求后最多等待fetch.max.wait.ms时间后,也将当前消息返回给consumer。fetch.min.bytes默认值为1MB,待fetch.max.wait.ms默认值为500ms。

为了提升消息的传输效率,kafka采用零拷贝技术让内核通过DMA把磁盘中的消息读出来直接发送到网络上。因为kafka写入消息时将消息写入内存中就返回了,如果consumer跟上了producer的写入速度,拉取消息时不需要读磁盘,直接从内存获取消息发送出去就可以了。

为了避免发生再平衡后,consumer重复拉取消息,consumer需要将已经消费完的消息的offset提交给group coordinator。这样发生再平衡后,consumer可以从上次已提交offset出继续拉取消息。

kafka提供了多种offset提交方式

partition offset提交和管理对kafka消息系统效率来说非常关键,它直接影响了再平衡后consumer是否会重复拉取消息以及重复拉取消息的数量。如果offset提交的比较频繁,会增加consumer和kafka broker的消息处理负载,降低消息处理效率;如果offset提交的间隔比较大,再平衡后重复拉取的消息就会比较多。还有比较重要的一点是,kafka只是简单的记录每次提交的offset值,把最后一次提交的offset值作为最新的已提交offset值,作为再平衡后消息的起始offset,而什么时候提交offset,每次提交的offset值具体是多少,kafka几乎不关心(这个offset对应的消息应该存储在kafka中,否则是无效的offset),所以应用程序可以先提交3000,然后提交2000,再平衡后从2000处开始消费,决定权完全在consumer这边。

kafka中的topic partition与consumer group中的consumer的消费关系其实是一种配对关系,当配对双方发生了变化时,kafka会进行再平衡,也就是重新确定这种配对关系,以提升系统效率、高可用性和伸缩性。当然,再平衡也会带来一些负面效果,比如在再平衡期间,consumer不能消费kafka消息,相当于这段时间内系统是不可用的。再平衡后,往往会出现消息的重复拉取和消费的现象。

触发再平衡的条件包括:

需要注意的是,kafka集群broker的增减或者topic partition leader重新选主这类集群状态的变化并不会触发在平衡

有两种情况与日常应用开发比较关系比较密切:

consumer在调用subscribe()方法时,支持传入一个ConsumerRebalanceListener监听器,ConsumerRebalanceListener提供了两个方法,onPartitionRevoked()方法在consumer停止消费之后,再平衡开始之前被执行。可以发现,这个地方是提交offset的好时机。onPartitonAssigned()方法则会在重新进行partition分配好了之后,但是新的consumer还未消费之前被执行。

我们在提到kafka时,首先想到的是它的吞吐量非常大,这也是很多人选择kafka作为消息传输组件的重要原因。

以下是保证kafka吞吐量大的一些设计考虑:

但是kafka是不是总是这么快?我们同时需要看到kafka为了追求快舍弃了一些特性:

所以,kafka在消息独立、允许少量消息丢失或重复、不关心消息顺序的场景下可以保证非常高的吞吐量,但是在需要考虑消息事务、严格保证消息顺序等场景下producer和consumer端需要进行复杂的考虑和处理,可能会比较大的降低kafka的吞吐量,例如对可靠性和保序要求比较高的控制类消息需要非常谨慎的权衡是否适合使用kafka。

我们通过producer向kafka集群发送消息,总是期望消息能被consumer成功消费到。最不能忍的是producer收到了kafka集群消息写入的正常响应,但是consumer仍然没有消费到消息。

kafka提供了一些机制来保证消息的可靠传递,但是有一些因素需要仔细权衡考虑,这些因素往往会影响kafka的吞吐量,需要在可靠性与吞吐量之间求得平衡:

kafka只保证partition消息顺序,不保证topic级别的顺序,而且保证的是partition写入顺序与读取顺序一致,不是业务端到端的保序。

如果对保序要求比较高,topic需要只设置一个partition。这时可以把参数max.in.flight.requests.per.connection设置为1,而retries设置为大于1的数。这样即使发生了可恢复型错误,仍然能保证消息顺序,但是如果发生不可恢复错误,应用层进行重试的话,就无法保序了。也可以采用同步发送的方式,但是这样也极大的降低了吞吐量。如果消息携带了表示顺序的字段,可以在接收端对消息进行重新排序以保证最终的有序。

Kafka设计解析(三)恰好一次和事务消息

为了解决重试导致的消息重复、乱序问题,kafka引入了幂悔首等消息。幂等消息保证producer在一次会话内写入一个partition内的消息具有幂等性,可以通过重试来确保消息发布的Exactly Once语义。

实现逻辑很简单:

producer每次启动后,首先向broker申请一个全局唯一的pid,用来标识本次会话。

message_v2 增加了sequence number字段,producer每发一批消息,seq就加1。

broker在内存维护(pid,seq)映射,收到消息后检查seq,如果,

producer在收到明确的的消息丢失ack,或者超时后未收到ack,要进行重试。

考虑在stream处理的场景中,需要多个消息的原子写入语义,要么全部写入成功,要么全部失败,这就是kafka事务消息要解决的问题。

事务消息是由producer、事务协调器、broker、组协调器、consumer共同参与实现的,

为producer指定固定的TransactionalId,可以穿越producer的多次会话(producer重启/断线重连)中,持续标识producer的身份。

使用epoch标识producer的每一次"重生",防止同一producer存在多个会话。

producer遵从幂等消息的行为,并在发送的recordbatch中增加事务id和epoch。

引入事务协调器,以两阶段提交的方式,实现消息的事务提交。

事务协调器使用一个特殊的topic:transaction,来做事务提交日志。

事务控制器通过RPC调研,协调 broker 和 consumer coordinator 实现事务的两阶段提交。

每一个broker都会启动一个事务协调器,使用hash(TransactionalId)确定producer对应的事务协调器,使得整个集群的负载均衡。

broker处理在事务协调器的commit/abort控制消息,把控制消息向正常消息一样写入topic(和正常消息交织在一起,用来确认事务提交的日志偏移),并向前推进消息提交偏移hw。

如果在事务过程中,提交了消费偏移,组协调器在offset log中写入事务消费偏移。当事务提交时,在offset log中写入事务offset确认消息。

consumer过滤未提交消息和事务控颤液制消息,使这些消息对用户不可见。

有两种实现方式,

设置isolation.level=read_uncommitted,此时topic的所有消息对consumer都可见。

consumer缓存这些消息,直到收到事务控制消息。若事务commit,则对外发布这些消息;若事务abort,则丢弃这些消息。

设置isolation.level=read_committed,此时topic中未提交的消息对consumer不可见,只有在事务结束后,消息才对consumer可见。

broker给consumer的BatchRecord消息中,会包含以列表,指明哪些是"abort"事务,consumer丢弃abort事务的消息即可。

事务消息处理流程如图1所示,

图1 事务消息业务流程

流程说明:

事务协调器是分配pid和管理事务的核心,produer首先对任何一个broker发送茄前物FindCoordinatorRequest,发现自己的事务协调器。

紧接着,producer向事务协调器发送InitPidRequest,申请生成pid。

2a.当指定了transactional.id时,事务协调器为producer分区pid,并更新epoch,把(tid,pid)的映射关系写入事务日志。 同时清理tid任何未完成的事务,丢弃未提交的消息。

启动事务是producer的本地操作,促使producer更新内部状态,不会和事务协调器发生关系。

事务协调器自动启动事务,始终处在一个接一个的事务处理状态机中。

对于每一个要在事务中写消息的topic分区,producer应当在第一次发消息前,向事务处理器注册分区。

4.1a.事务处理器把事务关联的分区写入事务日志。

在提交或终止事务时,事务协调器需要这些信息,控制事务涉及的所有分区leader完成事务提交或终止。

4.2a. producer向分区leader写消息,消息中包含tid,pid,epoch和seq。

4.3. 提交消费偏移 -- AddOffsetCommitsToTxnRequest

4.3a. producer向事务协调器发送消费偏移,事务协调器在事务日志中记录偏移信息,并把组协调器返回给producer。

4.4a. producer向组协调器发送TxnOffsetCommitRequest,组协调器把偏移信息写入偏移日志。但是,要一直等到事务提交后,这个偏移才生效,对外部可见。

收到提交或终止事务的请求时,事务处理器执行下面的操作:

1. 在事务日志中写入PREPARE_COMMIT或PREPARE_ABORT消息(5.1a)。

2. 通过WriteTxnMarkerRequest向事务中的所有broker发事务控制消息(5.2)。

3. 在事务之日中写入COMMITTED或ABORTED消息(5.3)。

这个消息由事务处理器发给事务中所涉及分区的leader。

当收到这个消息后,broker会在分区log中写入一个COMMIT或ABORT控制消息。同时,也会更新该分区的事务提交偏移hw。

如果事务中有提交消费偏移, broker也会把控制消息写入 __consumer-offsets log,并通知组协调器使事务中提交的消费偏移生效。

当所有的commit或abort消息写入数据日志,事务协调器在事务日志中写入事务日志,标志这事务结束。

至此,本事务的所有状态信息都可以被删除,可以开始一个新的事务。

在实现上,还有很多细节,比如,事务协调器会启动定时器,用来检测并终止开始后长时间不活动的事务,具体请参考下面列出的kafka社区技术文档。

【总结】:

我们要认识到,虽然kafka事务消息提供了多个消息原子写的保证,但它不保证原子读。

例如,

也就是说,虽然kafka log持久化了数据,也可以通过指定offset多次消费数据,但由于分区数据之间的无序性,导致每次处理输出的结果都是不同的。这使得kafka stream不能像hadoop批处理任务一样,可以随时重新执行,保证每次执行的结果相同。除非我们只从一个topic分区读数据。

[0]

[1]

[2]

[3]

[4]

[5]

[img]

canal + kafka 事务一致性问题

将配置进行更改: canal.mq.flatMessage = false(消息以PB的格式保存),一个事务会保存在一个消息体里,问题解洞芹决!(但瞎晌没明白事务一致性,为什么会受数据存储格式的影响) 【已在官网发了个issue 】

将配置更改纳神毕为:

kafka batch.size: the size of a micro-batch of the producer, the default is 16K

kafka.batch.size = 32768 (不要超过1M, kafka.max.request.size)

canal.mq.canalBatchSize: get the batch size of binlog data, the default is 50K

canal.mq.canalBatchSize = 50

一个事务会保存在一个消息体里,问题解决!

Kafka 是如何实现事务的

Kafka 是一个高度可扩展的分布式消息系统,在海量数据处理生态中占据着重要的地位。

数据处理的一个关键特性是数据的一致性。具体到 Kafka 的领域中,也就是生产者生产的数据和消费者消费的数据之间一对一的一致性。在各种类型的失败普遍存在的分布式系统环境下,保证业务层面一个整体的消息集合被原子的发布和恰好一次处理,是数据一致性在 Kafka 生态系统的实际要求。

本文介绍了 Kafka 生态中的事务机制的概念和流程。

Kafka 事务机制的概念

Kafka 从 0.11 版本开始支持了事务机制。Kafka 事务机制支持了跨分区的消息原子写功能。具体来说,Kafka 生产者在同一个事务内提交到多个分区的消息,要么同时成功,要么同时失败。这一保证在生产者运行时出现异常甚至宕机重启之后仍然成立。

此外,同一个事务内的消息将以生产者发送的顺序,唯一地提交到 Kafka 集群上。也就是说,事务机制从某种层面上保证了消息被恰好一次地提交到 Kafka 集群。众所周知,恰好一次送达在分布式系统中是不可能实现的。这个论断有一些微妙的名词重载问题,但大抵没错,所有声称能够做到恰好一次处理桐罩碰的系统都在某个地方依赖了幂等性。

Kafka 的事务机制被广泛用于现实世界中复杂业务需要保证一个业务领域中原子的概念被原子地提交的场景。

例如,一次下单流水包括订单生成消息和库存扣减消息,如果这两个消息在历史上由两个主题分管,那么它们在业务上的原子性就要求 Kafka 要利用事务机制原子地提交到 Kafka 集群上。

还有,对于复杂的流式处理系统,Kafka 生产者的上游可能是另一个流式处理系统,这个系统可能有着自己的一致性方案。为了跟上游系统的一致性方案协调,Kafka 就需要提供一个尽可能通用且易于组合的一致性机制,即灵活的事务机制,来帮助实现端到端的一致性。

Kafka 事务机制的流程

分布式系统的数据一致性是难的。要想理解一个系统提供何种程度的数据一致性保证,以及这样的保证对应用程序提出了什么样的要求闷姿,再及在哪些情况下一致性保证会出现什么方面的回退,细究其一致性机制的实现是必须的。

上面我们提到,事务机制的核心特征是能跨越多个分区原子地提交消息集合,甚至这些分区从属于不同的主题。同时,被提交的消息集合中的消息每条仅被提交一次,并保持它们在生产者应用中被生产的顺序写入到 Kafka 集群的消息日志中。此外,事务能够容忍生产者运行时出现异常甚至宕机重启。

实现事务机制最关键的概念就是事务的唯一标识符( TransactionalID ),Kafka 使用 TransactionalID 来关联进行中的事务。TransactionalID 由用户提供,这是因为 Kafka 作为系统本身无法独立的识别出宕机前后的两个不同的进程其实是要同一个逻辑上的事务。

对于同一个生产者应用前后进行的多个事务,TransactionalID 并不需要每次都生成一个新的。这是因为 Kafka 还实现了 ProducerID 以及 epoch 机制。这个机制在事务机制中的用途主要是用于标识不同的会话,同一个会话 ProducerID 的值相同,但有可能有多个任期。ProducerID 仅在会话切换时改变,而任期会在每次新的事物初始化时被更新。这样,同一个 TransactionalID 就能作为跨会话的多个独立事务的标识。

接下来,我们从一个事务的完整流程出发讨论客户端也就是生产者和消费者,以及服务端也就是 Kafka 集群在这个流程中扮演了什么角色,执行了什么动作。

初始化事务上下文

逻辑上说,事务总是从生产者提起的。生产者通过调用 initTransactions 方法初始化事务上下文。首要做的事情就是找到 Kafka 集群负责管理当前事务的事务协调者( TransactionCoordinator ),向其申请 ProducerID 资源。初始的 ProducerID 及 epoch 都是未初始化的状态。

生产者一侧的事务管理者( TransactionManager )收到相应的方法调用之后先后发送查找事务协调者局谈的信息和初始化 ProducerID 的信息。事务相关的所有元数据信息都会由客户端即生产者一侧的事务管理者和服务端即 Kafka 集群的一个 Broker 上的事务协调者交互完成。

一开始,生产者并不知道哪个 Broker 上有自己 TransactionalID 关联的事务协调者。逻辑上,所有事务相关的需要持久化的数据最终都会写到一个特殊的主题 __transaction_state 上。这跟前面回答消费位点管理文章中的管理消费者消费位点的特殊主题 __consumer_offsets 构成了目前 Kafka 系统里唯二的特殊主题。

对于一个生产者或者说被 TransactionalID 唯一标识的事务来说,它的事务协调者就是该事务的元数据最终存储在 __transaction_state 主题上对应分区的分区首领。对于一个具体的事务来说,它的元数据将被其 TransactionalID 的哈希值的绝对值模分区数的分区所记录,这也是常见的确定分区的方案。

生产者将查找事务协调者的信息发送到集群的任意一个 Broker 上,由它计算出实际的事务协调者,获取对应的节点信息后返回给生产者。这样,生产者就找到了事务协调者。

随后,生产者会向事务协调者申请一个 ProducerID 资源,这个资源包括 ProducerID 和对应的 epoch 信息。事务协调者收到对应请求后,将会首先判断同一个 TransactionalID 下的事务的状态,以应对好跨会话的事务的管理。

第一步,事务协调者会获取 TransactionalID 对应的事务元数据信息。前面提到,这些元数据信息将被写在特殊主题 __transaction_state 上,这也是事务元数据信息对生产者和 Kafka 集群都容错的需要。

如果获取不到元数据信息,那么就初始化事务元数据信息,包括从获取一个新的 ProducerID 资源,并将它和 TransactionalID 以及分区编号和其他一些配置信息一起打包持久化。

其中,获取一个新的 ProducerID 资源需要 ProducerID 管理器从 ZooKeeper 上申请一个 ProducerID 的号段,在逐一的分配出去。申请号段的手段是修改 ZooKeeper 上 /latest_producer_id_block 节点的信息,流程是读节点上最后一个被申请的 ProducerID 的信息,加上要申请的号段的长度,再更新节点上最后一个被申请的 ProducerID 的信息。由于 ZooKeeper 对节点的更新有版本控制,因此并发的请求将会导致其中若干个请求目标版本失配,并提起重试。ProducerID 的长度是 Long 类型的长度,因此在实际使用过程中几乎不可能用完,Kafka 对号段资源耗尽的情况抛出致命错误并不尝试恢复。

如果获取到了相同 TransactionalID 先前的元数据信息,那么根据事务协调器事务先前的状态采取不同的行为。

如果此时状态转移正在进行,直接返回 CONCURRENT_TRANSACTIONS 异常。注意这里是事务协调器上正在发生并发的状态转移。通常来说,并发的状态转移应该依次执行,直接返回此异常可避免客户端即生产者请求超时,而是让生产者稍后自行重试。这也是一种乐观的加锁策略。

如果此时状态为 PrepareAbort 或 PrepareCommit 则返回 CONCURRENT_TRANSACTIONS 异常。同样的,此时状态即将转换为终结状态,无需强行终止先前的事务,否则将会产生无谓的浪费。

如果此时状态为 Dead 或 PrepareEpochFence 或当前 ProducerID 和 epoch 对不上,直接抛出不可重试的异常。这是由于要么是先前的 Producer 且已经被新的 Producer 替代,要么事务已经超时,无需再次尝试。

如果此时状态为 Ongoing 则事务协调者会将事务转移到 PrepareEpochFence 状态,然后再丢弃当前的事务,并返回 CONCURRENT_TRANSACTIONS 异常。

如果此时状态为 CompleteAbort 或 CompleteCommit 或 Empty 之一那么先将状态转移为 Empty 然后更新 epoch 值。

经过这么一连环的操作,Kafka 就将事务执行的上下文初始化好了。

开始一个事务

初始化事务的流程实际上是生产者和对应的事务协调者就事务状态达成一致,进入到一个可以提起新的事务的状态。此时,生产者可以通过 beginTransaction 方法开始一个事务操作。这个方法只会将本地事务状态转移到 IN_TRANSACTION 状态,在真正的提交事务中的消息之前,不会有跟 Kafka 集群的交互。

生产者将自己标记为开始事务之后,也就是本地事务状态转移到事务进行中的状态之后,就可以开始发送事务中的消息了。

发送事务中的消息

生产者在发送事务中的消息的时候,会将消息对应的分区添加到事务管理器中去,如果这个分区此前没被添加过,那么事务管理器会在下一次发送消息之前插入一条 AddPartitionsToTxnRequest 请求来告诉 Kafka 集群的事务协调者参与事务的分区的信息。事务协调者收到这条信息之后,将会更新事务的元数据,并将元数据持久化到 __transaction_state 中。

对于生产者发送的消息,仍然和一般的消息生产一样采用 ProduceRequest 请求。除了会在请求中带上相应的 TransactionalID 信息和属于事务中的消息的标识符,它跟生产者生产的普通信息别无二致。如果消费者没有配置读已提交的隔离级别,那么这些消息在被 Kafka 集群接受并持久化到主题分区中时,就已经对消费者可见而且可以被消费了。

事务中的消息的顺序性保证也是在发送事务的时候检查的。

生产者此时已经申请到了一个 ProducerID 资源,当它向一个分区发送消息时,内部会有一个消息管理器为每个不同的分区维护一个顺序编号( SequenceNumber )。相应地,Kafka 集群也会为每个 ProducerID 到每个分区的消息生产维护一个顺序编号。

ProducerRequest 请求中包含了顺序编号信息。如果 Kafka 集群看到请求的顺序编号跟自己的顺序编号是连续的,即比自己的顺序编号恰好大一,那么接受这条消息。否则,如果请求的顺序编号大一以上,则说明是一个乱序的消息,直接拒绝并抛出异常。如果请求的顺序编号相同或更小,则说明是一个重复发送的消息,直接忽略并告诉客户端是一个重复消息。

提交事务

在一个事务相关的所有消息都发送完毕之后,生产者就可以调用 commitTransaction 方法来提交整个事务了。对于事务中途发生异常的情形,也可以通过调用 abortTransaction 来丢弃整个事务。这两个操作都是将事务状态转移到终结状态,彼此之间有许多相似点。

无论是提交还是丢弃,生产者都是给事务协调者发送 EndTxnRequest 请求,请求中包含一个字段来判断是提交还是丢弃。事务协调者在收到这个请求后,首先更新事务状态到 PrepareAbort 或 PrepareCommit 并更新状态到 __transaction_state 中。

如果在状态更新成功前事务协调者宕机,那么恢复过来的事务协调者将认为事务在 Ongoing 状态中,此时生产者由于收不到确认回复,会重试 EndTxnRequest 请求,并最终更新事务到 PrepareAbort 或 PrepareCommit 状态。

随后,根据是提交还是丢弃,分别向事务涉及到的所有分区的分区首领发送事务标志( TransactionMarker )。

事务标志是 Kafka 事务机制引入的不同于业务消息的事务控制消息。它的作用主要是标识事务已经完成,这个消息同业务消息一样能够被消费者所消费,并且它和事务中的业务消息能够通过 TransactionalID 关联起来,从而支持配置了读已提交特性的消费者忽略尚未提交的事务消息或被丢弃的事务消息。

如果在事务标志写到涉及到的所有分区的分区首领之前,事务协调者宕机或者分区首领宕机或网络分区,新起来的事务协调者或超时后重试的事务协调者会重新向分区首领写入事务标志。事务标志是幂等的,因此不会影响事务提交的结果。这里我们印证了之前所说的所有声称能够做到恰好一次处理的系统都在某个地方依赖了幂等性。

在当前事务涉及到的所有分区都已经把事务标志信息持久化到主题分区之后,事务协调者才会将这个事务的状态置为提交或丢弃,并持久化到事务日志文件中。在这之后,一个 Kafka 事务才算真正的完成了。事务协调者中缓存的关于当前事务的元数据就可以清理了。

如果在事务协调者回复生产者提交成功之前宕机,在恢复之后生产者再次提交事务时会直接返回事务提交成功。

总的来说,事务的状态以 __transaction_state 主题上持久化的元数据信息为准。

超时过期事务

分布式系统由于天然的网络阻塞或分区等失败原因,操作在成功和失败之外还有超时这第三种状态。现实中的分布式系统必须合理地处理超时的状态,否则永久阻塞或等待在任何实际的业务领域中都是不可接受的。

Kafka 事务机制本身可以配置事务超时,在事务管理者和事务协调者交互的各个过程中都会检验事务超时的配置,如果事务已经超时则抛出异常。

但是,在网络分区的情况下,可能 Kafka 集群根本就等不到生产者发送的消息。这个时候,Kafka 集群就需要相应的机制来主动过期。否则永不过期的中间状态事务在生产者宕机且不可恢复或不再恢复的情况下将逐步积累成存储垃圾。

Kafka 集群会周期性的轮询内存中的事务信息。如果发现进行中的事务最后的状态更新时间距今已经超过了配置的集群事务清理时间阈值,则采取丢弃该事务的操作。同时,为了避免操作过程中并发地收到原 Producer 发来事务更新请求,首先更新事务关联的 ProducerID 的 epoch 以将原 Producer 的 epoch 隔离掉。换个角度说,也就是以一个新的有效的身份执行丢弃事务操作,以免分不清到底是谁在丢弃事务。

此外,轮询中还会检查 TransactionalID 最新的事务信息,如果一个 TransactionalID 最后一个事务距今已经已经超过了配置的集群 TransactionalID 清理时间阈值,则将该 TransactionalID 对应的元数据信息都进行清理。

上面的讨论中还有两个重要的主题被忽略了。一个是 Kafka 事务机制支持在同一个事务里进行消息生产和消息消费位点提交,另一个是配置了读已提交的消费者如何在事务未提交以及丢弃事务时正确的读取事务中消息。

前者不是特别复杂,只需要将消费位点提交视作一条事务中的消息,和消息生产以及控制消息同等待遇,在提交的时候也被事务标志所界定即可。

不展开聊是因为这个特性通常只在仅适用 Kafka 搭建流式处理流水线的场景下有用,尤其是 Kafka Streams 解决方案。

对于组合多个系统的流式处理流水线来说,消息从 Kafka 中消费得到是上游,生产到 Kafka 上是下游,中间是另一个例如 Flink 的流式计算系统。在这种场景下,消费位点的管理和事务地生产消息是两个可以分开考虑的事情,可以跟其他系统的一致性方案例如 Flink 的 Checkpoint 机制相结合,而不需要非得在同一个事务里既提交消费位点,又提交新的消息。

后者主要靠 Kafka 集群在管理消费位点拉取请求的时候,通过随事务机制的引入新添加的 LastStableOffset 概念来响应配置为读已提交的消费者的请求。在事务完成之前不会允许读已提交的消费者拉取事务中的消息。显然,这有可能导致消费者拉取新消息时长时间的阻塞。因此在实践中应当尽量避免长时间的事务。

对于丢弃事务的消息,Kafka 集群会维护一个丢弃事务的消息的元数据,从而支持消费者同时拉取消息和丢弃事务的消息的元数据,自行比对筛掉丢弃事务的消息。在正常的业务场景里,丢弃的事务不会太多,从而维护这样的一份元数据以及让消费者自行筛选会是一个能够接受的选择。

一探究竟,详解Kafka生产者和消费者的工作原理!

对于每个主题,Kafka群集都会维护一个分区日志,如下所示:

每个分区(Partition)都是有序的(所以每一个Partition内部都是有序的),不变的记录序列,这些记录连续地附加到结构化的提交日志中。分区中的每个记录均分配有一个称为偏移的顺序ID号,该ID 唯一地标识分区中的每个记录。

每个消费者保留的唯一元数据是该消费者在日志中的偏移量或位置。此偏移量由使用者控制:通常,使胡肆改用者在读取记录时会线性地推进其偏移量,但实际上,由于位置是由使用者控制的,因此它可以按喜欢的任何顺序使用记录。例如,使用者可以重置到较旧的偏移量以重新处理过去的数据,或者跳到最近的记录并从“现在”开始使用。(类似于游标指针的方式顺序处理数据,并且该指标可以任意移动)

分区的设计结构

生产者分区策略是 决定生产者将消息发送到哪个分区的算法, 主要有以下几种:

kafka消息的有序性,是采用消息键保序策略来实现的。 一个topic,一个partition(分割),一个consumer,内部单线程消费,写N个内存queue,然后N个线程分别消费一个内存queue。

kafka发送进行消息压缩有两个地方,分别是生产端压缩和Broker端压缩。

生产者端压缩 生产者压缩通常采用的GZIP算法这样 Producer 启动后生产的每个消息集合都是经 GZIP 压缩过的,故而能很好地节省网络传输带宽以及 Kafka Broker 端的磁盘占用。 配置参数:

Broker压缩 大部分情况下 Broker 从 Producer 端接收到消息后仅仅是原封不动地保存而不会对其进行任何修改,但以下情况会引发Broker压缩

消费者端解压 Kafka 会将启用了哪种压缩算法封装进消息集合中,在Consummer中进行解压操作。

kafka提供以下特性来保证其消息的不丢失,从而保证消息的可靠性

生产者确认机制 当 Kafka 的若干个 Broker(根据配置策略,可以是一个,也可以是ALL) 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交。此时,这条消息在 Kafka 看来就正式变为“已提交”消息了。 设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的雹晌定义。如裤判果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。

生产者失败回调机制 生产者不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。producer.send(msg, callback) 采用异步的方式,当发生失败时会调用callback方法。

失败重试机制 设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries 0 的 Producer 能够自动重试消息发送,避免消息丢失。

消费者确认机制 确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。

副本机制 设置 replication.factor = 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。 设置 min.insync.replicas 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。 确保 replication.factor min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。

限定Broker选取Leader机制 设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。

由于kafka生产者确认机制、失败重试机制的存在,kafka的消息不会丢失但是存在由于网络延迟等原因造成重复发送的可能性。 所以我们要考虑消息幂等性的设计。 kafka提供了幂等性Producer的方式来保证消息幂等性。使用 ****的方式开启幂等性。

幂等性 Producer 的作用范围:

Kafka事务 事务型 Producer 能够保证将消息原子性地写入到多个分区中。这批消息要么全部写入成功,要么全部失败。另外,事务型 Producer 也不惧进程的重启。Producer 重启回来后,Kafka 依然保证它们发送消息的精确一次处理。 同样使用 的方式开启事务。

consumer group是kafka提供的可扩展且具有容错性的消费者机制。它是由一个或者多个消费者组成,它们共享同一个Group ID. 组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。

consummer group有以下的特性:

消费者位置 消费者位置,即位移。 消费者在消费的过程中需要记录自己消费了多少数据。 位移提交有自动、手动两种方式进行位移提交。

Kafka通过一个内置Topic(__consumer_offsets)来管理消费者位移。

rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配订阅topic的每个分区。

Kafka提供了一个角色:coordinator来执行对于consumer group的管理。 Group Coordinator是一个服务,每个Broker在启动的时候都会启动一个该服务。Group Coordinator的作用是用来存储Group的相关Meta信息,并将对应Partition的Offset信息记录到Kafka内置Topic(__consumer_offsets)中。

Rebalance 过程分为两步:Join 和 Sync。 Join 顾名思义就是加入组。这一步中,所有成员都向coordinator发送JoinGroup请求,请求加入消费组。一旦所有成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。

Sync,这一步leader开始分配消费方案,即哪个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案之后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的所有成员就都知道自己应该消费哪些分区了。

关于kafka事务和kafka事务是怎么实现的的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表