kafka集群(kafka集群原理)
本篇文章给大家谈谈kafka集群,以及kafka集群原理对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、k8s部署Kafka集群
- 2、kafka集群扩容后的数据迁移
- 3、Kafka集群部署(Docker容器的方式)
- 4、Kafka(四)集群之kafka
- 5、kafka集群配置和使用
- 6、Kafka相关内容总结(Kafka集群搭建手记)
k8s部署Kafka集群
本次的目的是通过使用k8s搭建一个三节点的 kafka 集群,因为 kafka 集群需要用到存储,所以我们胡绝兄需要准备三个持久卷( Persistent Volume ) 简称就是PV。
首先通过裤袭nfs创建三个共享目录
分别对应三节点zk集群中的三个pod的持久化目录,创建好目录之后编写yaml创建 kafka-pv.yaml
使用如下命令创建kafka-pk
出现如下提示就代表创建成功
这是我们可宏胡以通过如下命令去查看创建成功的pv
我们选择使用 statefulset 去部署kafka集群的三节点,并且使用刚刚创建的pv作为存储设备。
kafka.yaml
使用 kubectl apply -f kafka.yaml 部署
可以通过 kubect get pods -n tool
可以查看到三个pod都是running状态了,我们再看service状态 可以通过 kubect get svc -n tool
可以看到我们将9092端口通过nodePort映射给了19092暴露出去了。
我们可以通过 kubectl exec -it kafka-1 -n tools /bin/bash 进入容器
创建topic成功 代表我们kafka集群部署成功!!!
[img]kafka集群扩容后的数据迁移
最近我们生产环境的kafka集群有增加节点的需求,然而kafka在新增节点后并不会像elasticsearch那样感知到新节点加入后自动将数据reblance到新集群中春败悔,因此这个过程需要我们手动分配。一番折腾之后,实现了增加kafka集群节点并将原有数据均匀分配到扩容后的集群。下面结合一个例子谈一下整个过程。
假定当前的cluster中只有(101,102,103)三个kafka节点,有一个名为think_tank的topic,该topic有2个replica,均匀分布在三个节点上.
我们要做的是在cluster中新增两个节点(记为104,105)后,将的数据均匀分到新集群中的5个节点上。
其实官方文档的这一小节关于集群扩容讲解很详细: Expanding your cluster ,整个过程需要分为三个步骤:获取kafka给出的建议分配方案、按照给出的分配方案执行分配、查看分配的进度以及状态。这三个步骤对应了kafka脚本提供的三个partition reassigment工具。
结合例子具体说明:
脚本的参数是以json文件的形式传入的,首先要新建一个json文件并设置需要分配哪些topic,think_tank-to-move.json:
使用/bin目录中提供的 kafka-reassign-partitions.sh 的脚本请求获取生成分配方案:
--broker-lsit 的参数 "101,102,103,104,105"是指集群中每个broker的id,由于我们是需要将所有topic均匀分配到扩完结点的5台机器上,所以要指定。同理,当业务改变为将原来的所有数据从旧节点(01,102,103)迁移到新节点(104,105)实现数据平滑迁移,这时的参数应"104,105".
脚本执行后返回的结果如下:
可以看出当前正在运行的方案中,think_tank的replica都是分布在101,102,103这3个节点,新给出的建议方案中replica均匀分布在扩容后的5个节点中。
将上一个步骤中生成的建议方案复制到新建的think_tank_reassignment.json中:
使用脚本执行:
脚本扒正执行,返回内容:
如上,成功开始执行分配数据,同时提示你如果有需要将之前的分配方案备份便于回滚到原方案。
查看脚本的方法如下,注意这次的json文件要和执行步骤中的json是同一个文件:
返回结果:
is still in progress表示还在处理中,全部迁移成功后每个partition都会显示 completed successfully.注意如果topic数据量大,这个过程可能会时间长一些, 不要轻易重启节点! 可能会导致数据不一致!!!
这个partion reassignment工具同样可以按需手动地将某个特定的topic指定到特定的broker上,所要做的就是按照步骤一给定的格式关联partition到borker即可,如,将think_tank的partition0指定到101、102两节点上:
另外,如果有增加replica的个数的需求,同样可以使用这个脚本,可以翻一下官网文档。
一点儿感触,在确定问题所在后,官方的文档枯族应该作为我们优先考虑的一个重要资料源,网上的资料由于时间较早、版本不同的原因,解决方式可能需要细微的改动才能达到目的,这些坑在官方的一手资料上其实是可以规避的。
欢迎拍砖,欢迎交流~
Kafka集群部署(Docker容器的方式)
文章主要介绍以docker容器的方式部署kafka集群。
上述配置文件中的server.x,数字x对应到data/myid文件中的值。三台机器x的值分别就是1,2,3。参数详细说明请参考 官网文档唤含 。
1.--net=host: 容器与主机共享同一Network Namespace,即容器与网络看到的是相同的网络视图(host模式存在一定的风险,对安全要求很高的生产环境最好不要用host模兆链森式,应考虑除此之外的其他几种模式)
2.-v: 指定主机到容器的目录映射关系
这样就以容器的方式启动了zookeeper的服务,可以通过 "docker exec -it zookeeper bash" 命令进入容器中进行一些操作,例如查看服务启动是否正常。也可以通过查看2181端口是否被监听判断zookeeper的服务是否运行
详细的参数配置说明请参考 官方文档 ,参数不仅可以通过上述文件的方式来配置,也可以通过容器环境变量的方式来配置,这里结合两种方式使用。
1.KAFKA_ADVERTISED_HOST_NAME、KAFKA_BROKER_ID的值要结合每台机器自身设置
2./etc/hosts文件中最好配置ip与hostname的映射关系,否则会报出如下错误" Error: Exception thrown by the agent : java.net.MalformedURLException: Local host name unknown: java.net.UnknownHostException: node0: node0: System error "
3.通过-e 指定的环境变量与在server.properties中配置的选项其效果是一样的
4.配置文件中的选项若要通过环境变量来指定,方式为:如broker.id对应KAFKA_BROKER_ID,类似的log.dirs对应KAFKA_LOG_DIRS
5.KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"指java堆内存大小的设置,6G大小是kafka官网给出的数值,此数值要结合机器的内存大小给出。超过6G的内存,可以设置为6G;若机器的内存低于6G而设置6G,则会报错。
5.启动成功后,可族亩以通过"docker logs kafka"命令查看日志
1.ZK_HOSTS:ZooKeeper访问地址(需指定机器的ip,localhost:2181或127.0.0.1:2181均会报 "java.net.ConnectException: Connection refused" 异常)
Kafka(四)集群之kafka
在章节二( )中,我们部署了单机的kafka,现在我们部署一套集群模式的kafka。
这里我准备了三台虚拟机:
192.168.184.134
192.168.184.135
192.168.184.136
每台机器部署一个zk和kafka。
上一章节中zk集群已经神中部署完毕。
在章节二中,134这台机器已经有kafka存在了,我们在另外两台机器上安装kafka:
在上面的文件中有几个关键点,我们一一进行配置,我会对配置中的说明翻译:
以下这两个listeners,advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners。
在内网中我们使用listenners就可以了,在docker等容器或云中使用advertised。游判山
下面这个是日志路径的配置
下面这个是个重点的东西,topic在磁盘上会分为多个partitions存储,相比单一文件存储,增加了并行性,在后续文章中会详细去讲解:
日志的保存时间:
以下是zookeeper的配置:
这里我们直接设置后台启动,三个节点都是如此:
这里面有个小坑,还记得之前我们搭建的单机环境吗?那时候默认的日志文件夹在/tmp/kafka-logs下面,生成了很多内容,导致我们134这个节点无法启动成功,报错如下:
解决这个问题只需要把/tmp/kafka-logs文件删除就好了。
看到日志出现这一句表明启动成功了:
下面我们验证下是否搭建成功了,首先使用kafkatool工机具连接看下:
我们在134节点创建一个topic:
查看topic列表:
在kafkatool中查看:
创建生产者:
创建消费者:
生成者发送冲游消息:
消费者接收消息:
到此为止,kafka的集群搭建已经完成了。在后面的文章我们会去学习如何在springboot中集成kafka。
kafka集群配置和使用
进入安装目录,修改server.properties文件
修改如下属性,除id外,其他每台主机一致:
语义配置:(可选)
先启动zookeeper集群,已经在三台主机上配置好了zookeeper集群,启动:
在各台主机上进入zookeeper目录,分别启动zk:
在各台主机上进入kafka目录,分别启动kafka:
启动结好悔果为:
kafka占据了前台,辩枯要使携袜洞用主机,需要打开新终端
在新打开的终端上,进入zk目录,
进入kafka目录,创建主体
Kafka相关内容总结(Kafka集群搭建手记)
Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。
入门请参照:
在此不再赘述。
这部分不是本文的重点,但是kafka需要用到kafka集群,所以先搭建kafka集群。
从kafka官方文档看到,kafka似乎在未来的版本希望抛弃zookeep集群,自己维护集群的一致性,拭目以待吧。
我们搭建集群使用的是三台同机房的机团枯器,因为zookeeper不怎么占资源也不怎么占空间(我们的业务目前比较简单),所以三台机器上都搭建了zookeeper集群。
搭建zookeeper集群没什么难度,参考文档:
下面列一下我的配置并解析:
一共用三台物理机器,搭建一个Kafka集群。
每台服务器的硬盘划分都是一样的,每个独立的物理磁盘挂在一个单独的分区里面,这样很方便用于Kafka多个partition的数据读写与冗余。
/data1比较小,为了不成为集群的瓶颈,所以/data1用于存放kafka以及Zookeeper
每台机器的磁盘分布如下:
下面是kafka的简单配置,三台服务器都一样,如有不一致的在下文有说明。
kafka安装在目录/usr/local/kafka/下,下面的说明以10.1.xxx.57为例。
最重要的配置文件server.properties,需要配置的信息如下:
从上面的配置看到,kafka集群不需要像hadoop集群那样,配置ssh通讯,而且一个kafka服务器(官方文档称之为broker,下面统一使用这个称呼)并不知道其他的kafka服务器的存在,因此你需要逐个broker去启动kafka。各个broker根据自己的配置,会自动去配置文件上的zk服务器报到,这就是一个有zk服务器粘合起来的kafka集群。
我写了一个启动脚本,放在 /usr/local/kafka/bin 下面。启动脚本每个broker都一样:
如同kafka集群里面每一个broker都需要单独启动一样,蔽或携kafka集群里面每一个broker都需要单独关闭。
官方给出的关闭脚本是单独运行 bin/kafka-server-stop.sh
但是我运行的结果是无法关闭。打开脚本一看,才发现是最简单的办法,发一个TERM信号到kafka的java进程,官方脚本给出的grep有点问题。
发信号之后,一直tail着kafka日志,看到正常关闭。宏伏
指定zookeeper服务器,topic名称是LvsKafka(注意topic名称不能有英文句号(.)和下划线(_),否则会通不过,理由是名称会冲突,下文对此略有解析)
replication-factor指出重复因子是2,也就是每条数据有两个拷贝,可靠性考虑。
partitions 指出需要多少个partition,数据量大的多一点,无论生产和消费,这是负载均衡和高并发的需要。
可以看到刚才新建的24个partition,比如partition 5, 他的leader是broker 59,也就是10.1.xxx.59这台机器。
建立topic时我们指出需要2个拷贝,从上面的输出的Replicas字段看到,这两个拷贝放在59,58两个机器,也就是10.1.xxx.59和10.1.xxx.58.
Isr表示当前partition的所有拷贝所在的机器中,哪些是还活着(可以提供服务)的。现在是59和58都还存活。
这个命令另外还会看到一些类似于下面的内容:
__consumer_offsets到底是什么呢?其实就是客户端的消费进度,客户端会定时上报到kafka集群,而kafka集群会把每个客户端的消费进度放入一个自己内部的topic中,这个topic就是__consumer_offsets。我查看过__consumer_offsets的内容,其实就是每个客户端的消费进度作为一条消息,放入__consumer_offsets这个topic中。
这里给了我们两个提示:
1、kafka自己管理客户端的消费进度,而不是依靠zk,这就是kafka官方文档说的kafka未来会抛弃zk的底气之一;
2、留意到这个kafka自己的topic是带下划线的,也就是,kafka担心我们自己建的topic如果带下划线的话会跟这些内部自用的topic冲突;
关于kafka集群和kafka集群原理的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。