kafka创建topic(kafka创建topic指定group)
本篇文章给大家谈谈kafka创建topic,以及kafka创建topic指定group对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、怎么设置kafka topic数据存储时间
- 2、Kafka 源码解析之 Topic 的新建/扩容/删除
- 3、Kafka的Topic配置详解
- 4、windows 下远程连接kafka服务器并创建topic 部署服务
- 5、kafka极简入门(三)--创建topic
- 6、如何在kafka中创建topic
怎么设置kafka topic数据存储时间
1、Kafka创建topic命令很简单,一条命令足矣:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test 。
2.此命令将创建一个名为test的运竖topic,其中有三个分区,每个分区需要分配三个副本。
三。topic创建主要分为两部分:命令行controller逻辑部分。
四。后台逻辑将监听zookeeper下对应的目录节点。一旦启动topic创建命令,它将创建一个新的数据节点并触发后台创建逻辑。
五个。确定分区副本祥悄启分配方案(即,将每个分区副本分配给哪个代理);创建zookeeper节点并将此方案写入/brokers/topics/topic节点。
五个。确定分区副本分配方案(即每个分区的副本分配给哪个分区)broker上);创建zookeeper节点,把这个方案写入/brokers/topics/topic节点下。
6、Kafka controller这一部分的主要任务是:创建分区;创谨如建副本;为每个分区选择leaderISR;;更新各种缓存。
Kafka 源码解析之 Topic 的新建/扩容/删除
[TOC]
本篇接着讲述 Controller 的功能方面的内容,在 Kafka 中,一个 Topic 的新建、扩容或者删除都是由 Controller 来操作的,本篇文章也是主要聚焦在 Topic 的操作处理上(新建、扩容、删除),实际上 Topic 的创建在 Kafka 源码解析之 topic 创建过程(三) 中已经讲述过了,本篇与前面不同的是,本篇主要是从 Controller 角度来讲述,而且是把新建、扩容、删除这三个 Topic 级别的操作放在一起做一个总结。
这里把 Topic 新建与扩容放在一起讲解,主要是因为无论 Topic 是新建还是扩容,在 Kafka 内部其实都是 Partition 的新建,底层的实现机制是一样的,Topic 的新建与扩容的整体流程如下图所示:
Topic 新建与扩容触发条件的不同如下所示:
下面开始详细讲述这两种情况。
Topic 扩容
Kafka 提供了 Topic 扩容工具,假设一个 Topic(topic_test)只有一个 partition,这时候我们想把它扩容到两个 Partition,可以通过下面两个命令来实现:
这两种方法的区别是:第二种方法直接指定了要扩容的 Partition 2 的副本需要分配到哪台机器上,这样的话我们可以精确控制到哪些 Topic 放下哪些烂搜机器上。
无论是使用哪种方案,上面两条命令产生的结果只有一个,将 Topic 各个 Partition 的副本写入到 ZK 对应的节点上,这样的话 /brokers/topics/topic_test 节点的内容就会发生变化,PartitionModificationsListener 监听器就会被触发 ,该监听器的处理流程如下:
其 doHandleDataChange() 方法的处理流程如下:
下面我们看下 onNewPartitionCreation() 方法,其实现如下:
关于 Partition 的新建,总共分了以下四步:
经过上面几个阶段,一个 Partition 算是真正创建出来,可以正常进行读衡闭写工作了,当然上面只是讲述了 Controller 端做的内容,Partition 副本所在节点对 LeaderAndIsr 请求会做更多的工作,这部分会在后面关于 LeaderAndIsr 请求的处理中只饥拦历能够详细讲述。
Topic 新建
Kafka 也提供了 Topic 创建的工具,假设我们要创建一个名叫 topic_test,Partition 数为2的 Topic,创建的命令如下:
跟前面的类似,方法二是可以精确控制新建 Topic 每个 Partition 副本所在位置,Topic 创建的本质上是在 /brokers/topics 下新建一个节点信息,并将 Topic 的分区详情写入进去,当 /brokers/topics 有了新增的 Topic 节点后,会触发 TopicChangeListener 监听器,其实现如下:
只要 /brokers/topics 下子节点信息有变化(topic 新增或者删除),TopicChangeListener 都会被触发,其 doHandleChildChange() 方法的处理流程如下:
接着看下 onNewTopicCreation() 方法实现
上述方法主要做了两件事:
onNewPartitionCreation() 的实现在前面 Topic 扩容部分已经讲述过,这里不再重复,最好参考前面流程图来梳理 Topic 扩容和新建的整个过程。
Kafka Topic 删除这部分的逻辑是一个单独线程去做的,这个线程是在 Controller 启动时初始化和启动的。
TopicDeletionManager 初始化
TopicDeletionManager 启动实现如下所示:
TopicDeletionManager 启动时只是初始化了一个 DeleteTopicsThread 线程,并启动该线程。TopicDeletionManager 这个类从名字上去看,它是 Topic 删除的管理器,它是如何实现 Topic 删除管理呢,这里先看下该类的几个重要的成员变量:
前面一小节,简单介绍了 TopicDeletionManager、DeleteTopicsThread 的启动以及它们之间的关系,这里我们看下一个 Topic 被设置删除后,其处理的整理流程,简单做了一个小图,如下所示:
这里先简单讲述上面的流程,当一个 Topic 设置为删除后:
先看下 DeleteTopicsListener 的实现,如下:
其 doHandleChildChange() 的实现逻辑如下:
接下来,看下 Topic 删除线程 DeleteTopicsThread 的实现,如下所示:
doWork() 方法处理逻辑如下:
先看下 onTopicDeletion() 方法,这是 Topic 最开始删除时的实现,如下所示:
Topic 的删除的真正实现方法还是在 startReplicaDeletion() 方法中,Topic 删除时,会先调用 onPartitionDeletion() 方法删除所有的 Partition,然后在 Partition 删除时,执行 startReplicaDeletion() 方法删除该 Partition 的副本,该方法的实现如下:
该方法的执行逻辑如下:
在将副本状态从 OfflineReplica 转移成 ReplicaDeletionStarted 时,会设置一个回调方法 deleteTopicStopReplicaCallback(),该方法会将删除成功的 Replica 设置为 ReplicaDeletionSuccessful 状态,删除失败的 Replica 设置为 ReplicaDeletionIneligible 状态(需要根据 StopReplica 请求处理的过程,看下哪些情况下 Replica 会删除失败,这个会在后面讲解)。
下面看下这个方法 completeDeleteTopic(),当一个 Topic 的所有 Replica 都删除成功时,即其状态都在 ReplicaDeletionSuccessful 时,会调用这个方法,如下所示:
当一个 Topic 所有副本都删除后,会进行如下处理:
至此,一个 Topic 算是真正删除完成。
Kafka的Topic配置详解
配置topic级别参数时,相同(参数)属性topic级别会覆盖全局的,否则默认为全局配置属性值。
创建topic参数可以设置一个或多个--config "Property(属性)",下面是创建一个topic名称为"my-topic"例子,它设置了2个参数max message size 和 flush rate.
(A)创建topic时配置参数
(B)修改topic时配置参数
覆盖已经有topic参数,下面例子修改"my-topic"的max message属性
(C)删除猛培游topic级别配置参数
注:配置中散的kafka集群枝销的根目录为/config/mobile/mq/mafka02,因此所有节点信息都在此目录下。
cleanup.policy
delete.retention.ms
delete.retention.ms
flush.messages
flush.ms
index.interval.bytes
message.max.bytes
min.cleanable.dirty.ratio
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
log.roll.hours
参考资料:
[img]windows 下远程连接kafka服务器并创建topic 部署服务
一.打包项目镜像:
利用Dockerfile 来打包项目的镜像
本次项目共依赖两个镜像(一个基础系统环境和一个项目镜像)
本次直接将Dockerfile写好后,用shell脚本build.sh启动打包:
然后切换到项目的目录下找到build.sh,运行即可打包项目镜像
若
报错:"failed to dial gRPC: cannot connect to the Docker daemon. Is 'docker daemon' running on this host?: dial unix /var/run/docker.sock: connect: permission denied
"
就用
出现以下说明打包成功,接下来可以开始部署:
注意:如果遇到只读权限不能修改时,将host文件复制一份到桌面,修改后在替换原来的host文件
在hosts文件末尾加上kafka服务器 !外网! 39. 0.25...地址,修改后的格式如下:
1.1注意: 修改阿里云服务器的hosts 文件来配宽辩顷置 kafka的服务器地址:
在hosts 文件最后加入:
添加的 kafka-server 就是以下创建topic命令中的 kafka-server别名,
监听远程kafka:新建消费者:
远程创建topic的实例:
查看远程已创建的topc:
本地:
远程修改后的kafka topic:
2.通过git Bash 切换到kafka客户端的bin目录:
桌面打开 gitBash,切换到本地kafka软件目录:
这里一定要切换为windows
3.查看已经有的topic
--topic 指定topic名字
--replication-factor 指定副本数,因为我的灶念是集群环境,这里副本数就为3
--partitions 指定分区数,这个参数需要根据broker数和数据量决定,正常情况下,每个broker上两个partition最好
注意:服务器部署时候一定要用内网172. .开头的,外部访问设为外网ip
不然会导致Kafka写入数据的时候报错 : TImeout
4.1本地docker创建topic:
4.2 本地windows 创建topic
进入本地软件路径KAFKA/BIN/WIONDOWS
创建topic
5.修改服务器的host:
一定要注意加sudo 不然会导致readonly 无法修改
在host 文件的末尾加上以下:
6.切换到工程部署的目录
7.清理redis,不然数据慎陆有残留:
7.1服务器上的redis挂载清除:
在 docker-compose.yml中注销这几行: 目的是每次启动不必记录上次没有执行完的数据.
这个是用来记录redis中假如上次指定的是1到100万块,没有执行完.下次接着执行没执行完的任务,测试时暂时关闭
7.2删除volume:
7.3 如果volume文件被占用时,先删除占用容器:
7.4 清除redis中的数据
进入redis容器中:
8.部署命令:
8.1开启docker可视化web上监控docker:
然后访问:
宿主机IP + 9000端口
8.2执行部署命令,启动服务:
9.部署时报错: yaml: line 46: did not find expected key
原因: docker-compose.yml文件中第46行 报错
解决:将所有数据对齐,不要有多余的空格.
kafka极简入门(三)--创建topic
回顾 kafka极简入门(二)--安装
topic是kafka的生产者和消费者最小交互的单位,我们先从topic入手,创建第一个topic.
或
所以执行上面命令将会创建一个名为mytest的topic,该topic下面有1个分区,并且该分区只有1个猛并副本。
PS:除了手动创建主题外,还可以将代理配置为在发布不存在的主题时自动创建主题
Partition:0 表示该分区的id为0
leader: 9 表示分区的首领副本所在的broker(本例子中broker.id配置为9,所以这里显示9,具体在config/server.properties配置。这里只有一个分区,所以首领分区也就是自己)
Replicas: 9 表示分区的跟随副本所在的broker
Isr: 9 表示分镇槐区的同步副本所在的broker(同步副本可以认御知友为跟首领副本准实时同步的副本,可以配置判断条件,后面会讲,首领副本挂掉后,服务器会从同步副本中选举新的首领)
发送三个消息,分别是hello, world和!
注意: --from-beginning 表示从最开始的offset处开始消费。如果不写表示从最新的offset处消费,那么先发送了消息再开启消费者是收不到已发送的信息的
如何在kafka中创建topic
[Toc]
在使用kafka发送消息和消费消息之前,必须先要创建topic,在kafka中创建topic的方式有以下3种清薯猜:
通过 kafka-topics.sh 脚本来创建一个名为 topic-test1 并且副本数为2、分区数为4的topic。(如无特殊说明,本文所述都是基于1.0.0版本。)
打开kafka-topics.sh脚本一探究竟,其内容只有一行,具体如下:
这个脚本的主要答型作用就是运行 kafka.admin.TopicCommand 。在main方法中判断参数列表中是否包含有 create ,如果有,那么就实施创建topic的任务。
创建topic时除了需要zookeeper的地址参数外,还需要指定topic的名称、副本因子replication-factor以及分区个数partitions等必选参数 ,还可以包括disable-rack-aware、手岩config、if-not-exists等可选参数。
创建topic的时候,如果名称中包含 . 或者 _ ,kafka会抛出警告。原因是:
在Kafka的内部做埋点时会根据topic的名称来命名metrics的名称,并且会将句点号 . 改成下划线 _ 。假设遇到一个topic的名称为 topic.1_2 ,还有一个topic的名称为 topic_1.2 ,那么最后的metrics的名称都为 topic_1_2 ,所以就会发生名称冲突。
topic的命名不推荐(虽然可以这样做)使用双下划线 __ 开头,因为以双下划线开头的topic一般看作是kafka的内部topic,比如 __consumer_offsets 和 __transaction_state 。
topic的名称必须满足如下规则:
关于kafka创建topic和kafka创建topic指定group的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。