kafka最新版本(kafka最新offset)

本篇文章给大家谈谈kafka最新版本,以及kafka最新offset对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Kafka压缩

首先说明一点kafka的压缩和kafka的compact是不同的,compact就是相同的斗好拦key只保留一条,是数据清理方式的一种和kafka的定期删除策略是并列的;而kafka的压缩是指数据不删除只是采用压缩算法进行压缩。

kafka从0.7版本就开始支持压缩功能:

1)kafka的发送端将消息按照批量(如果批量设置一条或者很小,可能有相反的效果)的方式进行压缩。

2)服务器空胡端直接将压缩消息保存(特别注意,如果kafka的版本不同,那么就存在broker需要先解压缩再压缩的问题,导致消耗资源过多)。

3)消费端自动解压缩,测试了下,发送端无论采用什么压缩模式,消费端无论设置什么解压模式,都可以自动完成解压缩功能。

4)压缩消息可以和非压缩消息混存,也就是说如果你kafka里面先保存的是非压缩消息,后面改成压缩,不用担心,kafka消费端自动支持。

测试的kafka版本:kafka_2.12-1.1.1

测试的kafka客户端版本袜芹:0.10.2.1

测试数据的条数:20000

kafka支持三种压缩算法,lz4、snappy、gzip,

通过上面数据来看,gzip的压缩效果最好,但是生成耗时更长,snappy和lz4的数据差不多,更倾向于lz4,具huxi大神的书上所说kafka里面对snappy做了硬编码,所以性能上最好的是lz4,推荐使用此压缩算法。

压缩率对比:

性能对比图:

很简单:

消费端无论设置什么压缩模式,都可以正确的解压kafka的消息,也就是说消费端可以不设置解压缩,

不过可能性能有所下降。

3分钟带你彻底搞懂 Kafka

Kafka到底是个啥?用来干嘛的?

官方定义如下:

翻译过来,大致的意思就是,这是一个实时数据处理系统,可以横向扩展,并高可靠!

实时数据处理 ,从名字上看,很好理解,就是将数据进行实时处理,在现在流行的微服务开发中,最常用实时数据处理平台有 RabbitMQ、RocketMQ 等消息中间件。

这些中间件,最大的特点主要有两个:

在早期的 web 应用程序开发中,当请求量突然上来了时候,我们会将要处理的数据推送到一个队列通道中,然后另起一个线程来不断轮训拉取队列中的数据,从而加快程序的运行效率。

但是随着请求量不断的增大,并且队列通道的数据一致处于高负载,在这种情况下,应用程序的内存占用率会非常高,稍有不慎,会出现内存不足,造成程序内存溢出,从而导致服务不可用。

随着业务量的不断扩张,在一个应用程序内,使用这种模式已然无法满足需求,因此之后,就诞生了各种消息中间件,例如 ActiveMQ、RabbitMQ、RocketMQ等中间件。

采用这种模型,本质就是将要推送的数据,不在存放在当磨枯前应用程序的内存中,而是将数据存放到另一个专门负责数据处理的应用程序中,从而实现服务解耦。

消息中间件 :主要的职责就是保证能接受到消息,并将消息存储到磁盘,即使其他服务都挂了,数据也不会丢失,同时还可以对数据消费情况做好监控工作。

应用程序 :只需要将消息推送到消息中间件,然后启用一个线程来不断从消息中间件中拉取数据,进行消费确认即可!

引入消息中间件之后,整个服务开发会变得更加简单,各负其责。

Kafka 本质其实也是消息中间件的一种,Kafka 出自于 LinkedIn 公司,与 2010 年开源到 github。

LinkedIn 的开发团队,为了解决数据管道问题,起初采用了 ActiveMQ 来进行数据交换,大约是在 2010 年前后,那时的 ActiveMQ 还远远无法满足 LinkedIn 对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,LinkedIn 决定研发自己的消息传递系统, Kafka 由此诞生 。

在 LinkedIn 公司,Kafka 可以有效地处理每天数十亿条消息的指标和用户活动跟踪,其强大的处理能力,已经被业界所认可,并成为大数据流水线的首选技术。

先来看一张图, 下面这张图就是 kafka 生产与消费的核心架构模型 !

如果你看不懂这些概念没关系,我会带着大家一起梳理一遍!

简而言之,kafka 本质就是一个消息系统,与大多数的消息系统一样,主要的特点如下:

与 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在于,它有一个**分区 Partition **的概念。

这个分区的意思就是说,如果你创建的 topic 有5个分区,当你一次性向 kafka 中推 1000 条数据时,这 1000 条数据默认会分培游哪配到 5 个分区中,其中每个分区存储 200 条数据。

这样做的目的,就是方便消费者从不同的分区拉取数据,假如你启动 5 个线程同时拉取数据,每个线程拉取一个分区,消费速度会非常非常快!

这是 kafka 与其他的消息系统最大的不同!

和其他的中间件一样,kafka 每次发送数据都是向 Leader 分区发送数据,并顺序写入到磁盘,然后 Leader 分区会将数据同步到各个从分区 Follower ,即使主分区挂了,也不会影响服务的正常运行。

那 kafka 是如何将数据写入到对应的分区呢?kafka中有以下几个原则:

与生产者一样,消费者主动的去kafka集群拉取消息时,也是从 Leader 分区去拉取数据。

这里我们需要重点了解一个名词: 消费组 !

考虑到多个消配码费者的场景,kafka 在设计的时候,可以由多个消费者组成一个消费组,同一个消费组者的消费者可以消费同一个 topic 下不同分区的数据,同一个分区只会被一个消费组内的某个消费者所消费,防止出现重复消费的问题!

但是不同的组,可以消费同一个分区的数据!

你可以这样理解,一个消费组就是一个客户端,一个客户端可以由很多个消费者组成,以便加快消息的消费能力。

但是,如果一个组下的消费者数量大于分区数量,就会出现很多的消费者闲置。

如果分区数量大于一个组下的消费者数量,会出现一个消费者负责多个分区的消费,会出现消费性能不均衡的情况。

因此,在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量保持一致!

光说理论可没用,下面我们就以 centos7 为例,介绍一下 kafka 的安装和使用。

kafka 需要 zookeeper 来保存服务实例的元信息,因此在安装 kafka 之前,我们需要先安装 zookeeper。

zookeeper 安装环境依赖于 jdk,因此我们需要事先安装 jdk

下载zookeeper,并解压文件包

创建数据、日志目录

配置zookeeper

重新配置 dataDir 和 dataLogDir 的存储路径

最后,启动 Zookeeper 服务

到官网 下载想要的版本,我这里下载是最新稳定版 2.8.0 。

按需修改配置文件 server.properties (可选)

server.properties 文件内容如下:

其中有四个重要的参数:

可根据自己需求修改对应的配置!

启动 kafka 服务

创建一个名为 testTopic 的主题,它只包含一个分区,只有一个副本:

运行 list topic 命令,可以看到该主题。

输出内容:

Kafka 附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。

运行生产者,然后在控制台中键入一些消息以发送到服务器。

输入两条内容并回车:

Kafka 还有一个命令行使用者,它会将消息转储到标准输出。

输出结果如下:

本文主要围绕 kafka 的架构模型和安装环境做了一些初步的介绍,难免会有理解不对的地方,欢迎网友批评、吐槽。

由于篇幅原因,会在下期文章中详细介绍 java 环境下 kafka 应用场景!

Kafka系列之(4)——Kafka Producer流程解析

Kafka 0.9版本正式使用Java版本的producer替换了原Scala版本的producer。

注:ProducerRecord允许用户在创建消息对象的时候就直接指定要发送的分区,这样producer后续发送该消息时可以直接发送到指定分区,而不用先通过Partitioner计算目标分区了。另外,我们还可以直接指定消息的时间戳——但一定要慎重使用这个功能,因为它有可能会令时间戳索引机制失效。

流程描述:

用户首先构建待发送的消息对象ProducerRecord,然后调用KafkaProducer#send方法进行发送。KafkaProducer接收到消息后首先对其进行序列化,然后结合本地缓存的元数据信息一起发送给partitioner去确定目标分区,最后追加写入到内存中的消息缓冲池(accumulator)。此时KafkaProducer#send方法成功返回。同时,KafkaProducer中还有一个专门的耐乎Sender IO线程负责将缓冲池中的消息分批次发送给对应的broker,完成真正的消息发送逻辑。

新版本的producer从设计上来说具有以下几个特点:

总共创建两个线程:执行KafkaPrducer#send逻辑的线程——我们称之为“用户主线程”;执行发送逻辑的IO线程——我们称之为“Sender线程”。

不同于Scala老版本的producer,新版本producer完全异步发送消息,并提供了回调机制(callback)供用户判断消息是否成功发送。

batching机制——“分批发送“机制。每个批次(batch)中包含了若干个PRODUCE请求,因此具有更高的吞吐量。

更加合理的默认分区策略:对于无key消息而言,Scala版本分区策略是一段时间内(默认是10分钟)将消息发往固定的目标分区,这容易造成消息分布的不均匀,而新版本的producer采用轮询的方式均匀地将消息分发到不同的分区。

底层统一使用基于Selector的网络客户端实现,结合Java提供的Future实昌毕悉现完整地提供了更加健壮和优雅的生命周期管理。

关键参数

batch.size 我把它列在了首位,因为该参数对于调优producer至关重要。之前提到过新版producer采用分批发送机制,该参数即控制一个batch的大小。默认是16KB

acks 关乎到消息持久数铅性(durability)的一个参数。高吞吐量和高持久性很多时候是相矛盾的,需要先明确我们的目标是什么? 高吞吐量?高持久性?亦或是中等?因此该参数也有对应的三个取值:0, -1和1

linger.ms 减少网络IO,节省带宽之用。原理就是把原本需要多次发送的小batch,通过引入延时的方式合并成大batch发送,减少了网络传输的压力,从而提升吞吐量。当然,也会引入延时

compression.type producer 所使用的压缩器,目前支持gzip, snappy和lz4。压缩是在用户主线程完成的,通常都需要花费大量的CPU时间,但对于减少网络IO来说确实利器。生产环境中可以结合压力测试进行适当配置

max.in.flight.requests.per.connection 关乎消息乱序的一个配置参数。它指定了Sender线程在单个Socket连接上能够发送未应答PRODUCE请求的最大请求数。适当增加此值通常会增大吞吐量,从而整体上提升producer的性能。不过笔者始终觉得其效果不如调节batch.size来得明显,所以请谨慎使用。另外如果开启了重试机制,配置该参数大于1可能造成消息发送的乱序(先发送A,然后发送B,但B却先行被broker接收)

retries 重试机制,对于瞬时失败的消息发送,开启重试后KafkaProducer会尝试再次发送消息。对于有强烈无消息丢失需求的用户来说,开启重试机制是必选项。

当用户调用KafkaProducer.send(ProducerRecord, Callback)时Kafka内部流程分析:

这是KafkaProducer#send逻辑的第一步,即为待发送消息进行序列化并计算目标分区,如下图所示:

如上图所示,一条所属topic是"test",消息体是"message"的消息被序列化之后结合KafkaProducer缓存的元数据(比如该topic分区数信息等)共同传给后面的Partitioner实现类进行目标分区的计算。

producer创建时会创建一个默认32MB(由buffer.memory参数指定)的accumulator缓冲区,专门保存待发送的消息。除了之前在“关键参数”段落中提到的linger.ms和batch.size等参数之外,该数据结构中还包含了一个特别重要的集合信息:消息批次信息(batches)。该集合本质上是一个HashMap,里面分别保存了每个topic分区下的batch队列,即前面说的批次是按照topic分区进行分组的。这样发往不同分区的消息保存在对应分区下的batch队列中。举个简单的例子,假设消息M1, M2被发送到test的0分区但属于不同的batch,M3分送到test的1分区,那么batches中包含的信息就是:{"test-0" - [batch1, batch2], "test-1" - [batch3]}。

单个topic分区下的batch队列中保存的是若干个消息批次。每个batch中最重要的3个组件包括:

compressor: 负责执行追加写入操作

batch缓冲区:由batch.size参数控制,消息被真正追加写入到的地方

thunks:保存消息回调逻辑的集合

这一步的目的就是将待发送的消息写入消息缓冲池中,具体流程如下图所示:

这一步执行完毕之后理论上讲KafkaProducer.send方法就执行完毕了,用户主线程所做的事情就是等待Sender线程发送消息并执行返回结果了。

此时,该Sender线程登场了。严格来说,Sender线程自KafkaProducer创建后就一直都在运行着 。它的工作流程基本上是这样的:

不断轮询缓冲区寻找 已做好发送准备的分区 ;

将轮询获得的各个batch按照目标分区所在的leader broker进行分组;

将分组后的batch通过底层创建的 Socket连接 发送给各个broker;

等待服务器端发送response回来。

为了说明上的方便,我还是基于图的方式来解释Sender线程的工作原理:

上图中Sender线程会发送PRODUCE请求给对应的broker,broker处理完毕之后发送对应的PRODUCE response。一旦Sender线程接收到response将依次(按照消息发送顺序)调用batch中的回调方法,如下图所示:

refer:

[img]

关于kafka最新版本和kafka最新offset的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表