kafkagroup(kafkagroupid作用)
本篇文章给大家谈谈kafkagroup,以及kafkagroupid作用对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、kafka 怎么创建group
- 2、Kafka Consumer Group和Consumer Rebalance机制
- 3、Kafka查看topic、consumer group状态命令
- 4、kafka集群原理
- 5、kafka groupid何时定义
- 6、华为kafka安全版重置group中的topic offset不生效问题
kafka 怎么创建group
innodb_log_file_size = 500M #事物日数喊志大小
#innodb_log_file_size =100M
innodb_log_files_in_group = 2 #两友毕御组事物日志
innodb_log_group_home_dir = /longxibendi/mysql/mysql/好岩var/#日志组
[img]Kafka Consumer Group和Consumer Rebalance机制
在新建一个Consumer时,我们可以通过指定groupId来将其添加进一个Consumer Group中。 Consumer Group是为了实现多个Consumer能够并行的消费一个Topic,并且一个partition只能被一个Consumer Group里的一个固定的Consumer消费。
对于一个Consumer Group,可能随时都有Consumer加入或者退出这个Consumer Group,Consumer列表的变化势必会引起partition的重新分配。这个为Consumer分配partition的过程就被称为Consumer Rebalance。
出现任何以下的场景都会触发Consumer Rebalance操作:
默认情况下,Kafka提供了两种分配策略:Range和RoundRobin 。
range策略的具体步骤如下:
举个例子,比如有两个消费者C0和C1,两个topic(t0,t1),每个topic有三个分区p(0-2),
那么采用Range策略,分配出的结果为:
RoundRobin策略和Range策略类型,唯一的区别就是Range策略分配partition时,是按照topic逐次划分的。而RoundRobin策略则是将所缓此枯有topic的所有分区一起排序,然后遍历partition分配给消费者。
因此,采用RoundRobin策略,分配出的结果为:
Group Coordinator是负责管理Consumer Group的组件。当扒亩一个Consumer希望加入某一个Consumer Group时,它会发送一个请求给Group Coordinator。Group Coordinator负责维护一个Consumer Group中所有的Consumer列表,随着Consumer的加入和退出,Coordinator也会随之更新这个列表。
第一个加入Consumer Group的Consumer被称为leader。
一旦Consumer Group中的成员发生变化,例如有新的Consumer加入,那么就需要为其分配partition;或者有Consumer退出,那么就需要扰洞将其负责消费的partition分配给组内其他成员。因此Consumer Group中的成员发生变化, Group Coordinator就负责发起Consumer Rebalance活动。
值得注意的是,真正的Consumer Rebalance行为是由Consumer Group Leader执行的。Group Leader首先向Coordinator获取Group中的Consumer成员列表,然后根据Rebalance策略,将partition分配给Consumer Group中的成员,再将分配结果告知Coordinator。最后,Coordinator将partition分配结果通知给每一个Consumer。在Consumer Rebalance的过程中,所有的Consumer都不允许消费消息。
Kafka查看topic、consumer group状态命令
以下命令中使用的zookeeper配置地址为127.0.0.1:2181,bootstrap--server(即broker)地址为: 127.0.0.1:9292
1,查看kafka topic列表,使用--list参数
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
__consumer_offsets
lx_test_topic
test
2,查看kafka特定topic的详情,源租使用--topic与--describe参数
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic lx_test_topic --describe
Topic:lx_test_topic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: lx_test_topic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
列出了lx_test_topic的parition数量、replica因子团知以及每个partition的leader、replica信息
3,查看consumer group列表,使用--list参数
查看consumer group列表有新、旧两种命令,分别查看新版(信息保存在broker中)consumer列表和老版(信息保雹或兆存在zookeeper中)consumer列表,因而需要区分指定bootstrap--server和zookeeper参数:
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --list
lx_test
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list
console-consumer-86845
console-consumer-11967
4,查看特定consumer group 详情,使用--group与--describe参数
同样根据新/旧版本的consumer,分别指定bootstrap-server与zookeeper参数:
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9292 --group lx_test --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
lx_test lx_test_topic 0 465 465 0 kafka-python-1.3.1_/127.0.0.1
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group console-consumer-11967 --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG OWNER
Could not fetch offset from zookeeper for group console-consumer-11967 partition [lx_test_topic,0] due to missing offset data in zookeeper.
console-consumer-11967 lx_test_topic 0 unknown 465 unknown console-consumer-11967_aws-lx-1513787888172-d3a91f05-0
其中依次展示group名称、消费的topic名称、partition id、consumer group最后一次提交的offset、最后提交的生产消息offset、消费offset与生产offset之间的差值、当前消费topic-partition的group成员id(不一定包含hostname)
上面示例中console-consumer-11967是为了测试临时起的一个console consumer,缺少在zookeeper中保存的current_offset信息。
kafka集群原理
副本进入ISR列表有两个条件 :
1.副本节点不能产生分区,必须能与zookeeper保持会话以及跟leader副本网络连通
2.副本能复制leader上的所有写操作,并且不能落后太多。(与leader副本同步滞后的副本,是由 replica.lag.time.max.ms 配置决定的,超过这个时间都没有跟leader同步过的一次的副本会被移拦唯局出ISR列表)
2.设置如下配置让Kafka自动进行:
注意
如下情况可能会触发消费者rebalance
Rebalance过程
1.选择组协山好调器
选择逻辑
2.加入消费组JOIN GROUP
重点
3.(SYNC GROUP)
消费者Rebalance分区分配策略 :简让
HW
kafka groupid何时定义
情况是这样的,袜则旅在我们系统中有多个Consumer的客户端(客户端个数是不确定的,因为在系统工作过程中有的业务节点会脱离,有些业务节点会增加进来),Producer也有多个。但是盯槐Producer发送的消息种类只有一种,所以topic只创告凳建了一个, 消息量很大,所以使用了多个Consumer来处理。现在想实现如下的订阅/推送效果,多个Producer进行消息的推送,例如消息X1、X2、X3、X4、X5.。。。。。。然后由多个Consumer分别进行拉去,Consumer1拉取到:X1、X4、X7。。。Consumer2拉取到:X2、X5、X8.。。。。。。。。如此类推
华为kafka安全版重置group中的topic offset不生效问题
华为kafka安全版,使用New Consumer API,采用group方式,消费者消费特定group下的topic数据,当group.id重置时,组中的所有topic就就会重头开始消费。如果只需要group中的某一个topic从宴陆头开始消费,而其他的topic接着上一次消费的位置继续消费,就不能简单的更换group.id来实现了。以下有两种方式可以用来重置差祥码offset (调整过程中必须保证消费者处于停止状态!!!)
以上操作完之后,正常情况下就可以执行命令1检查重置是否成功,虚哪当然笔者的情况是重置没有生效,执行操作1发现currentOffset依然没有变,于是就有了第二种方式 —— 使用consumer的java API
方式二使用的是消费者消费数据时提交的offset信息,一般来说只要topic可以正常被消费,方式二都能生效。
关于kafkagroup和kafkagroupid作用的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。