kafka创建topic(kafka创建topic的groupid)
本篇文章给大家谈谈kafka创建topic,以及kafka创建topic的groupid对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、【大数据技术】kafka简介和底层实现
- 2、怎么设置kafka topic数据存储时间
- 3、如何在Kafka上创建一个Topic
- 4、kafka极简入门(三)--创建topic
- 5、Hadoop生态架构之kafka
- 6、windows 下远程连接kafka服务器并创建topic 部署服务
【大数据技术】kafka简介和底层实现
一、 K afka的三大组件:Producer、Server、Consumer
1、Kafka的 Producer 写入消息
producer采用push(推)模式将消息发布到broker,每条消息,都被追加到分区中(顺序写到磁盘,比随机写内存效率高)。
· 分区的作用:方便容量扩展,可以多并发读写数据,所以我们会指定多个分区进行数据存储。
· 一般根据 event_key的hash % numPartitions来确定写入哪个分区,如果写入时没有指定key,则轮询写入每个分区;因此导致每个partition中消息是有序的,整体无序。
每条event数据写入partitionA中,并且只会写入partitionA_leader,当partitionA_leader写入完成后partitionA_flower节点再去partitionA_leader上异步拉取数据;默认ack为1,表示不会等待partitionA_flowers写入完成;如果设置ack为副本数或ack=-1,则等待副本全部写完,再写入下一条数据。
2、kafka的 broker—— 保存消息
1、 创建topic,并指定分区和副本数
2、每个分区(partition)有一个leader,多个follower,pull数据时先寻找leader,只会读leader上的数据,leader和follower不会在一个节点上,leader节点宕机后,其中一个follower变成leader
3、 消息数据存在每个分区中,默认配置每条消息保存7天 或 分区达到1GB 后删除数据
3、 K afka的 Consumer 消费数据:
1、consumer采用pull(拉)模式从broker中读取数据。
2、如果一个消费者来消费同一个topic下不同分区的数据,会读完一个分区再读下一个分区
生产者(producer)A PI 只有一套 ; 但是消费者(consumer)A PI 有两套(高级A PI 和低级A PI )
一、高级API:
Zookeeper管理offset(默认从最后一个开始读新数据,可以配置从开头读)
kafka server(kafka服务)管理分区、副本
二、低级API:
开发者自己控制offset,想从哪里读就从哪里读
// SimpleConsumer是Kafka用来读数据的类
// 通过send()方法获取元数据找到leader
TopicMetadataResponse metadataResponse = simpleConsumer.send(request); //通过metadataResponse获取topic元数据,在获取topic中每个分区的元数据
// fetch 抓取数据
FetchResponse response = simpleConsumer.fetch(fetchRequest);
// 解析抓取到的数据
ByteBufferMessageSet messageAndOffsets = response.messageSet(topic, partition);
二、数据、broker状态,consumer状态的存储
一、在本地存储原始消息数据:
1、hash取模得分区、kafka中每条消息有一个Key,用来确定 每条数据存储到哪个分区中
2、轮询
3、自定义分区
二、在zookeeper存储kafka的元数据
三、存储consumer的offset数据
每个consumer有一个孝渣陆Key(broker+Topic+partition)的hash,再取模后 用来确定offset存到哪个系巧顷统文件中,Value是partitionMetaData。
1、使用zookeeper启动,zookeeper来存储offset
消费者梁手 消费消息时,offset(消费到的下标)会保存在consumer本地和zookeeper中(由本地上传到zookeeper中,所以本地会保存offset)
2、使用bootstrap启动,本地存储offset(在本地可以减少两节点交互),zookeeper存储其他数据
三、某 F lume对接Kafka案例
[img]怎么设置kafka topic数据存储时间
1、Kafka创建topic命令很简单,一条命令足矣:bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test 。
2.此命令将创建一个名为test的运竖topic,其中有三个分区,每个分区需要分配三个副本。
三。topic创建主要分为两部分:命令行controller逻辑部分。
四。后台逻辑将监听zookeeper下对应的目录节点。一旦启动topic创建命令,它将创建一个新的数据节点并触发后台创建逻辑。
五个。确定分区副本祥悄启分配方案(即,将每个分区副本分配给哪个代理);创建zookeeper节点并将此方案写入/brokers/topics/topic节点。
五个。确定分区副本分配方案(即每个分区的副本分配给哪个分区)broker上);创建zookeeper节点,把这个方案写入/brokers/topics/topic节点下。
6、Kafka controller这一部分的主要任务是:创建分区;创谨如建副本;为每个分区选择leaderISR;;更新各种缓存。
如何在Kafka上创建一个Topic
在运仔李行/调试设置中,编辑配置对话框中有“Main”这个选项卡,我们可以勾选“Stop in main”这个复选框。如果选中,那么在调试一个基于main方法的Java程序时,程序会在main方法第一行位置便停止执行。
进入服务器后,找到念族迟kafka安装目录进入bin文件夹,输穗汪入命令--- 查看kafka现有主题命令:。/kafka-topics.sh --list --zookeeper zk_host:port。
kafka极简入门(三)--创建topic
回顾 kafka极简入门(二)--安装
topic是kafka的生产者和消费者最小交互的单位,我们先从topic入手,创建第一个topic.
或
所以执行上面命令将会创建一个名为mytest的topic,该topic下面有1个分区,并且该分区只有1个猛并副本。
PS:除了手动创建主题外,还可以将代理配置为在发布不存在的主题时自动创建主题
Partition:0 表示该分区的id为0
leader: 9 表示分区的首领副本所在的broker(本例子中broker.id配置为9,所以这里显示9,具体在config/server.properties配置。这里只有一个分区,所以首领分区也就是自己)
Replicas: 9 表示分区的跟随副本所在的broker
Isr: 9 表示分镇槐区的同步副本所在的broker(同步副本可以认御知友为跟首领副本准实时同步的副本,可以配置判断条件,后面会讲,首领副本挂掉后,服务器会从同步副本中选举新的首领)
发送三个消息,分别是hello, world和!
注意: --from-beginning 表示从最开始的offset处开始消费。如果不写表示从最新的offset处消费,那么先发送了消息再开启消费者是收不到已发送的信息的
Hadoop生态架构之kafka
1、定位:分布式的消息队列系统,同时提供数据分布式缓存态卜功能(默认7天)
2、消息持久化到磁盘,达到O(1)访问速度,预读和后写,对磁盘的顺序访问(比内存访问还要快)
3、Storm(分布式的实时计算框架)
Kafka目标成为队列平台
4、基本组件:
Broker:每一台机器是一个Broker
Producer:日志消息生产者,主要写数据
Consumer:日志消息消费者,主要读数据
Topic:是虚拟概念,不同的consumer去指定的topic去读数据,不同producer可以往不同的topic去写
Partition:是实际概念,文件夹,是在Topic的基础上做了进一步分层
5、Partition功能:负载均衡,需要保证消息的顺序性
顺序性拿烂的保证:订阅消息是从头往后读取的,写消息是尾部追加,所以整体消息是顺序的
如果有多个partiton存在,可能会出现顺序不一致帆敏穗的情况,原因:每个Partition相互独立
6、Topic:逻辑概念
一个或多个Partition组成一个Topic
7、Partition以文件夹的形式存在
8、Partition有两部分组成:
(1)index log:(存储索引信息,快速定位segment文件)
(2)message log:(真实数据的所在)
9、HDFS多副本的方式来完成数据高可用
如果设置一个Topic,假设这个Topic有5个Partition,3个replication
Kafka分配replication的算法:
假设:
将第i个Partition分配到(i % N)个Broker上
将第i个Partition的第j个replication分配到( (i+j) % N)个Broker上
虽然Partition里面有多个replication
如果里面有M个replication,其中有一个是Leader,其他M-1个follower
10、zookeeper包系统的可用性,zk中会保存一些meta信息(topic)
11、物理上,不同的topic的消息肯定是分开存储的
12、偏移量——offset:用来定位数据读取的位置
13、kafka内部最基本的消息单位——message
14、传输最大消息message的size不能超过1M,可以通过配置来修改
15、Consumer Group
16、传输效率:zero-copy
0拷贝:减少Kernel和User模式上下文的切换
直接把disk上的data传输给socket,而不是通过应用程序来传输
17、Kafka的消息是无状态的,消费者必须自己维护已消费的状态信息(offset)
减轻Kafka的实现难度
18、Kafka内部有一个时间策略:SLA——消息保留策略(消息超过一定时间后,会自动删除)
19、交付保证:
at least once:至少一次(会有重复、但不丢失)
at most once:最多发送一次(不重复、但可能丢失)
exactly once:只有一次(最理想),目前不支持,只能靠客户端维护
20、Kafka集群里面,topic内部由多个partition(包含多个replication),达到高可用的目的:
日志副本:保证可靠性
角色:主、从
ISR:是一个集合,只有在集合中的follower,才有机会被选为leader
如何让leader知道follower是否成功接收数据(心跳,ack)
如果心跳正常,代表节点活着
21、怎么算“活着”
(1)心跳
(2)如果follower能够紧随leader的更新,不至于被落的太远
如果一旦挂掉,从ISR集合把该节点删除掉
前提:需要把zookeeper提前启动好
一、单机版
1、启动进程:
]# ./bin/kafka-server-start.sh config/server.properties
2、查看topic列表:
]# ./bin/kafka-topics.sh --list --zookeeper localhost:2181
3、创建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic newyear_test
4、查看topic描述:
]# ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic newyear_test
5、producer发送数据:
]# ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic newyear_test
6、consumer接收数据:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic newyear_test --from-beginning
7、删除topic:
]# ./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic newyear_test
二、集群版
在slave1和slave2上的broker.id一定设置不同
分别在slave1和slave2上开启进程:
./bin/kafka-server-start.sh config/server.properties
创建topic:
]# ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic newyear_many_test
1、实现一个consumer group
首先在不同的终端分别开启consumer,保证groupid一致
]# python consumer_kafka.py
执行一次producer:
]# python producer_kafka.py
2、指定partition发送数据
]# python producer_kafka_2.py
3、指定partition读出数据
]# python consumer_kafka_2.py
consumer_kafka.py:
producer_kafka.py:
consumer_kafka_2.py:
producer_kafka_2.py:
1.新建./conf/kafka_test/flume_kafka.conf
2.启动flume:
]# flume-ng agent -c conf -f ./conf/kafka_test/flume_kafka.conf -n a1 -Dflume.root.logger=INFO,console
启动成功,如下图:
3.测试:
1.1flume监控产生的数据:
]# for i in seq 1 100 ; do echo '==== '$i 1.log ; done
1.2kafka消费数据:
]# ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_1013 --from-beginning
消费结果如下图:
windows 下远程连接kafka服务器并创建topic 部署服务
一.打包项目镜像:
利用Dockerfile 来打包项目的镜像
本次项目共依赖两个镜像(一个基础系统环境和一个项目镜像)
本次直接将Dockerfile写好后,用shell脚本build.sh启动打包:
然后切换到项目的目录下找到build.sh,运行即可打包项目镜像
若
报错:"failed to dial gRPC: cannot connect to the Docker daemon. Is 'docker daemon' running on this host?: dial unix /var/run/docker.sock: connect: permission denied
"
就用
出现以下说明打包成功,接下来可以开始部署:
注意:如果遇到只读权限不能修改时,将host文件复制一份到桌面,修改后在替换原来的host文件
在hosts文件末尾加上kafka服务器 !外网! 39. 0.25...地址,修改后的格式如下:
1.1注意: 修改阿里云服务器的hosts 文件来配宽辩顷置 kafka的服务器地址:
在hosts 文件最后加入:
添加的 kafka-server 就是以下创建topic命令中的 kafka-server别名,
监听远程kafka:新建消费者:
远程创建topic的实例:
查看远程已创建的topc:
本地:
远程修改后的kafka topic:
2.通过git Bash 切换到kafka客户端的bin目录:
桌面打开 gitBash,切换到本地kafka软件目录:
这里一定要切换为windows
3.查看已经有的topic
--topic 指定topic名字
--replication-factor 指定副本数,因为我的灶念是集群环境,这里副本数就为3
--partitions 指定分区数,这个参数需要根据broker数和数据量决定,正常情况下,每个broker上两个partition最好
注意:服务器部署时候一定要用内网172. .开头的,外部访问设为外网ip
不然会导致Kafka写入数据的时候报错 : TImeout
4.1本地docker创建topic:
4.2 本地windows 创建topic
进入本地软件路径KAFKA/BIN/WIONDOWS
创建topic
5.修改服务器的host:
一定要注意加sudo 不然会导致readonly 无法修改
在host 文件的末尾加上以下:
6.切换到工程部署的目录
7.清理redis,不然数据慎陆有残留:
7.1服务器上的redis挂载清除:
在 docker-compose.yml中注销这几行: 目的是每次启动不必记录上次没有执行完的数据.
这个是用来记录redis中假如上次指定的是1到100万块,没有执行完.下次接着执行没执行完的任务,测试时暂时关闭
7.2删除volume:
7.3 如果volume文件被占用时,先删除占用容器:
7.4 清除redis中的数据
进入redis容器中:
8.部署命令:
8.1开启docker可视化web上监控docker:
然后访问:
宿主机IP + 9000端口
8.2执行部署命令,启动服务:
9.部署时报错: yaml: line 46: did not find expected key
原因: docker-compose.yml文件中第46行 报错
解决:将所有数据对齐,不要有多余的空格.
关于kafka创建topic和kafka创建topic的groupid的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。