kafka自动创建topic(kafka自动创建主题)

本篇文章给大家谈谈kafka自动创建topic,以及kafka自动创建主题对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Kafka 源码解析之 Topic 的新建/扩容/删除

[TOC]

本篇接着讲述 Controller 的功能方面的内容,在 Kafka 中,一个 Topic 的新建、扩容或者删除都是由 Controller 来操作的,本篇文章也是主要聚焦在 Topic 的操作处理上(新建、扩容、删除),实际上 Topic 的创建在 Kafka 源码解析之 topic 创建过程(三) 中已经讲述过了,本篇与前面不同的是,本篇主要是从 Controller 角度来讲述,而且是把新建、扩容、删除这三个 Topic 级别的操作放在一起做一个总结。

这里把 Topic 新建与扩容放在一起讲解,主要是因为无论 Topic 是新建还是扩容,在 Kafka 内部其实都是 Partition 的新建,底层的实现机制是一样的,Topic 的新建与扩容的整体流程如下图所示:

Topic 新建与扩容触发条件的不同如下所示:

下面开始详细讲述这两种情况。

Topic 扩容

Kafka 提供了 Topic 扩容工具,假设一个 Topic(topic_test)只有一个 partition,这时候我们想把它扩容到两个 Partition,可以通过下面两个命令来实现:

这两种方法的区别是:第二种方法直接指定了要扩容的 Partition 2 的副本需要分配到哪台机器上,这样的话我们可以精确控制到哪些 Topic 放下哪些烂搜机器上。

无论是使用哪种方案,上面两条命令产生的结果只有一个,将 Topic 各个 Partition 的副本写入到 ZK 对应的节点上,这样的话 /brokers/topics/topic_test 节点的内容就会发生变化,PartitionModificationsListener 监听器就会被触发 ,该监听器的处理流程如下:

其 doHandleDataChange() 方法的处理流程如下:

下面我们看下 onNewPartitionCreation() 方法,其实现如下:

关于 Partition 的新建,总共分了以下四步:

经过上面几个阶段,一个 Partition 算是真正创建出来,可以正常进行读衡闭写工作了,当然上面只是讲述了 Controller 端做的内容,Partition 副本所在节点对 LeaderAndIsr 请求会做更多的工作,这部分会在后面关于 LeaderAndIsr 请求的处理中只饥拦历能够详细讲述。

Topic 新建

Kafka 也提供了 Topic 创建的工具,假设我们要创建一个名叫 topic_test,Partition 数为2的 Topic,创建的命令如下:

跟前面的类似,方法二是可以精确控制新建 Topic 每个 Partition 副本所在位置,Topic 创建的本质上是在 /brokers/topics 下新建一个节点信息,并将 Topic 的分区详情写入进去,当 /brokers/topics 有了新增的 Topic 节点后,会触发 TopicChangeListener 监听器,其实现如下:

只要 /brokers/topics 下子节点信息有变化(topic 新增或者删除),TopicChangeListener 都会被触发,其 doHandleChildChange() 方法的处理流程如下:

接着看下 onNewTopicCreation() 方法实现

上述方法主要做了两件事:

onNewPartitionCreation() 的实现在前面 Topic 扩容部分已经讲述过,这里不再重复,最好参考前面流程图来梳理 Topic 扩容和新建的整个过程。

Kafka Topic 删除这部分的逻辑是一个单独线程去做的,这个线程是在 Controller 启动时初始化和启动的。

TopicDeletionManager 初始化

TopicDeletionManager 启动实现如下所示:

TopicDeletionManager 启动时只是初始化了一个 DeleteTopicsThread 线程,并启动该线程。TopicDeletionManager 这个类从名字上去看,它是 Topic 删除的管理器,它是如何实现 Topic 删除管理呢,这里先看下该类的几个重要的成员变量:

前面一小节,简单介绍了 TopicDeletionManager、DeleteTopicsThread 的启动以及它们之间的关系,这里我们看下一个 Topic 被设置删除后,其处理的整理流程,简单做了一个小图,如下所示:

这里先简单讲述上面的流程,当一个 Topic 设置为删除后:

先看下 DeleteTopicsListener 的实现,如下:

其 doHandleChildChange() 的实现逻辑如下:

接下来,看下 Topic 删除线程 DeleteTopicsThread 的实现,如下所示:

doWork() 方法处理逻辑如下:

先看下 onTopicDeletion() 方法,这是 Topic 最开始删除时的实现,如下所示:

Topic 的删除的真正实现方法还是在 startReplicaDeletion() 方法中,Topic 删除时,会先调用 onPartitionDeletion() 方法删除所有的 Partition,然后在 Partition 删除时,执行 startReplicaDeletion() 方法删除该 Partition 的副本,该方法的实现如下:

该方法的执行逻辑如下:

在将副本状态从 OfflineReplica 转移成 ReplicaDeletionStarted 时,会设置一个回调方法 deleteTopicStopReplicaCallback(),该方法会将删除成功的 Replica 设置为 ReplicaDeletionSuccessful 状态,删除失败的 Replica 设置为 ReplicaDeletionIneligible 状态(需要根据 StopReplica 请求处理的过程,看下哪些情况下 Replica 会删除失败,这个会在后面讲解)。

下面看下这个方法 completeDeleteTopic(),当一个 Topic 的所有 Replica 都删除成功时,即其状态都在 ReplicaDeletionSuccessful 时,会调用这个方法,如下所示:

当一个 Topic 所有副本都删除后,会进行如下处理:

至此,一个 Topic 算是真正删除完成。

kafka极简入门(三)--创建topic

回顾 kafka极简入门(二)--安装

topic是kafka的生产者和消费者最小交互的单位,我们先从topic入手,创建第一个topic.

所以执行上面命令将会创建一个名为mytest的topic,该topic下面有1个分区,并且该分区只有1个猛并副本。

PS:除了手动创建主题外,还可以将代理配置为在发布不存在的主题时自动创建主题

Partition:0 表示该分区的id为0

leader: 9 表示分区的首领副本所在的broker(本例子中broker.id配置为9,所以这里显示9,具体在config/server.properties配置。这里只有一个分区,所以首领分区也就是自己)

Replicas: 9 表示分区的跟随副本所在的broker

Isr: 9 表示分镇槐区的同步副本所在的broker(同步副本可以认御知友为跟首领副本准实时同步的副本,可以配置判断条件,后面会讲,首领副本挂掉后,服务器会从同步副本中选举新的首领)

发送三个消息,分别是hello, world和!

注意: --from-beginning 表示从最开始的offset处开始消费。如果不写表示从最新的offset处消费,那么先发送了消息再开启消费者是收不到已发送的信息的

[img]

springboot往kafka写数据,如果事先topic未创建,后续创建会影响写入吗

不会。springboot往kafka写数颂铅据,调用kafkaTemplate的send,发去的topic如果没有,会自动创建,默认樱芹是一个partition、野颂好一个replicas的。不会影响后续的写入。

windows 下远程连接kafka服务器并创建topic 部署服务

一.打包项目镜像:

利用Dockerfile 来打包项目的镜像

本次项目共依赖两个镜像(一个基础系统环境和一个项目镜像)

本次直接将Dockerfile写好后,用shell脚本build.sh启动打包:

然后切换到项目的目录下找到build.sh,运行即可打包项目镜像

报错:"failed to dial gRPC: cannot connect to the Docker daemon. Is 'docker daemon' running on this host?: dial unix /var/run/docker.sock: connect: permission denied

"

就用

出现以下说明打包成功,接下来可以开始部署:

注意:如果遇到只读权限不能修改时,将host文件复制一份到桌面,修改后在替换原来的host文件

在hosts文件末尾加上kafka服务器 !外网! 39. 0.25...地址,修改后的格式如下:

1.1注意: 修改阿里云服务器的hosts 文件来配宽辩顷置 kafka的服务器地址:

在hosts 文件最后加入:

添加的 kafka-server 就是以下创建topic命令中的 kafka-server别名,

监听远程kafka:新建消费者:

远程创建topic的实例:

查看远程已创建的topc:

本地:

远程修改后的kafka topic:

2.通过git Bash 切换到kafka客户端的bin目录:

桌面打开 gitBash,切换到本地kafka软件目录:

这里一定要切换为windows

3.查看已经有的topic

--topic 指定topic名字

--replication-factor 指定副本数,因为我的灶念是集群环境,这里副本数就为3

--partitions 指定分区数,这个参数需要根据broker数和数据量决定,正常情况下,每个broker上两个partition最好

注意:服务器部署时候一定要用内网172. .开头的,外部访问设为外网ip

不然会导致Kafka写入数据的时候报错 : TImeout

4.1本地docker创建topic:

4.2 本地windows 创建topic

进入本地软件路径KAFKA/BIN/WIONDOWS

创建topic

5.修改服务器的host:

一定要注意加sudo 不然会导致readonly 无法修改

在host 文件的末尾加上以下:

6.切换到工程部署的目录

7.清理redis,不然数据慎陆有残留:

7.1服务器上的redis挂载清除:

在 docker-compose.yml中注销这几行: 目的是每次启动不必记录上次没有执行完的数据.

这个是用来记录redis中假如上次指定的是1到100万块,没有执行完.下次接着执行没执行完的任务,测试时暂时关闭

7.2删除volume:

7.3 如果volume文件被占用时,先删除占用容器:

7.4 清除redis中的数据

进入redis容器中:

8.部署命令:

8.1开启docker可视化web上监控docker:

然后访问:

宿主机IP + 9000端口

8.2执行部署命令,启动服务:

9.部署时报错: yaml: line 46: did not find expected key

原因: docker-compose.yml文件中第46行 报错

解决:将所有数据对齐,不要有多余的空格.

关于kafka自动创建topic和kafka自动创建主题的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表