kafka查看topic列表(kafka topic列表)
本篇文章给大家谈谈kafka查看topic列表,以及kafka topic列表对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、查看Kafka某Topic的Partition详细信息时,使用如下哪个命令()
- 2、Hadoop生态架构之kafka
- 3、Kafka简介+Kafka Tool使用简介+使用实例
- 4、Kafka查看topic、consumer group状态命令
- 5、Kafka(四)集群之kafka
查看Kafka某Topic的Partition详细信息时,使用如下哪个命令()
查看Kafka某Topic的Partition详细信神燃息时,使用如下哪个命令(渗瞎含)
A.bin/kafka-topics.sh--create
B.bin/丛笑kafka-topics.sh--delete
C.bin/kafka-topics.sh--describe(正确答案)
D.bin/kafka-topics.sh--1ist
Hadoop生态架构之kafka
1、定位:分布式的消息队列系统,同时提供数据分布式缓存态卜功能(默认7天)
2、消息持久化到磁盘,达到O(1)访问速度,预读和后写,对磁盘的顺序访问(比内存访问还要快)
3、Storm(分布式的实时计算框架)
Kafka目标成为队列平台
4、基本组件:
Broker:每一台机器是一个Broker
Producer:日志消息生产者,主要写数据
Consumer:日志消息消费者,主要读数据
Topic:是虚拟概念,不同的consumer去指定的topic去读数据,不同producer可以往不同的topic去写
Partition:是实际概念,文件夹,是在Topic的基础上做了进一步分层
5、Partition功能:负载均衡,需要保证消息的顺序性
顺序性拿烂的保证:订阅消息是从头往后读取的,写消息是尾部追加,所以整体消息是顺序的
如果有多个partiton存在,可能会出现顺序不一致帆敏穗的情况,原因:每个Partition相互独立
6、Topic:逻辑概念
一个或多个Partition组成一个Topic
7、Partition以文件夹的形式存在
8、Partition有两部分组成:
(1)index log:(存储索引信息,快速定位segment文件)
(2)message log:(真实数据的所在)
9、HDFS多副本的方式来完成数据高可用
如果设置一个Topic,假设这个Topic有5个Partition,3个replication
Kafka分配replication的算法:
假设:
将第i个Partition分配到(i % N)个Broker上
将第i个Partition的第j个replication分配到( (i+j) % N)个Broker上
虽然Partition里面有多个replication
如果里面有M个replication,其中有一个是Leader,其他M-1个follower
10、zookeeper包系统的可用性,zk中会保存一些meta信息(topic)
11、物理上,不同的topic的消息肯定是分开存储的
12、偏移量——offset:用来定位数据读取的位置
13、kafka内部最基本的消息单位——message
14、传输最大消息message的size不能超过1M,可以通过配置来修改
15、Consumer Group
16、传输效率:zero-copy
0拷贝:减少Kernel和User模式上下文的切换
直接把disk上的data传输给socket,而不是通过应用程序来传输
17、Kafka的消息是无状态的,消费者必须自己维护已消费的状态信息(offset)
减轻Kafka的实现难度
18、Kafka内部有一个时间策略:SLA——消息保留策略(消息超过一定时间后,会自动删除)
19、交付保证:
at least once:至少一次(会有重复、但不丢失)
at most once:最多发送一次(不重复、但可能丢失)
exactly once:只有一次(最理想),目前不支持,只能靠客户端维护
20、Kafka集群里面,topic内部由多个partition(包含多个replication),达到高可用的目的:
日志副本:保证可靠性
角色:主、从
ISR:是一个集合,只有在集合中的follower,才有机会被选为leader
如何让leader知道follower是否成功接收数据(心跳,ack)
如果心跳正常,代表节点活着
21、怎么算“活着”
(1)心跳
(2)如果follower能够紧随leader的更新,不至于被落的太远
如果一旦挂掉,从ISR集合把该节点删除掉
前提:需要把zookeeper提前启动好
一、单机版
1、启动进程:
]# ./bin/kafka-server-start.sh config/server.properties
2、查看topic列表:
]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
3、创建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test
4、查看topic描述:
]# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic newyear_test
5、producer发送数据:
]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic newyear_test
6、consumer接收数据:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic newyear_test --from-beginning
7、删除topic:
]# ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic newyear_test
二、集群版
在slave1和slave2上的broker.id一定设置不同
分别在slave1和slave2上开启进程:
./bin/kafka-server-start.sh config/server.properties
创建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test
1、实现一个consumer group
首先在不同的终端分别开启consumer,保证groupid一致
]# python consumer_kafka.py
执行一次producer:
]# python producer_kafka.py
2、指定partition发送数据
]# python producer_kafka_2.py
3、指定partition读出数据
]# python consumer_kafka_2.py
consumer_kafka.py:
producer_kafka.py:
consumer_kafka_2.py:
producer_kafka_2.py:
1.新建./conf/kafka_test/flume_kafka.conf
2.启动flume:
]# flume-ng agent -c conf -f ./conf/kafka_test/flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console
启动成功,如下图:
3.测试:
1.1flume监控产生的数据:
]# for i in seq 1 100 ; do echo '==== '$i 1.log ; done
1.2kafka消费数据:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_1013 --from-beginning
消费结果如下图:
[img]Kafka简介+Kafka Tool使用简介+使用实例
详细安装访问:
macOS 可以用homebrew快速安装,访问地址:
原文链接:
查看topic列表:
创建topic:
--create :创建命令;
--topic :后面指定topic名称;
--replication-factor :后面指定副本数;
--partitions :指定分区数,根据broker的数量决定;
--zookeeper :后面指定 zookeeper.connect 的zk链接
查看某个topic:
Kafka 作为消息雹岩烂系统的一种, 当然可 以像其他消 息中 间件一样作为消息数据中转的平台。 下面以 Java 语言为例,看一下如何使用 Kafka 来发送和接收消息。
1、引入依赖
2、消息生产者
示例 中用 KafkaProducer 类来创建一个消息生产者,该类的构造函数入参是一系列属性值。下面看一下这些属性具体都是什么含义。
bootstrap.servers 表示 Kafka 集群 。 如果集群中有多台物理服务器,则服务器地址之间用逗号分隔, 比如” 192.168.1.1 :9092,192.168.1.2:9092” 。 localhost 是笔者电脑的地址,9092 是 Kafka 服务器默认监听的端口号。
key.serializer 和 value.serializer 表示消息的序列化类型 。 Kafka 的消息是以键值对的形式发送到 Kafka 服务器的,在消息被发送到服务器之前,消息生产者需要把不同类型的 消息序列化为 二 进制类型,示例中是枣明发送文本消息到服务器 , 所以使用的是StringSerializer。
key.deserializer 和 value.deserializer 表示消息的反序列化类型。把来自 Kafka 集群的二进制消 息反序列 化 为指定 的 类型,因为序列化用的是String类型,所以用StringDeserializer 来反序列化。
zk.connect 用于指定 Kafka 连接 ZooKeeper 的 URL ,源漏提供了基于 ZooKeeper 的集群服务器自动感知功能, 可以动态从 ZooKeeper 中读取 Kafka 集群配置信息。
有 了 消息生产者之后 , 就可以调用 send 方法发送消息了。该方法的入参是 ProducerRecord类型对象 , ProducerRecord 类提供了多种构造函数形参,常见的有如下三种 :
ProducerRecord(topic,partition,key,value);
ProducerRecord(topic,key,value);
ProducerRecord(topic, value) ;
其中 topic 和 value 是必填的, partition 和 key 是可选的 。如果指定了 pa时tion,那么消息会被发送至指定的 partition ;如果没指定 partition 但指定了 Key,那么消息会按照 hash(key)发送至对应的 partition: 如果既没指定 partition 也没指定 key,那么 消息会按照 round-robin 模式发送(即以轮询的方式依次发送〉到每一个 partition。示例中将向 test-topic 主题发送三条消息。
3、消息消费者
和消息生产者类似,这里用 KafkaConsumer 类来创建一个消息消费者,该类的构造函数入参也是一系列属性值。
bootstrap. servers 和生产者一样,表示 Kafka 集群。
group.id 表示消费者的分组 ID。
enable.auto.commit 表示 Consumer 的 offset 是否自 动提交 。
auto.commit.interval .ms 用于设置自动提交 offset 到 ZooKeeper 的时间间隔,时间单位是毫秒。
key. deserializer 和 value.deserializer 表示用字符串来反序列化消息数据。
消息消费者使用 subscribe 方法 订阅了 Topic 为 test-topic 的消息。 Consumer 调用poll 方法来轮询 Kafka 集群的消息, 一直等到 Kafka 集群中没有消息或达到超时时间(示例中设置超时时间为 100 毫秒)为止 。 如果读取到消息,则打印出消息记录的 pa此ition, offset、key 等。
Kafka查看topic、consumer group状态命令
以下命令中使用的zookeeper配置地址为127.0.0.1:2181,bootstrap--server(即broker)地址为: 127.0.0.1:9292
1,查看kafka topic列表,使用--list参数
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
__consumer_offsets
lx_test_topic
test
2,查看kafka特定topic的详情,源租使用--topic与--describe参数
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic lx_test_topic --describe
Topic:lx_test_topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: lx_test_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
列出了lx_test_topic的parition数量、replica因子团知以及每个partition的leader、replica信息
3,查看consumer group列表,使用--list参数
查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保雹或兆存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数:
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --list
lx_test
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list
console-consumer-86845
console-consumer-11967
4,查看特定consumer group 详情,使用--group与--describe参数
同样根据新/旧版本的consumer,分别指定bootstrap-server与zookeeper参数:
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --group lx_test --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
lx_test lx_test_topic 0 465 465 0 kafka-python-1.3.1_/127.0.0.1
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group console-consumer-11967 --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
Could not fetch offset from zookeeper for group console-consumer-11967 partition [lx_test_topic,0] due to missing offset data in zookeeper.
console-consumer-11967 lx_test_topic 0 unknown 465 unknown console-consumer-11967_aws-lx-1513787888172-d3a91f05-0
其中依次展示group名称、消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id(不一定包含hostname)
上面示例中console-consumer-11967是为了测试临时起的一个console consumer,缺少在zookeeper中保存的current_offset信息。
Kafka(四)集群之kafka
在章节二( )中,我们部署了单机的kafka,现在我们部署一套集群模式的kafka。
这里我准备了三台虚拟机:
192.168.184.134
192.168.184.135
192.168.184.136
每台机器部署一个zk和kafka。
上一章节中zk集群已经神中部署完毕。
在章节二中,134这台机器已经有kafka存在了,我们在另外两台机器上安装kafka:
在上面的文件中有几个关键点,我们一一进行配置,我会对配置中的说明翻译:
以下这两个listeners,advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners。
在内网中我们使用listenners就可以了,在docker等容器或云中使用advertised。游判山
下面这个是日志路径的配置
下面这个是个重点的东西,topic在磁盘上会分为多个partitions存储,相比单一文件存储,增加了并行性,在后续文章中会详细去讲解:
日志的保存时间:
以下是zookeeper的配置:
这里我们直接设置后台启动,三个节点都是如此:
这里面有个小坑,还记得之前我们搭建的单机环境吗?那时候默认的日志文件夹在/tmp/kafka-logs下面,生成了很多内容,导致我们134这个节点无法启动成功,报错如下:
解决这个问题只需要把/tmp/kafka-logs文件删除就好了。
看到日志出现这一句表明启动成功了:
下面我们验证下是否搭建成功了,首先使用kafkatool工机具连接看下:
我们在134节点创建一个topic:
查看topic列表:
在kafkatool中查看:
创建生产者:
创建消费者:
生成者发送冲游消息:
消费者接收消息:
到此为止,kafka的集群搭建已经完成了。在后面的文章我们会去学习如何在springboot中集成kafka。
关于kafka查看topic列表和kafka topic列表的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。