kafkapartition(kafka partition)

本篇文章给大家谈谈kafkapartition,以及kafka partition对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

如何为Kafka集群选择合适的Partitions数量

如何决定kafka集群中topic,partition的数量,这是许多kafka用户经常遇到的问题。本文列举阐述几个重要的决定因素,以提供一些参考。 分区多吞吐量更高一个话题topic的各个分区partiton之间是并行的。在producer和broker方面,写不同的分区是完全并行的。因此一些昂贵的操作比如压缩,可以获得更多的资源,因为有多个进程。在consumer方面,一个分区的数据可以由一个consumer线程在拉去数据。分区多,并行的consumer(同一个消费组)也可以多。因此通常,分区越多吞吐量越高。基于吞吐量可以获得一个粗略的计算公式。先测量得到在只有一个分区的情况下,Producer的吞吐量(P)和Consumer的吞吐量(C)。那如果总的目标吞吐量是T的话,max(T/P,T/C)就是需要的最小分区数。在单分区的情况下,Producer的吞吐量可以通过一些配置参数,比如bath的大小、副本的铅伍数量、压缩格式、ack类型来测得。而Consumer的吞吐量通常取决于应用程序处理每一天消息逻辑。这些都是需要切合实际测量。随着时间推移数据量的增长可能会需要增加分区。有一点需要注意的是,Producer者发布消息通过key取哈希后映射分发到一个指定的分区,当分区数发生变化后,会带来key和分区映射关系发生变化。可能某些应用程序依赖key和分区映射关系陆配,映射关系变化了,程序就需要做相应的调整。为了避免这种key和分区关系带来的应用程序修改。所以在分区的时候尽量提前考虑,未来一年或两年的对分区数据量的要求。除了吞吐量,还有一些其他的因素,在定分区的数目时是值得考虑的。在某些情况下,太多的分区也可能会产生负面影响。分区多需要的打开的文件句柄也多每个分区都映射到broker上的一个目录,每个log片段都会有两个文件(一个是索引文件,另一个是实际的数据文件)。分区越多所需要的文件句柄也就越多,可以通过配置操作系统的参数增加打开文件句柄数。分区多增加了不可用风险kafka支持主备复制,具备更高的可用性和持久性。一个分区(partition)可以有多个副本,这些副本保存在不同的broker上。每个分区的副本中都会有一个作为Leader。当一个broker失败时,Leader在这台broker上的分区都会变得不可用,kafka会自动移除Leader,再其他副本中选一个作为新的Leader。Producer和Consumer都只会与Leader相连。一般情况下,当一个broker被正常关机时,controller主动地将Leader从正在关机的broker上移除。移动一个Leader只需要几毫秒。然当broker出现异常导致关机时,不可用会与分区数成正比。假设一个boker上有2000个分区,每个分区有2个副本,那这样一个boker大约有1000个Leader,当boker异常宕机,会同时有1000个分区变得不可用。假设恢复一个分区需要5ms,1000个分区就要5s。分区越多,在broker异常宕机的情况,恢复所需时间会越长,不可用风险会增加。分区多会增加点到点的延迟这个延迟需要体现在两个boker间主备数据同步。在默认情况下,两个boker只有一个线程负责数据的复制。根据经验,每个boker上的分区限制在100*b*r内(b指集群内boker的数量,r指副本数量)。 分区多会增加客户端的内存消耗kafka0.8.2后有个比较好的特色,新的Producer可以允许用户设置一个缓冲区,缓存一定量的数据。当缓冲区数据到达设定量或者到时间,数据会从缓存区删除发往broker。如果分区很多,每个分区都缓存一定量的数据量在缓冲区,很可能会占用大量的内存,甚至超过系统内存。Consumer也存在同样的问题,会从每个分区拉一批数据回来,分区越多,所需内存也就越大。根据经验,应该给每个分区分配至少几十KB的内存。 总结在通常情况下,增加分区可以提供kafka集群早激指的吞吐量。然而,也应该意识到集群的总分区数或是单台服务器上的分区数过多,会增加不可用及延迟的风险。

Kafka 源码解析之 Consumer 两种 commit 机制和 partition 分配机制

先看下两种不同的 commit 机制,一种是同步 commit,一种是异步 commit,既然其作用都是 offset commit,应该不难猜到它们底层使用接口都是一样的

同步 commit

同步 commit 的实现方式,client.poll() 方法会阻塞直到这个request 完成或超时才会返回。

异步 commit

而对于异步的 commit,最后调用的都是 doCommitOffsetsAsync 方法,其具体实现如下:

在异步 commit 中,可以添加相应的回调函数,如果 request 处理成功或处理失败,ConsumerCoordinator 会通过 invokeCompletedOffsetCommitCallbacks() 方法唤醒相应的回调函数。

关键区别在于future是否会get,同步提交就是future会get.

consumer 提供的两种不同 partition 分配策略,可以通过 partition.assignment.strategy 参数进行配置,默认情况下使用的是 org.apache.kafka.clients.consumer.RangeAssignor,Kafka 中提供另一种 partition 的分配策略 org.apache.kafka.clients.consumer.RoundRobinAssignor

用户可以自定义相应的 partition 分配机制,只需要继承这个 AbstractPartitionAssignor 抽象类即可。

AbstractPartitionAssignor

AbstractPartitionAssignor 有一个抽象方法,如下所示:

assign() 这个方法,有两个参数:

RangeAssignor 和 RoundRobinAssignor 通过这个方法 assign() 的实现,来进行相应的 partition 分配。

直接看一下这个方法的实现:

假设 topic 的 partition 数为 numPartitionsForTopic,group 中订阅这个 topic 的 member 数为 consumersForTopic.size(),首先需要算出两个值:

分配的陪巧规则是:对于剩下的那些 partition 分配到前 consumersWithExtraPartition 个 consumer 上,也就是前 consumersWithExtraPartition 个 consumer 获得 topic-partition 列表会比后面多一个。

在上述的程序中,举了一个例子,假设有一个 topic 有 7 个 partition,group 有5个 consumer,这个5个 consumer 都订阅这个 topic,那么 range 的分配方式如下:

而如果 group 中有 consumer 没有订阅这个 topic,那么这个 consumer 将不会参与分配。下面再举个例子,将有两个 topic,一个 partition 有5个,一个孝银 partition 有7个,group 有5个 consumer,但是只有前3个订阅第一个 topic,而另一个 topic 是所有 consumer 都订阅了,那么其分配结果如下:

这个是 roundrobin 的实现,其实现方法如下:

roundrobin 的实现原则,简单来说就是:列出所有 topic-partition 和列出所有的 consumer member,然后开始分配,一轮之后继续下一轮,假设有有一个 topic,它有7个 partition,group 有3个 consumer 都订阅了这个 topic,那么其分配方式为:

对于多个 topic 的订阅,将有两个 topic,一个 partition 有5个,一个 partition 有7个,group 有5个 consumer,但是芦慎键只有前3个订阅第一个 topic,而另一个 topic 是所有 consumer 都订阅了,那么其分配结果如下:

roundrobin 分配方式与 range 的分配方式还是略有不同。

Kafka partition的数量问题

kafka的每个topic都可以创建多个partition,partition的数量无上限,并不会像replica一样受限于broker的数量,因此partition的数量可以随心所欲的设置。那确定partition的数量就需要思考一些权衡因素。

越多的partition可以提供更高的吞吐量

在kafka中,单个partition是kafka并行操作的最小单元。每个partition可以独立接收推送的消息以及被consumer消费,相当于topic的一个子通道,partition和topic的关系就像高速公路的车道和高速公路的关系一样,起始点和终点相同,每个车道都可以独立实现运输,不同的是kafka中不存在车辆变道的说法,入口时选择的车道需要从一而终。而kafka的吞吐量显而易见,在资源足够的情况下,partition越多速度越快。

这里提到的资源充足解释一下,假设我现在一个partition的最大传输速度为p,目前kafka集群共有三个broker,每个broker的资源足够支撑三个partition最大速度传输,那我的集群最大传输速度为3*3*p=9p,假设在不增加资源的情况下将partition增加到18个,每个partition只能以p/2的速度传输数据,因此传输速度上限还是9p,并不能再提升,因此吞吐量的设计需要考虑broker的资源上限。当然,kafka跟其他集群一样,可以横向扩展,再增加三个相同资源的broker,那传输速度即可达到18p。

越多的分区需要打开更多的文件句柄

在kafka的broker中,每个分区都会对照着文件系统的一个目录。

在kafka的数据日志文件目录中,每个日志数据段都会分配两个文件,一个索引文件和一个数据文件。因此,随着partition的增多,需要的文件句柄数急剧增加,必要时需要调整操作系统允许打开的文件句柄数。

更多的分区会导致端对端的延迟

kafka端对端的延迟为producer端发布消息到consumer端消费消息所需的时间,即consumer接收消息的时间减去produce发布消息的时顷稿间。kafka在消息正确接收后才会暴露给消费者,即在保证in-sync副本复制成功之后才会暴露,瓶颈则来自于此。在一个broker上的副本从其他broker的leader上局数复制数据的时候只会开启一个线程,假设partition数量为n,每个副本同步的时间为1ms,那in-sync操作完成所需的时间即n*1ms,若n为10000,则需要10秒才能返回同步状态,数据才能暴露给消费者,这就导致了较大的端对端的延迟。

越多的partition意味着需要更多的内存

在新版本的kafka中可以支持批量提交和批量消费,而设置了批量提交和批量消费后,每个partition都会需要一定的内存空间。假设为100k,当partition为100时,producer端和consumer端都需要10M的内存;当partition为100000时,producer端和consumer端则都需要10G内存。无限的partition数量很快就会占据大量的内存,造成性能瓶颈。

越多的partition会导致更长时间的恢复期

kafka通过多副本复制技术,实现kafka的高可用性和稳定性。每个partition都会有多雀腊孝个副本存在于多个broker中,其中一个副本为leader,其余的为follower。当kafka集群其中一个broker出现故障时,在这个broker上的leader会需要在其他broker上重新选择一个副本启动为leader,这个过程由kafka controller来完成,主要是从Zookeeper读取和修改受影响partition的一些元数据信息。

通常情况下,当一个broker有计划的停机上,该broker上的partition leader会在broker停机前有次序的一一移走,假设移走一个需要1ms,10个partition leader则需要10ms,这影响很小,并且在移动其中一个leader的时候,其他九个leader是可用的,因此实际上每个partition leader的不可用时间为1ms。但是在宕机情况下,所有的10个partition

leader同时无法使用,需要依次移走,最长的leader则需要10ms的不可用时间窗口,平均不可用时间窗口为5.5ms,假设有10000个leader在此宕机的broker上,平均的不可用时间窗口则为5.5s。

更极端的情况是,当时的broker是kafka controller所在的节点,那需要等待新的kafka leader节点在投票中产生并启用,之后新启动的kafka leader还需要从zookeeper中读取每一个partition的元数据信息用于初始化数据。在这之前partition leader的迁移一直处于等待状态。

总结

通常情况下,越多的partition会带来越高的吞吐量,但是同时也会给broker节点带来相应的性能损耗和潜在风险,虽然这些影响很小,但不可忽略,因此需要根据自身broker节点的实际情况来设置partition的数量以及replica的数量。

[img]

关于kafkapartition和kafka partition的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表