kafkaoffset(kafka offset)
本篇文章给大家谈谈kafkaoffset,以及kafka offset对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、kafka根据offset查找消息流程
- 2、Kafka的消息格式及offset是如何设置的
- 3、Kafka的Offset、Index(三)
- 4、kafka消费者和offset的关系,以及异常处理问题
- 5、Kafka提交offset机制
kafka根据offset查找消息流程
1,按照二分法找到小于1008的segment,也就是00000000000000001000.log和00000000000000001000.index
2,用目标offset减去文件名中的offset得到消息在这个segment中的偏移量。也就是1008-1000=8,偏移量是8。
3,再次用二分法在index文件中找瞎旁到对应的索引,也就是第三轿段行6,45。
4,到log文件中,从偏移量45的位置开始(实际上这里的消息offset是1006),顺序查找,直到找到offset为磨帆橡1008的消息。查找期间kafka是按照log的存储格式来判断一条消息是否结束的。
Kafka的消息格式及offset是如何设置的
Kafka的offset是如何设置的?
答:是生产者设置的,生产者在发送消息的时候,为每条消息生成一个唯一的offset。
Kafka消息的格式?
答:
Kafka最陆坦新版本的消息集叫做RecordBatch,而不帆辩是先前的MessageSet。RecordBatch内部存储了一条或多条消息。
RecordBatch的结构包含以下部分:
first offset,起始位移,占位8B
length,消息总长度,占位4B
partition leader epoch,分区leader纪元,可以看做分区leader的版本号或者更新次数,占位4B。
magic,消息格式的版本号,对于V2版态悉缺本而言,magic的值为2。
attributes,消息属性,占位2B,低三位表示压缩格式,第4位表示时间戳类型,第五位表示当前RecordBatch是否处于事务中第6位表示是否控制消息。
last offset delta,占位4B,RecordBatch中最后一个Record的offset与first offset的差值,主要被broker用来确保RecordBatch中Record组装的正确性。
first timestamp,占位8B,RecordBatch中第一条Record的时间戳。
max timestamp,占位8B,RecordBatch中最大的时间戳,一般情况下是最后一个Record的时间戳。和last offset delta功能一样,主要被broker用来确保RecordBatch中Record组装的正确性。
producer id,即PID,占位8B,用来支持幂等和事务。
producer epoch,占位2B,用来支持幂等和事务。
first sequence,占位4B,用来支持幂等和事务。
records count,占位4B,RecordBatch中Record的总数。
records,存放消息的容器。
Records的数据结构又是什么样的呢?Record包含以下属性:
length,消息总长度。
attributes,目前已弃用,但是还是会占用1B的空间,以备未来的格式扩展。
timestamp delta,时间戳增量,通常一个timestamp占用8B,这里时间戳增量保存的是当前时间戳与RecordBatch中first timestamp的差值,这样可以节省占用空间。
offset delta,位移增量,这个是当前消息的位移与RecordBatch中first offset的差值,这样可以节省占用空间。
key length,消息的key的长度。
key,消息key。
value length,消息的value的长度。
value,消息的值。
headers count,headers的总数。
headers,这个字段是用来支持应用级别的扩展,而不需要将一些应用级别的属性嵌入到消息体中。
Kafka的Offset、Index(三)
Kafka中的每个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中。Partition中的每个消息都有一个连续的序号,用于Partition唯一标识一条消息。
Offset记录着下一条将要发送给Consumer的消息的序号。
Offset从语义上来看拥有两种: Current Offset 和 Committed Offset 。
Current Offset保存在Consumer客户端中,它表示Consumer希望收到的下一条消息的序号。它仅仅在poll()方法中使用。如,Consumer第一次调用poll()方法后收到了20条消息,那么Current Offset就被设置为20。这样Consumer下一次调用poll()方法时,Kafka就知道应该从序号为21的消息开始读取。这样就能够保证每次Consumer poll消息时,都能够收到不重复的消息。
Committed Offset保存在Broker上(V0.9之后的版本),它表示Consumer已经确认消费过的消息的序号。主要通过commitSync()来操作。举例: Consumer通过poll() 方法收到20条消息后,此时Current Offset就是20,经过一系列的逻辑处理后,并没有调用commitSync()来提交Committed Offset,那么此时Committed Offset依旧是0。
Committed Offset主要用于Consumer Rebalance(再平衡)。在Consumer Rebalance的过程中,一个Partition被分配给了一个Consumer,那么这个Consumer该从什么位置开始消费消息呢?答案就是Committed Offset。另外,如果一个Consumer消费了5条消息(poll并且成功commitSync)之后宕机了,重新启动之后,它仍然能够从第6条消息开始消费,因为Committed Offset已经被Kafka记录为5。
小结一下 :
在Kafka V0.9前,Committed Offset信息保存在zookeeper的[consumers/{group}/offsets/{topic}/{partition}]目录中(zookeeper其实并不适合进行大批量的读写操作,尤其是写操作)。在V0.9之后,所有的offset信息都源戚保存在了Broker上的一个名为__consumer_offsets的topic(系统自维护的)中。
auto.offset.reset表示如果Kafka中没有存储对应的offset信息的话(有可能offset信息被删除),消费者从何处开始消费消息。有三个可选值:
分两个场景来说明:
a) Consumer消费了5条消息后宕机了,重启之后它读取到对应的Partition的Committed Offset为5,因此会直接从第6条消息开始读取。此时完全依赖于Committed Offset机制,和auto.offset.reset配置完全无关。
b) 新建了一个新的Group,并添加了一个Consumer,它订阅了一个已经存在的Topic。此时Kafka中还没有这个Consumer相应的Offset信息,因此此时Kafka就会根据auto.offset.reset配置来决定这个Consumer从何处开始消费消息。
在Kafka文件存储中,同一个topic下有多个不同的partition,每个partiton为一个目录,partition的名称规则为:topic名称+有序序号,第一个序号从0开始计,最大的序号为partition数量减1,partition是实际物理上的概念,而topic是逻辑上的概念。
如果就以partition为最小存储单位,我们可以想象当Kafka producer不断发送消息,必然会引起partition文件的无限扩张,这样对于消息文件的维护以及已经被消费的消息的清败裤理带来严重的影响,所以这里以segment为单位又将partition细分。每个partition(目录)相当于一个巨型文雹枯陵件被平均分配到多个大小相等的segment(段)数据文件中(每个segment 文件中消息数量不一定相等)这种特性也方便old segment的删除,即方便已被消费的消息的清理,提高磁盘的利用率。每个partition只需要支持顺序读写就行,segment的文件生命周期由服务端配置参数(log.segment.bytes,log.roll.{ms,hours}等若干参数)决定。
segment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment索引文件和数据文件。这两个文件的命令规则为:Partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值,数值大小为64位,20位数字字符长度,没有数字用0填充,如下:
以上面的segment文件为例,展示出segment:00000000000000170410的“.index”文件和“.log”文件的对应的关系,如下图:
如上图,“.index”索引文件存储大量的元数据,“.log”数据文件存储大量的消息,索引文件中的元数据指向对应数据文件中message的物理偏移地址。其中以“.index”索引文件中的元数据[3, 348]为例,在“.log”数据文件表示第3个消息,即在全局partition中表示170410+3=170413个消息,该消息的物理偏移地址为348。
那么如何从partition中通过offset查找message呢?以上图为例,读取offset=170418的消息,首先查找segment文件,其中00000000000000000000.index为最开始的文件,第二个文件为00000000000000170410.index(起始偏移为170410+1=170411),而第三个文件为00000000000000239430.index(起始偏移为239430+1=239431),所以这个offset=170418就落到了第二个文件之中。其他后续文件可以依次类推,以其实偏移量命名并排列这些文件,然后根据二分查找法就可以快速定位到具体文件位置。其次根据00000000000000170410.index文件中的[8,1325]定位到00000000000000170410.log文件中的1325的位置进行读取。
———————————————————
坐标帝都,白天上班族,晚上是知识的分享者
如果读完觉得有收获的话,欢迎点赞加关注
kafka消费者和offset的关系,以及异常处理问题
earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产前唤生的该分区下的数据
none: topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
简单来说,如果partition里已经有数据,但还没有消费,earliest就会从没消费的起始点来消费,反观latest就不会去消费;如果partition已经有已消费的数据,再放新的数据进去,那么它们都会从新的数据开始消费。
offset会保存在kafka内部,一开始发送数据到kafka的时候就有offset,只是有没有提交而已。而使用spring-kafka时,客户端在监听topic的时候,它有2种提交offset的方式:
1、自动提交,设置enable.auto.commit=true,更新的频率根据参数【auto.commit.interval.ms】来定。这种方式也被称为【at most once】,fetch到消息后就可以更新offset,无论是否消费成功。
2、手动提交,设置enable.auto.commit=false,这种方式称李悔郑为【at least once】。fetch到消息后,等消费完成再调用方法【consumer.commitSync()】,手动更新offset;如果消费失败,则offset也不会更新,此条消息哪颂会被重复消费一次。
spring-kafka版本2.5.5,官网 ,设置的是批量消费。
因为是批量消费,所以@KafkaListener需要使用list来接收消息,如果使用单个bean会报错。正常不设置异常处理,它会不断循环重复消费这条数据,不像别的地方说有一定数量的重试。
实现接口new BatchErrorHandler自定义属于自己的批量异常处理,但只会到:
public void handle(Exception thrownException, ConsumerRecords?, ? data)
而不到
public void handle(Exception thrownException, ConsumerRecords?, ? data, Consumer?, ? consumer, MessageListenerContainer container)
再定义逻辑自定处理。如果像官网那样seek回开始的offset,也是无限循环,不太了解所以不采用。
实现接口ConsumerAwareListenerErrorHandler,注意区别是有个Listener的,
如果同时存在局部和全局,在@KafkaListener注解中标注了这个局部的异常处理器,会优先使用局部的。
如果发生异常,来的是一批数据,如果头部发生了异常,那么后面的都会略过。按照参考链接中的异常处理,定义一个死信,来接收这些失败的msg,如果异常处理在全局异常处理器中,那么它们都被发送到死信,后续就算数据是正确的,都不会处理,所以 还是建议个人使用try catch来包裹 处理,个人尝试在kafka处理业务远程插入两条数据,第一条错误,第二条正确,try catch中第一条自定发到死信,第二条会正确入库。
参考:
kafka之consumer参数auto.offset.reset
kafka 消费者offset记录位置和方式
消息队列-kafka消费异常问题
Kafka - 异常处理 待试
[img]Kafka提交offset机制
【转】 ;wd=eqid=e88a5ce70002861e000000035d4cdc76
在kafka的消费者中,有一个非常关键的机制,那就是offset机制。它使得Kafka在消费的过程中即使挂了或者引发再均衡问题重新分粗弯配Partation,当下次重新恢复消费时仍然可以知道从哪里开始消费。它好比看一本书中的书签标记,每次通过书签标记(offset)就能快速找到该从哪里开始缺卜看(消费)。
Kafka对于offset的处理有两种伏凳穗提交方式:(1) 自动提交(默认的提交方式) (2) 手动提交(可以灵活地控制offset)
关于kafkaoffset和kafka offset的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。