kafkaack(kafkaacks)

本篇文章给大家谈谈kafkaack,以及kafkaacks对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

如何保证kafka生产者发送消息的可靠性

继一年前的 kafka介绍的学习总结 ,生产者Producers按照主题topic把消息发给kafka集群的主分区,其他分区从主分区同步该消息。

具体来看kafka的分布特性性:kafka消息的分区分布在Kafka集群的某些服务老丛孝器上,每个分区都有一个服务器充当leader,有0个或多个充当follower。leader处理分区的所有读请求和写请求,同时follower被动的从leader同步数据。假如leader异常了,其他follower会自动的选出一个新leader。每个服务器有可能充当某些分区的leader,同时也充当其他分区的follower,因此集群负载得到了很好的平衡和实现容错功能。

Kafka默认的副本因子是3,即每个分区只有1个leader副本和2个follower副本。

由于Kafka是一个分布式系统,follower必然会存在郑腔与leader不能实时同步的风险,那么follower副本在什么条件下才算与Leader同步?ISR同步副本机制解决这个侍稿问题。

In-sync replica(ISR)称之为同步副本,ISR中的副本都是与Leader进行同步的副本。ISR中是什么副本呢?首先可以明确的是:Leader副本总是存在于ISR中,而follower副本是否在ISR中,取决于该follower副本是否与leader副本保持了“同步”。

Kafka的broker服务端有一个参数replica.lag.time.max.ms, 该参数表示follower副本滞后与Leader副本的最长时间间隔,默认是10秒。只要follower副本落后于leader副本的时间间隔不超过10秒,就认为该follower副本与leader副本是同步的,即使follower副本落后于Leader副本几条消息,只要在10秒之内赶上Leader副本,就不会被踢出局。 如果follower副本被踢出ISR列表, 等到该副本追上了leader副本的进度,该副本会被再次加入到ISR列表中,所以ISR是一个动态列表,并不是静态不变的。

acks参数主要决定了kafka集群leader分区副本接收消息成功就响应成功还是fellower分区从leader同步成功才响应成功,这个参数对于消息是否丢失起着重要作用:

1) acks=0,生产者在成功写入消息之前不会等待任何来自服务器的响应。由于不需要等到服务器的响应,可以以网络支持的最大速度发送消息,从而达到很高的吞吐量。

2) acks=1,只要集群的leader分区副本接收到了消息,就会向生产者响应成功。一旦消息无法写入leader分区副本(比如网络原因、leader节点崩溃),生产者会收到一个错误响应,为了避免数据丢失,生产者会重新发送消息。这种方式的吞吐量取决于使用的是异步发送还是同步发送。

3) acks =all,只要ISR同步副本数大于等于最小同步副本数min.insync.replicas( 提醒:ISR是动态的 )收到消息时,生产者才会接收到服务器的成功响应。这种模式是最高级别的,也是最安全的,可以确保不止一个Broker接收到了消息,该模式的延迟会很高。

当acks=all时,只要ISR同步副本中有主备副本都同步了才会响应成功给生产者。其实这里面存在一个问题:ISR同步副本是动态的,有可能仅仅含有一个leader副本(相当于acks=1),也有可能的全部副本(这个也没必要,拜占庭将军场景只要保证一半以上的副本正常同步)。需要一个参数决定至少有几个副本需要同步成功才能响应成功给生产者。

为了解决这个问题,Kafka的Broker端提供了一个参数**min.insync.replicas**,该参数控制着至少被写入的副本数,该值默认值为1,生产环境中可以根据部署的是单节点还是多节点,多节点要能够满足拜占庭将军场景,我们以3节点场景为例。

3节点场景1: 当min.insync.replicas=2且acks=all时,如果ISR列表只有[1,2],3被踢出ISR列表,只要保证2个副本同步了,生产者就会收到成功响应。

3节点场景2 :当min.insync.replicas=2时,如果ISR列表只有[1,2],3被踢出ISR列表。当acks=all时,则响应失败(需要生产者重新发消息直到响应成功);当acks=0或者acks=1时成功写入数据。

ps:该场景下acks=all,kafka集群一旦同步失败就直接响应失败嘛?还是有超时时长?生产者已经将消息发送到leader分区,kafka(响应失败)对这个消息如何处理?入集群持久化嘛?后续重试发送的消息如何处理?

3节点场景3 :如果min.insync.replicas=2且acks=all,此时ISR列表为[1,2,3],只要2副本同步成功还是等到所有的副本都同步了,才会向生产者发送成功响应?因为min.insync.replicas=2只是一个最低限制,同步副本少于该配置值,则会抛异常,而acks=all,是需要保证所有的ISR列表的副本都同步了才可以发送成功响应。

1) 要想系统的可靠性,从来不是一方能决定的, kafka生产者发送消息的可靠性 主要由 kafka服务端 的动态同步副本列表ISR和最小同步副本数min.insync.replicas以及 生产者 参数ack=all。

2) kafka服务端的最小同步副本数min.insync.replicas由部署的集群和节点个数来决定,满足拜占庭将军场景(节点个数一半以上即可)。

3) 生产者的 副本个数 由部署的集群和节点个数来决定,满足拜占庭将军场景(节点个数一半以上即可)。如果是单副本的话,本文讨论的就没意义了。

4) kafka单节点场景,kafka服务端的动态同步副本列表ISR和最小同步副本数min.insync.replicas均为1,生产者的副本数为1(如果大于1估计会失败),ack=all和ack=1的效果一样。

5) kafka 3节点场景,kafka服务端的动态同步副本列表ISR为3个,最小同步副本数min.insync.replicas均为2,生产者的副本数为2和ack=all。

6) kafka 奇数n节点场景,kafka服务端的动态同步副本列表ISR为n个,最小同步副本数min.insync.replicas均为(n+1)/2,生产者的副本数为(n+1)/2和ack=all。

Kafka生产者ack机制剖析

kafka官网给出的kafka生产者配置参数

kafka防止消息重复消费

kafka重复消费的根本原乱或因就是“数据消费了,但是offset没更新”!而我们要探究一般什么情况下会导致offset没更新?

max.poll.interval.ms

两次poll操作允许的最大时间间隔。单位毫秒。默认值300000(5分钟)。

两次poll超过此时间间隔,Kafka服仔兄务端会进行rebalance操作,导致客户端连接失效,无法提交offset信息,从而引发重复消费。

拿到消息就提交offset

1、丢包问题 :消息推送服务,每天早上,手机上各终端都会给用户推送消息,这时候流量剧增,可能会出现kafka发送数据过快,导致服务器网卡爆满,或者磁盘处于繁忙状态,可能会出现丢包现象。

解决方案:首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。 

检测方法:使用重放机制,查看问题所在。

2. 重复消费最常见的原因 :re-balance问题,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-balance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。 

消息重复消费和消息丢包的解决办法

保证不丢失消息:生产者(ack=all 代表至少成功发送一次)     重试机制

消费者 (offset手动提交,业务逻辑成功处理后,提交offset) 

保哗戚伍证不重复消费:落表(主键或者唯一索引的方式,避免重复数据) 

业务逻辑处理(选择唯一主键存储到Redis或者mongdb中,先查询是否存在,若存在则不处理;若不存在,先插入Redis或Mongdb,再进行业务逻辑处理)

【大数据技术】kafka简介和底层实现

一、 K afka的三大组件:Producer、Server、Consumer

 

1、Kafka的 Producer 写入消息

producer采用push(推)模式将消息发布到broker,每条消息,都被追加到分区中(顺序写到磁盘,比随机写内存效率高)。

· 分区的作用:方便容量扩展,可以多并发读写数据,所以我们会指定多个分区进行数据存储。

· 一般根据 event_key的hash  % numPartitions来确定写入哪个分区,如果写入时没有指定key,则轮询写入每个分区;因此导致每个partition中消息是有序的,整体无序。

每条event数据写入partitionA中,并且只会写入partitionA_leader,当partitionA_leader写入完成后partitionA_flower节点再去partitionA_leader上异步拉取数据;默认ack为1,表示不会等待partitionA_flowers写入完成;如果设置ack为副本数或ack=-1,则等待副本全部写完,再写入下一条数据。

2、kafka的 broker—— 保存消息

1、 创建topic,并指定分区和副本数

2、每个分区(partition)有一个leader,多个follower,pull数据时先寻找leader,只会读leader上的数据,leader和follower不会在一个节点上,leader节点宕机后,其中一个follower变成leader

3、 消息数据存在每个分区中,默认配置每条消息保存7天 或 分区达到1GB 后删除数据

3、 K afka的 Consumer 消费数据:

1、consumer采用pull(拉)模式从broker中读取数据。

2、如果一个消费者来消费同一个topic下不同分区的数据,会读完一个分区再读下一个分区

生产者(producer)A PI 只有一套 ;   但是消费者(consumer)A PI 有两套(高级A PI 和低级A PI )

一、高级API:

Zookeeper管理offset(默认从最后一个开始读新数据,可以配置从开头读)

kafka server(kafka服务)管理分区、副本

二、低级API:

开发者自己控制offset,想从哪里读就从哪里读

// SimpleConsumer是Kafka用来读数据的类

// 通过send()方法获取元数据找到leader

TopicMetadataResponse metadataResponse = simpleConsumer.send(request);  //通过metadataResponse获取topic元数据,在获取topic中每个分区的元数据

// fetch 抓取数据

FetchResponse response = simpleConsumer.fetch(fetchRequest);

// 解析抓取到的数据

ByteBufferMessageSet messageAndOffsets = response.messageSet(topic, partition);

二、数据、broker状态,consumer状态的存储

一、在本地存储原始消息数据:

1、hash取模得分区、kafka中每条消息有一个Key,用来确定 每条数据存储到哪个分区中

2、轮询

3、自定义分区

二、在zookeeper存储kafka的元数据

三、存储consumer的offset数据

每个consumer有一个孝渣陆Key(broker+Topic+partition)的hash,再取模后 用来确定offset存到哪个系巧顷统文件中,Value是partitionMetaData。

1、使用zookeeper启动,zookeeper来存储offset

消费者梁手 消费消息时,offset(消费到的下标)会保存在consumer本地和zookeeper中(由本地上传到zookeeper中,所以本地会保存offset)

2、使用bootstrap启动,本地存储offset(在本地可以减少两节点交互),zookeeper存储其他数据

三、某 F lume对接Kafka案例

Kafka的ack机制

简述kafka的ack机制

Kafka的ack机制,指的是producer的消息发送确认机制,这直接影响到Kafka集群的吞吐量和消息可靠性。而吞吐量和可靠性就像硬币的两面,两者不可兼得,只能平衡。

ack有3个可选值,分别是1,0,-1。

ack=1,简单来说就是,producer只要收到一个分区副本成功写入的通知就认为推送消息成功了。这里有一个地方需要注意,这个副本必须是leader副本。只有leader副本成功写入了,producer才会认为消息发送成功。

注意,ack的默认值就是1。这个默认值其实就是吞吐量与可靠性的一个折中方案。生产上我们可以根据实际情况进行调整,比如如果你要追求高吞吐量,那么就要放弃可靠性。

ack=0,简单来说就是,producer发送一次就不再发送了,不管是否发送成功。

ack=-1,简单来说就是,producer只有收到分区内所有副本的成亮租功写入的通知才认为推送消息成功了。

接下来我们分析一下ack=1的情况下,为什么消息也会丢失?

ack=1的情况下,友键漏producer只要收到分区leader成功写入的通知就会认为消息发送成功好烂了。如果leader成功写入后,还没来得及把数据同步到follower节点就挂了,这时候消息就丢失了。

消息队列(三)kafka的一致性和失败处理策略

    如前篇所讲,假如对 登录系统,主流程是:用户请求--验证账户密码--返回成功;一些其他流程包括:积分、提醒、荣誉系统等;这里有个问题是:如果这两者有一个失败了怎么办?

    由于Message由Producer发送到Topic的Master Partition的时候,为了保证可靠性,会等待In-Sync的Replicas都同步完成之后才会返回成功;这里同样有个问题:如果返回超时怎么办?

   在Consumer的客户端,为了提高效率,会分为Fetch线程和Consume线程,在consume完成判链备之后,会提交offset;问题是:提交offset和consume仅有一个失败怎么办?

    这种语义,很难被接受,网络故障、主副本选举、超时等因素就可能造成这种现象,而且若没这方面的意识,日志可能都不会打印,人工补偿都没法做到,造成这种现象的掘毁原因有:

a) 生产者:(很多参数可以影响,挑几个重点的)

      异步调用,并没有设置回调函数;

      ack=0,不等Broker确认就继续发送消息;

      ack=1,等待leader确认后再发送消息,若follower没有跟上, 且leader挂掉,再选举将丢失

      retries=0,不重试,或者重试之后,仍旧失败,不考虑重试队列或者人工补偿

b) 消费者

    设置为自动提交,时间间隔设置的较短,且不手动提交offset;前文说到,Consumer分为Fetch和Consume两步,自动提交的offset是Fetch的,所以提交的最高offset的message还未处理;

    这种语义,被实现的较多,往往Consumer会拉到重复的message,再去在cache中做一层去重的处理,然后实现Exactly Once,至少在0.11之前未实现kafka事务之前;

a) 生产者 

   ack=ALL/-1,即等待In-sync的follower确认,才继续发送;

  且retries 1,且还是失败唤毁了之后,要有重试队列、死信队列、人工补偿的方案;

b)消费者  :关闭自动提交,并每次手动提交offset

   a) 在At least once 的基础上,在业务方做幂等或者去重,比如redis去重,或业务上幂等;

   b) 在At least once 的基础上,将offset和处理的结果组成一个原子事务,但这种方法,我个人觉得引入了更多的复杂性,参考

   c) 上面保证了不出错的情况下的,出错的情况下,需要重试队列、死信队列进行补偿;

   d) kafka的0.11版本之后,提供了事务,利用事务可以实现;

      kafka中没有实现重试队列和死信队列的功能,但是由于当前的message的offset如果不提交,就会阻塞后续的消费,所以需要预留失败的message补偿的机制;实现方法有几种:

       用本地队列去控制,设置在定时器中,给任务设置30s,5min,30min三次重试的机会,如果不行,持久化到DB中,进行人工干预,当然报警、日志都要跟上; 优点:逻辑简单、实现简单,缺点:需要一些机制保证本地cache的可靠性,比如加hook预防服务更新,但是这样仍不能完全解决,还要面临初始化重试队列、宕机来不及调用hook等问题;

    这个代价略大,而且这种不太受我们控制,很多消息队列的选型是已经固定的;修改kafka实现也不可能;

    这种思路也面临很大的问题:就是kafka没有实现延时的功能,那么新的message可能瞬时就被消费了,但是这个时候导致失败的原因,比如DB连接、网络问题没有解决,很快尝试几次之后,就进到死信队列了;

    还有一种是《深入kafka》一书中也讲了一些方法,但是都需要改动kafka的内部实现,这个不太适合小型项目;

   redis是很普遍的组件,而且对于少量失败、且对重试时间要求没那么严格的情况下,redis很适合,redis的持久化、哨兵之类的,对可靠性保证的还是很不错的;

留一篇文章,留着回忆redis的时候,再写吧

[img]

关于kafkaack和kafkaacks的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表