kafka命令(kafka命令行查看topic)

本篇文章给大家谈谈kafka命令,以及kafka命令行查看topic对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

Kafka | 常用命令行操作

本次实验的kafka集群有三个节点,即有三团枣个broker。

Topic操塌猜拆作的执行脚本在bin目录下的 kafka-topics.sh

例子中创建名为first的topic,2个分区,2个副本。

创建后可分别在三个节点兆凳下,进入kafka的存放数据的目录,查看topic的创建情况:first-0和first-1是以topic名和分区号组成,代表了2个分区;每个分区副本都是2个,而且相同分区副本是放在不同的节点上的,例如first-0的副本在node01和node03节点上,first-1的副本在node01和node02上。

查看数据目录下的topic是否被删除:发现topic名称后面多了delete删除标志,不要着急,等过会来看这个topic就会被删除。

每输入一条消息,消费者窗口会显示生产者发出的消息

[img]

Kafka总学不明白:

kafka的常用命令

新建一个topic

./bin/kafka-topics.sh --create --zookeeper hadoop04:2181,hadoop05:2181,hadoop06:2181 --partition 3 --replication-factor 3 --topic test-0

test-0是topic的名字

replication-factor 3 副本数

topic存储的是元数据,通过它找到真实的数据

改变分区

./bin/kafka-topics.sh --alter --zookeeper 192.168.14.131:2181,192.168.14.131:2182,192.168.14.131:2183 --partition 5 --topic test03

分区信息查询

./bin/kafka-topics.sh --describe --zookeeper hadoop04:2181,hadoop05:2181,hadoop06:2181 --topic test-0

结果为:leader主分区(宴坦broker Id) replicationfactor副本数 Isr是否存活(存储的顺序)

Topic:test-0 PartitionCount:3 ReplicationFactor:3 Configs:

Topic: test-0 Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1

Topic: test-0 Partition: 1 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2

Topic: test-0 Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0

查询语晌凯桐句:

bin/kafka-topics.sh --list --zookeeper hadoop04:2181,hadoop05:2181,hadoop06:2181

删除语句

bin/kafka-topics.sh --delete --zookeeper hadoop04:2181,hadoop05:2181,hadoop06:2181 --topic test-0

结果是test-0 - marked for deletion

表示不能真正的删除只是做了个标记要删除取zookeeper去真正删除(这孙棚个配置如果设置为true可以真正删除,设置false不能删除)

#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除

delete.topic.enable=false

rmr /brokers/topics/test-0 ,删除client中的brokers中topic对应的

rmr /config/topics/test-0 ,删配置

rmr /admin/delete_topics,删除admin中的标记为删除的topics

分区设置后只可以增加不可以减少

六API操作

启动生产者

./bin/kafka-console-producer.sh --broker-list 192.168.14.131:9092,192.168.14.132:9092,192.168.14.133:9092 --topic test03

启动消费者

./bin/kafka-console-consumer.sh --zookeeper 192.168.247.131:2181,192.168.247.132:2181,192.168.247.133:2181 --topic test03 --from-beginning

kafka命令行的管理使用

首先要启动好kafka集群

1、集群时间同步

2、启动zookeeper集群

3、启动kafka集群

启动kafka集群的方式就是在集群中每台机器 kafka目录 下运行

nohup bin/kafka-server-start.sh config/server.properties /dev/null 21

kafka发出消息和接收消息都是基于topic,所以要先创建一个燃睁topic,才能向里面发消息。创建topic的脚本:

topic创建好了,就可以向里边发送消息了。

通过命令行实现数据的发送 producer 生产者

kafka-console-producer.sh 就是用来测圆森试用的脚本,可以模拟kafka消息的发送端。

直接运行 kafka-console-producer.sh 查看帮助

--broker-list 指定我们kafka集群的地址

--topic 指定我们的消息发送到哪个topic里面去

通过命令行实现皮腔岁数据的接收 consumer 消费者

--bootstrap-server 表示我们的kafak集群的地址,在旧版本中使用的是--zookeeper参数,两者至少使用一个

--from-beginning 表示我们从最开始的数据进行消费

--topic指定我们topic的名字

在producer端发送数据,在consumer端可以收到数据

kafka在win下运行出现“命令语法不正确”的解决方法

在kafka在win下运行出现“命令语法不正确”的解决方法

也就是    妖云小离   的教程补充!

关键点

需要配置,kafka和zookeeper的日志文件的目录,因为在配置文件server.properties(kafka的配置文件) 和  zookeeper-server.properties (zookeeper的配置文件磨谨)  需要设置日志文件目录,可以使用相对的路径。

server.properties

zookeeper-server.properties  这两个配置文件所在的地方。

分别设置日志的路径,支持相对路径。

server.properties

 zookeeper-server.properties 

设置完成之谨游袭后,再按 妖祥兄云小离  教程运行的效果:

会在原来的目录下创建一个winlogs的目录,这个目录运行的时候可以自己创建,可以不用预先创建,但是也可以预先创建。

3分钟带你彻底搞懂 Kafka

Kafka到底是个啥?用来干嘛的?

官方定义如下:

翻译过来,大致的意思就是,这是一个实时数据处理系统,可以横向扩展,并高可靠!

实时数据处理 ,从名字上看,很好理解,就是将数据进行实时处理,在现在流行的微服务开发中,最常用实时数据处理平台有 RabbitMQ、RocketMQ 等消息中间件。

这些中间件,最大的特点主要有两个:

在早期的 web 应用程序开发中,当请求量突然上来了时候,我们会将要处理的数据推送到一个队列通道中,然后另起一个线程来不断轮训拉取队列中的数据,从而加快程序的运行效率。

但是随着请求量不断的增大,并且队列通道的数据一致处于高负载,在这种情况下,应用程序的内存占用率会非常高,稍有不慎,会出现内存不足,造成程序内存溢出,从而导致服务不可用。

随着业务量的不断扩张,在一个应用程序内,使用这种模式已然无法满足需求,因此之后,就诞生了各种消息中间件,例如 ActiveMQ、RabbitMQ、RocketMQ等中间件。

采用这种模型,本质就是将要推送的数据,不在存放在当磨枯前应用程序的内存中,而是将数据存放到另一个专门负责数据处理的应用程序中,从而实现服务解耦。

消息中间件 :主要的职责就是保证能接受到消息,并将消息存储到磁盘,即使其他服务都挂了,数据也不会丢失,同时还可以对数据消费情况做好监控工作。

应用程序 :只需要将消息推送到消息中间件,然后启用一个线程来不断从消息中间件中拉取数据,进行消费确认即可!

引入消息中间件之后,整个服务开发会变得更加简单,各负其责。

Kafka 本质其实也是消息中间件的一种,Kafka 出自于 LinkedIn 公司,与 2010 年开源到 github。

LinkedIn 的开发团队,为了解决数据管道问题,起初采用了 ActiveMQ 来进行数据交换,大约是在 2010 年前后,那时的 ActiveMQ 还远远无法满足 LinkedIn 对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,LinkedIn 决定研发自己的消息传递系统, Kafka 由此诞生 。

在 LinkedIn 公司,Kafka 可以有效地处理每天数十亿条消息的指标和用户活动跟踪,其强大的处理能力,已经被业界所认可,并成为大数据流水线的首选技术。

先来看一张图, 下面这张图就是 kafka 生产与消费的核心架构模型 !

如果你看不懂这些概念没关系,我会带着大家一起梳理一遍!

简而言之,kafka 本质就是一个消息系统,与大多数的消息系统一样,主要的特点如下:

与 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在于,它有一个**分区 Partition **的概念。

这个分区的意思就是说,如果你创建的 topic 有5个分区,当你一次性向 kafka 中推 1000 条数据时,这 1000 条数据默认会分培游哪配到 5 个分区中,其中每个分区存储 200 条数据。

这样做的目的,就是方便消费者从不同的分区拉取数据,假如你启动 5 个线程同时拉取数据,每个线程拉取一个分区,消费速度会非常非常快!

这是 kafka 与其他的消息系统最大的不同!

和其他的中间件一样,kafka 每次发送数据都是向 Leader 分区发送数据,并顺序写入到磁盘,然后 Leader 分区会将数据同步到各个从分区 Follower ,即使主分区挂了,也不会影响服务的正常运行。

那 kafka 是如何将数据写入到对应的分区呢?kafka中有以下几个原则:

与生产者一样,消费者主动的去kafka集群拉取消息时,也是从 Leader 分区去拉取数据。

这里我们需要重点了解一个名词: 消费组 !

考虑到多个消配码费者的场景,kafka 在设计的时候,可以由多个消费者组成一个消费组,同一个消费组者的消费者可以消费同一个 topic 下不同分区的数据,同一个分区只会被一个消费组内的某个消费者所消费,防止出现重复消费的问题!

但是不同的组,可以消费同一个分区的数据!

你可以这样理解,一个消费组就是一个客户端,一个客户端可以由很多个消费者组成,以便加快消息的消费能力。

但是,如果一个组下的消费者数量大于分区数量,就会出现很多的消费者闲置。

如果分区数量大于一个组下的消费者数量,会出现一个消费者负责多个分区的消费,会出现消费性能不均衡的情况。

因此,在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量保持一致!

光说理论可没用,下面我们就以 centos7 为例,介绍一下 kafka 的安装和使用。

kafka 需要 zookeeper 来保存服务实例的元信息,因此在安装 kafka 之前,我们需要先安装 zookeeper。

zookeeper 安装环境依赖于 jdk,因此我们需要事先安装 jdk

下载zookeeper,并解压文件包

创建数据、日志目录

配置zookeeper

重新配置 dataDir 和 dataLogDir 的存储路径

最后,启动 Zookeeper 服务

到官网 下载想要的版本,我这里下载是最新稳定版 2.8.0 。

按需修改配置文件 server.properties (可选)

server.properties 文件内容如下:

其中有四个重要的参数:

可根据自己需求修改对应的配置!

启动 kafka 服务

创建一个名为 testTopic 的主题,它只包含一个分区,只有一个副本:

运行 list topic 命令,可以看到该主题。

输出内容:

Kafka 附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。

运行生产者,然后在控制台中键入一些消息以发送到服务器。

输入两条内容并回车:

Kafka 还有一个命令行使用者,它会将消息转储到标准输出。

输出结果如下:

本文主要围绕 kafka 的架构模型和安装环境做了一些初步的介绍,难免会有理解不对的地方,欢迎网友批评、吐槽。

由于篇幅原因,会在下期文章中详细介绍 java 环境下 kafka 应用场景!

Kafka查看topic、consumer group状态命令

以下命令中使用的zookeeper配置地址为127.0.0.1:2181,bootstrap--server(即broker)地址为: 127.0.0.1:9292

1,查看kafka topic列表,使用--list参数

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list

__consumer_offsets

lx_test_topic

test

2,查看kafka特定topic的详情,源租使用--topic与--describe参数

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic lx_test_topic --describe

Topic:lx_test_topic PartitionCount:1 ReplicationFactor:1 Configs:

Topic: lx_test_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0

列出了lx_test_topic的parition数量、replica因子团知以及每个partition的leader、replica信息

3,查看consumer group列表,使用--list参数

查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保雹或兆存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数:

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --list

lx_test

bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list

console-consumer-86845

console-consumer-11967

4,查看特定consumer group 详情,使用--group与--describe参数

同样根据新/旧版本的consumer,分别指定bootstrap-server与zookeeper参数:

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --group lx_test --describe

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER

lx_test lx_test_topic 0 465 465 0 kafka-python-1.3.1_/127.0.0.1

bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group console-consumer-11967 --describe

GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER

Could not fetch offset from zookeeper for group console-consumer-11967 partition [lx_test_topic,0] due to missing offset data in zookeeper.

console-consumer-11967 lx_test_topic 0 unknown 465 unknown console-consumer-11967_aws-lx-1513787888172-d3a91f05-0

其中依次展示group名称、消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id(不一定包含hostname)

上面示例中console-consumer-11967是为了测试临时起的一个console consumer,缺少在zookeeper中保存的current_offset信息。

关于kafka命令和kafka命令行查看topic的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表