kafka查看topic列表(kafka topic列表)

本篇文章给大家谈谈kafka查看topic列表,以及kafka topic列表对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

查看Kafka某Topic的Partition详细信息时,使用如下哪个命令()

查看Kafka某Topic的Partition详细信神燃息时,使用如下哪个命令(渗瞎含)

A.bin/kafka-topics.sh--create

B.bin/丛笑kafka-topics.sh--delete

C.bin/kafka-topics.sh--describe(正确答案)

D.bin/kafka-topics.sh--1ist

Hadoop生态架构之kafka

1、定位:分布式的消息队列系统,同时提供数据分布式缓存态卜功能(默认7天)

2、消息持久化到磁盘,达到O(1)访问速度,预读和后写,对磁盘的顺序访问(比内存访问还要快)

3、Storm(分布式的实时计算框架)

Kafka目标成为队列平台

4、基本组件:

Broker:每一台机器是一个Broker

Producer:日志消息生产者,主要写数据

Consumer:日志消息消费者,主要读数据

Topic:是虚拟概念,不同的consumer去指定的topic去读数据,不同producer可以往不同的topic去写

Partition:是实际概念,文件夹,是在Topic的基础上做了进一步分层

5、Partition功能:负载均衡,需要保证消息的顺序性

顺序性拿烂的保证:订阅消息是从头往后读取的,写消息是尾部追加,所以整体消息是顺序的

如果有多个partiton存在,可能会出现顺序不一致帆敏穗的情况,原因:每个Partition相互独立

6、Topic:逻辑概念

一个或多个Partition组成一个Topic

7、Partition以文件夹的形式存在

8、Partition有两部分组成:

(1)index log:(存储索引信息,快速定位segment文件)

(2)message log:(真实数据的所在)

9、HDFS多副本的方式来完成数据高可用

如果设置一个Topic,假设这个Topic有5个Partition,3个replication

Kafka分配replication的算法:

假设:

将第i个Partition分配到(i % N)个Broker上

将第i个Partition的第j个replication分配到( (i+j) % N)个Broker上

虽然Partition里面有多个replication

如果里面有M个replication,其中有一个是Leader,其他M-1个follower

10、zookeeper包系统的可用性,zk中会保存一些meta信息(topic)

11、物理上,不同的topic的消息肯定是分开存储的

12、偏移量——offset:用来定位数据读取的位置

13、kafka内部最基本的消息单位——message

14、传输最大消息message的size不能超过1M,可以通过配置来修改

15、Consumer Group

16、传输效率:zero-copy

0拷贝:减少Kernel和User模式上下文的切换

直接把disk上的data传输给socket,而不是通过应用程序来传输

17、Kafka的消息是无状态的,消费者必须自己维护已消费的状态信息(offset)

减轻Kafka的实现难度

18、Kafka内部有一个时间策略:SLA——消息保留策略(消息超过一定时间后,会自动删除)

19、交付保证:

at least once:至少一次(会有重复、但不丢失)

at most once:最多发送一次(不重复、但可能丢失)

exactly once:只有一次(最理想),目前不支持,只能靠客户端维护

20、Kafka集群里面,topic内部由多个partition(包含多个replication),达到高可用的目的:

日志副本:保证可靠性

角色:主、从

ISR:是一个集合,只有在集合中的follower,才有机会被选为leader

如何让leader知道follower是否成功接收数据(心跳,ack)

如果心跳正常,代表节点活着

21、怎么算“活着”

(1)心跳

(2)如果follower能够紧随leader的更新,不至于被落的太远

如果一旦挂掉,从ISR集合把该节点删除掉

前提:需要把zookeeper提前启动好

一、单机版

1、启动进程:

]# ./bin/kafka-server-start.sh config/server.properties

2、查看topic列表:

]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181

3、创建topic:

]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test

4、查看topic描述:

]# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic newyear_test

5、producer发送数据:

]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic newyear_test

6、consumer接收数据:

]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic newyear_test --from-beginning

7、删除topic:

]# ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic newyear_test

二、集群版

在slave1和slave2上的broker.id一定设置不同

分别在slave1和slave2上开启进程:

./bin/kafka-server-start.sh config/server.properties

创建topic:

]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test

1、实现一个consumer group

首先在不同的终端分别开启consumer,保证groupid一致

]# python consumer_kafka.py

执行一次producer:

]# python producer_kafka.py

2、指定partition发送数据

]# python producer_kafka_2.py

3、指定partition读出数据

]# python consumer_kafka_2.py

consumer_kafka.py:

producer_kafka.py:

consumer_kafka_2.py:

producer_kafka_2.py:

1.新建./conf/kafka_test/flume_kafka.conf

2.启动flume:

]# flume-ng agent -c conf -f ./conf/kafka_test/flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console

启动成功,如下图:

3.测试:

1.1flume监控产生的数据:

]# for i in seq 1 100 ; do echo '==== '$i 1.log ; done

1.2kafka消费数据:

]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_1013 --from-beginning

消费结果如下图:

[img]

Kafka简介+Kafka Tool使用简介+使用实例

详细安装访问:

macOS 可以用homebrew快速安装,访问地址:

原文链接:

查看topic列表:

创建topic:

--create :创建命令;

--topic :后面指定topic名称;

--replication-factor :后面指定副本数;

--partitions :指定分区数,根据broker的数量决定;

--zookeeper :后面指定 zookeeper.connect 的zk链接

查看某个topic:

Kafka 作为消息雹岩烂系统的一种, 当然可 以像其他消 息中 间件一样作为消息数据中转的平台。 下面以 Java 语言为例,看一下如何使用 Kafka 来发送和接收消息。

1、引入依赖

2、消息生产者

示例 中用 KafkaProducer 类来创建一个消息生产者,该类的构造函数入参是一系列属性值。下面看一下这些属性具体都是什么含义。

bootstrap.servers 表示 Kafka 集群 。 如果集群中有多台物理服务器,则服务器地址之间用逗号分隔, 比如” 192.168.1.1 :9092,192.168.1.2:9092” 。 localhost 是笔者电脑的地址,9092 是 Kafka 服务器默认监听的端口号。

key.serializer 和 value.serializer 表示消息的序列化类型 。 Kafka 的消息是以键值对的形式发送到 Kafka 服务器的,在消息被发送到服务器之前,消息生产者需要把不同类型的 消息序列化为 二 进制类型,示例中是枣明发送文本消息到服务器 , 所以使用的是StringSerializer。

key.deserializer 和 value.deserializer 表示消息的反序列化类型。把来自 Kafka 集群的二进制消 息反序列 化 为指定 的 类型,因为序列化用的是String类型,所以用StringDeserializer 来反序列化。

zk.connect 用于指定 Kafka 连接 ZooKeeper 的 URL ,源漏提供了基于 ZooKeeper 的集群服务器自动感知功能, 可以动态从 ZooKeeper 中读取 Kafka 集群配置信息。

有 了 消息生产者之后 , 就可以调用 send 方法发送消息了。该方法的入参是 ProducerRecord类型对象 , ProducerRecord 类提供了多种构造函数形参,常见的有如下三种 :

ProducerRecord(topic,partition,key,value);

ProducerRecord(topic,key,value);

ProducerRecord(topic, value) ;

其中 topic 和 value 是必填的, partition 和 key 是可选的 。如果指定了 pa时tion,那么消息会被发送至指定的 partition ;如果没指定 partition 但指定了 Key,那么消息会按照 hash(key)发送至对应的 partition: 如果既没指定 partition 也没指定 key,那么 消息会按照 round-robin 模式发送(即以轮询的方式依次发送〉到每一个 partition。示例中将向 test-topic 主题发送三条消息。

3、消息消费者

和消息生产者类似,这里用 KafkaConsumer 类来创建一个消息消费者,该类的构造函数入参也是一系列属性值。

bootstrap. servers 和生产者一样,表示 Kafka 集群。

group.id 表示消费者的分组 ID。

enable.auto.commit 表示 Consumer 的 offset 是否自 动提交 。

auto.commit.interval .ms 用于设置自动提交 offset 到 ZooKeeper 的时间间隔,时间单位是毫秒。

key. deserializer 和 value.deserializer 表示用字符串来反序列化消息数据。

消息消费者使用 subscribe 方法 订阅了 Topic 为 test-topic 的消息。 Consumer 调用poll 方法来轮询 Kafka 集群的消息, 一直等到 Kafka 集群中没有消息或达到超时时间(示例中设置超时时间为 100 毫秒)为止 。 如果读取到消息,则打印出消息记录的 pa此ition, offset、key 等。

Kafka查看topic、consumer group状态命令

以下命令中使用的zookeeper配置地址为127.0.0.1:2181,bootstrap--server(即broker)地址为: 127.0.0.1:9292

1,查看kafka topic列表,使用--list参数

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

__consumer_offsets

lx_test_topic

test

2,查看kafka特定topic的详情,源租使用--topic与--describe参数

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic lx_test_topic --describe

Topic:lx_test_topic PartitionCount:1 ReplicationFactor:1 Configs:

Topic: lx_test_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0

列出了lx_test_topic的parition数量、replica因子团知以及每个partition的leader、replica信息

3,查看consumer group列表,使用--list参数

查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保雹或兆存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数:

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --list

lx_test

bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list

console-consumer-86845

console-consumer-11967

4,查看特定consumer group 详情,使用--group与--describe参数

同样根据新/旧版本的consumer,分别指定bootstrap-server与zookeeper参数:

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --group lx_test --describe

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER

lx_test lx_test_topic 0 465 465 0 kafka-python-1.3.1_/127.0.0.1

bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group console-consumer-11967 --describe

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER

Could not fetch offset from zookeeper for group console-consumer-11967 partition [lx_test_topic,0] due to missing offset data in zookeeper.

console-consumer-11967 lx_test_topic 0 unknown 465 unknown console-consumer-11967_aws-lx-1513787888172-d3a91f05-0

其中依次展示group名称、消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id(不一定包含hostname)

上面示例中console-consumer-11967是为了测试临时起的一个console consumer,缺少在zookeeper中保存的current_offset信息。

Kafka(四)集群之kafka

在章节二( )中,我们部署了单机的kafka,现在我们部署一套集群模式的kafka。

这里我准备了三台虚拟机:

192.168.184.134

192.168.184.135

192.168.184.136

每台机器部署一个zk和kafka。

上一章节中zk集群已经神中部署完毕。

在章节二中,134这台机器已经有kafka存在了,我们在另外两台机器上安装kafka:

在上面的文件中有几个关键点,我们一一进行配置,我会对配置中的说明翻译:

以下这两个listeners,advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners。

在内网中我们使用listenners就可以了,在docker等容器或云中使用advertised。游判山

下面这个是日志路径的配置

下面这个是个重点的东西,topic在磁盘上会分为多个partitions存储,相比单一文件存储,增加了并行性,在后续文章中会详细去讲解:

日志的保存时间:

以下是zookeeper的配置:

这里我们直接设置后台启动,三个节点都是如此:

这里面有个小坑,还记得之前我们搭建的单机环境吗?那时候默认的日志文件夹在/tmp/kafka-logs下面,生成了很多内容,导致我们134这个节点无法启动成功,报错如下:

解决这个问题只需要把/tmp/kafka-logs文件删除就好了。

看到日志出现这一句表明启动成功了:

下面我们验证下是否搭建成功了,首先使用kafkatool工机具连接看下:

我们在134节点创建一个topic:

查看topic列表:

在kafkatool中查看:

创建生产者:

创建消费者:

生成者发送冲游消息:

消费者接收消息:

到此为止,kafka的集群搭建已经完成了。在后面的文章我们会去学习如何在springboot中集成kafka。

关于kafka查看topic列表和kafka topic列表的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表