kafka常用命令(kafka常用命令和解释)

本篇文章给大家谈谈kafka常用命令,以及kafka常用命令和解释对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

kafka ACL常用权限操作

kafka ACL常用权限操作

使用bin/kafka-topics.sh创建

注意工具bin/kafka-topics.sh访问的是zookeeper而不是kafka,即他是一个zookeeper client而不是一个kafka client,所以它的认证都是通过zookeeper完成的。

Case 1:如果zookeeper没有配置ACL激活:

Case 2:如果zookeeper已经配置ACL激活:

命令还是前面的那个命令,但是必须提供java.security.auth.login.config指向jaas.conf文件。例如:

命令的配置可以直接修改jvm的启动脚本,或者设置在环境变量里:

这里配置的用户必须是zookeeper服务端已经配置运行可以访问的客户端用户。例如,下面的zookeeper服务端配置:

运团缓行客户端为admin的用户作为zookeeper客户端连接访问。

查询topic操作的ACL认证,同前面创建topic操作的认证一样,不细说,参考前面。

删除topic操作的ACL认证,同前面创建topic操作的认证一样,不细说,参考前面。

producer用的脚本是/opt/kafka/bin/kafka-console-producer.sh,注意这个producer脚本是和kafka打交道的(相对bin/kafka-topics.sh是和zookeeper打交道的),所以:

命令行格式:

文件/path/to/client-sasl.properties

还需要配置client用户信息,并传给JVM参数:

此时如果没有授权,则会得到如下错误信息:

赋予producer的权限:

这个选项--producer实际上在Topic域上创建了(Write/Describe/Create)3个子权限:

当然用户也可以单独创建者三个子权限。

consumer用的脚本是/opt/kafka/bin/kafka-console-consumer.sh,注意和生产者producer一样,consumer也是和kafka打交道的(相对于bin/kafka-topics.sh是和zookeeper打交道的),所以:

命令行格式:

选项--from-begining可以调整成其他值;配置文件/path/to/client-sasl.properties和producer的一样,不细说,参考生产者。

此时如果没有授权,则会得到如下错误信息:

赋予consumer的权限:

和producer相比,consumer还有一个额外的参数--group,如果没有限制,则置成'*'即可;这个--consumer的选择实际上在Topic域上创建了(Read/Describe)2个子权限,然后在Group域创建了(Read)1个子权限:

这个地方我们注意一下,consumer没有Create的权限,所以如果kafka配置成auto.create.topics.enable=true,而此梁前时topic不存在,那么consumer试图创建topic的时橡或清候会失败,那就需要一条单独的Create授权规则来给consumer增加Create权限。

权限管理工具以命令行的方式管理权限,可以增加/删除/列举所有的权限规则。

基本用法:

授权用户kafaclient具有Read topic kafaclient--topic的权限。

删除用户kafaclient具有Describe topic kafaclient--topic的权限。

查看当前在topic kafkaclient--topic上面的权限列表。

另外注意,和kafka-topics.sh一样,kafka-acls.sh也是直接访问zookeeper的,而不是访问kafka,所以它的认证方式和kafka-topics.sh是一样的:

详细的用法配置请参考kafka-topics.sh部分,不细说。

在kafka2.0之后引入了--resource-pattern-type这个参数,可以针对特定的资源(topic)命名规则,例如前缀,来为某一类的topic添加规则。而之前的办法只能读完整的topic设置规则,字符' '表示所有的,这不是规则表达式匹配任意字符的意思,而就是文本字符' '。

例如:

授权用户kafkaclient具有访问所有以'kafkaclient--'开头的topic的权限;这样带来的好处是,以后我们使得kafkaclient创建的topic全部以'kafkaclient--'开头,那么就不需要再为这些topic创建rule,一条rule就能够动态的管理新加的topic。

其实我对这个还是不满意,如果能够定义灵活规则就好了;因为上面的限制,我还是需要为每一个用户添加一条规则,而我想为所有的用户只用一条规则,这条规则就是:任何用户具有访问以这个用户名开头的所有的topic;这样不管以后新加topic还是新加用户,都不用再新加rule了。遗憾的是目前kafka还是不支持这个功能。类似:

[img]

Kafka常用命令行总结

以下是kafka常用命令行总结:

1.查看topic的详细信息

./kafka-topics.sh -zookeeper127.0.0.1:2181-describe -topic testKJ1

2、为topic增加副本

./kafka-reassign-partitions.sh -zookeeper127.0.0.1:2181-reassignment-json-file json/partitions-to-move.json -execute

3、创建topic

./kafka-topics.sh --create --zookeeper localhost:2181--replication-factor1--partitions1--topic testKJ1

4、为topic增加partition

./bin/kafka-topics.sh –zookeeper127.0.0.1:2181–alter –partitions20–topic testKJ1

5、kafka生产者客户端命令

./kafka-console-producer.sh --broker-list localhost:9092--topic testKJ1

6、kafka消费者客户端命令

./则伏kafka-console-consumer.sh -zookeeper localhost:2181--from-beginning --topic testKJ1

7、kafka服务启孙念携动

./kafka-server-start.sh -daemon ../config/server.properties

8、下线broker

./高禅kafka-run-class.sh kafka.admin.ShutdownBroker --zookeeper127.0.0.1:2181--broker #brokerId# --num.retries3--retry.interval.ms60

shutdown broker

9、删除topic

./kafka-run-class.sh kafka.admin.DeleteTopicCommand --topic testKJ1 --zookeeper127.0.0.1:2181

kafka-docker上使用+常用指令

生产者向broker发送消息,消费者接收消息,broker是物理概念,部署几个kafka即几个broker,topic是逻辑概念,往topic里发送消息会发送到设置好的几个partion上,每个partion存储作为不同队列存储不同数据,partion有leader和follower备份机制,消息发送时会轮循发送到不同broker的不同partion中,同一消费者只能消费同一分区,通过offset记录消费位置,消费者组可以访问一个topic的不同partion

启动镜像

启动kafka可以带上参数,这样会自动修改kafka里的配置文件(/opt/kafka_版本/conf/server.properties),否则不带参数需要自己进入进行手动修改 带参数版启动可参考

其中172.17.0.3需要改成自己docker的网桥连接地址

查看已启动容器

查看所有容器

启动未启动的容器

进入kafka容器

创建主题

主题和分区可以理解为:topic是逻辑划分,kafka通过topic进运吵虚行区分消息,topic的数据会被存储到日碰腔志中,如果数据量太大可以引入partion(同时提高读写吞吐量)来分段存储数据。其中replication-factor作用是将任意分区复制到broker上,broker是物理概念,部署了一个kafka可认为broker数为1,我本机只有一个kafka所以这里replication-factor超过1会报错。 综上几个概念可以理解为:集群中有多个broker,创建主题时可以指明topic有多个partitions(消息拆分到不同分区进行存储,一个partion只能被一个消费者消费--partion内部保证接收数据顺序),可以为分区创建多个副本replication,不同副本在不同的broker中(作为备份使用,这里有leader和flower的区分) 。

查看topic信息

集群部署

可以通过compose集群化部署过es,这里通过创建另一个compose.yml文件来部署kafka,配置文件参考 docker-compose集群部署

生产者:

消费者:

方式一:从当前主题的迁移量位置+1开始取数据

方式二:从当前主题第一条消息开始消费

生产者将消息发送broker,broker将消息保存到本地日志中,消息的保存时有序的

单播消息:

当存在一个生产者,一个消费者组的时候,一个消费者组中只有一个消费者会收到消息

多播消息:

当存在一个生产者,多个消费组,不同消费组只有一个消费者收到消息

查看消费组详细信息:

CURRENT-OFFSET:最后被消费旁燃的偏移量

LOG-END-OFFSET:消息总量(最后一条消息的偏移量)

LAG :积压了多少条消息

常见问题:

1、如何防止消息丢失

生产者:使用同步消息发送;ack设置为1/all;设置同步分区数=2

消费者:把自动提交改成手动提交

2、如何防止消息的重复消费

针对网络抖动导致的生产者重试(发送消息),可以设置消费者加锁解决;

3、消息积压

消费者使用多线程异步处理接收数据;创建多个消费者组部署到其他机器上;通过业务架构设计,提升业务层面消费性能。

ps:

缓冲区:kafka默认会创建一个消息缓冲区去存放要发送的消息,大小是32M,每次本地线程会去缓冲区拉16K数据发送到broker,如果不到16K等待10ms也会将数据发送到broker

参考链接:

1、kafka安装教程--推荐

2、kafka配置文件server.properties参数说明

3、创建主题分区数

4、解决docker容器启动不了的问题

5、通过docker-compose集群部署

6、学习视频

Kafka | 常用命令行操作

本次实验的kafka集群有三个节点,即有三团枣个broker。

Topic操塌猜拆作的执行脚本在bin目录下的 kafka-topics.sh

例子中创建名为first的topic,2个分区,2个副本。

创建后可分别在三个节点兆凳下,进入kafka的存放数据的目录,查看topic的创建情况:first-0和first-1是以topic名和分区号组成,代表了2个分区;每个分区副本都是2个,而且相同分区副本是放在不同的节点上的,例如first-0的副本在node01和node03节点上,first-1的副本在node01和node02上。

查看数据目录下的topic是否被删除:发现topic名称后面多了delete删除标志,不要着急,等过会来看这个topic就会被删除。

每输入一条消息,消费者窗口会显示生产者发出的消息

kafka权限控制

转发文章--

请先在不配置任何身份验证的情况下启动Kafka

Kafka中的SCRAM实现使用Zookeeper作为凭证(credential)存储。 可以使用 kafka-configs.sh 在Zookeeper中创建凭据。 对于启用的每个SCRAM机制,必须通过添加具有机制名称的配置来创建凭证。 必须在启动Kafka broker之前创建代理间通信的凭据。 可以动态创建和更新客户端凭证,并使用更新的凭证来验证新连接。

这里只演示,不操作

或者不修改 kafka-server-start.sh 脚本, 而是将下面的内容添老笑加到 ~/.bashrc

再官方文档中写的是

这里其实没必要写成 SASL_SSL , 我们可以根据自己的需求选择SSL或PLAINTEXT, 我这里选择PLAINTEXT不加密明文传输, 省事, 性能也相对好一些

Kafka所有Broker

先使用kafka-console-producer 和枝搏 kafka-console-consumer 测试一下

可以看到admin用户无需配置ACL就可以生成消息

生产消息

可以看到报错了, 因为fanboshi用户还没有权限

其实也会报错的, 报错内容就不贴了

生产者

消费者

都没问题了

好像只能去zookeeper看?

尝试删除alice

去ZK查看

kafka ACL常用权限操作

创建topic

使用bin/kafka-topics.sh创建

注意工具bin/kafka-topics.sh访问的是zookeeper而不是kafka,即他是一个zookeeper client而不是一个kafka client,所以它的认证都是通过zookeeper完成的。

Case 1:如果zookeeper没有配置ACL激活:

Case 2:如果zookeeper已经配置ACL激活:

命令还是前面的那个命令,但是必须提供java.security.auth.login.config指向jaas.conf文件。例如:

命令的配置可以直接修改jvm的启动脚本,或者设置在环境变量里:

这里配置的用户必须是zookeeper服务端已经配置运行可以访问的客户端用户。例如,下面的zookeeper服务端配置:

运行客户端为admin的用户作为zookeeper客户端连接访问。

查询topic

查侍搭含询topic操作的ACL认证,同前面创建topic操作的认证一样,不细说,参考前面。

删除topic

删除topic操作的ACL认证,同前面创建topic操作的认证一样,不细说,参考前面。

关于kafka常用命令和kafka常用命令和解释的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表