kafka官网(kafka the definitive guide)
本篇文章给大家谈谈kafka官网,以及kafka the definitive guide对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
kafka使用ssl加密和认证
Apache kafka 允许clinet通过SSL连接,SSL默认是不可用的,需手动开启。
主要步骤是:
os: ubuntu 18
java: 1.8.0_275
kafka: 2.7.0
官方的步骤
把上面的步骤拆开来,并解释
生成密钥和证书,可以使用java的keytool来生产桐锋。我们将生成密钥到一个临时的密钥库,之后我们可以导出并用CA签名它。
例子
keystore: 密钥仓库存储证书文件。密钥仓库文件包含证书的私钥(保证私钥的安全)。
validity: 证书的有效时间,天
此步骤要注意的是,名与姓(CN)这一项必须输入域名,如 "localhost",切记不可以随意写,我曾尝试使用其他字符串,在后面客户端生成证书认证的时候一直有问题。
客户端将根据以下两个字段之一验证服务器的完全限定域名(FQDN):
这两个字段均有效,但是RFC-2818建议使用SAN。SAN更加的灵活,允许声明多个DNS条目。另一个优点是,出于授权目的,可以将CN设置为更有意义的值。 要添加SAN,需将以下参数 -ext SAN=DNS:{FQDN} 追加到keytool命令:
完成上面步骤,可使用命令
来验证生成证书的内容
server.keystore.jks 这个文件包含了一对 公私钥 ,和 一个证书 来识别机器。但是,证书是未签名的,这意味着攻击者可以创建一个这样的证书来伪装成任何机器。
生成的CA是一个简单的 公私钥对 和 证书 ,用于签名其他的证书。
例子
将生成的CA添加到 **clients' truststore(客户的信任库)** ,以便client可以信任这个CA:
ca-cert: CA的局拦晌证书
例子
客户端的信任库存储所有客户端信任的证书,将证书导入到一个信任仓库也意味着信衡颂任由该证书签名的所有证书,正如上面的比喻,信任政府(CA)也意味着信任它颁发的所有护照(证书),此特性称为信任链,在大型的kafka集群上部署SSL时特别有用的。可以用单个CA签名集群中的所有证书,并且所有的机器共享相同的信任仓库,这样所有的机器也可以验证其他的机器了。
用步骤2.2 生成的CA签名 步骤2.1生成的证书。首先导出请求文件:
cert-file: 出口,服务器的未签名证书
然后用CA签名:
例子
最后,你需要导入CA的证书和已签名的证书到密钥仓库:
参数
Kafka Broker支持监听多个端口上的连接,通过 server.properteis 配置,最少监听1个端口,用逗号分隔。
下面是broker端需要的SSL配置,
more config/server.properties
Producer和Consumer的SSL的配置是相同的。
如果broker中不需要client(客户端)验证,那么下面是最小的配置示例:
more client-ssl.properties
如果需要客户端认证,则必须像 步骤2.1 一样创建密钥库,并且还必须配置以下内容:
启动 zookeeper 和 kafka
在不同的Terminal 分别运行下面命令
Terminal1
Terminal2
Terminal3
创建topic
生产和消费topic
官网:
Java 11 导致 SSL handshake fail
生成本机证书时,CN 需要填写 localhost 或者真实的域名,否则客户端连接失败。
[img]kafka如何从头消费历史数据
消费者要从头开始消费某个topic的全量数据,需要满足2个条件(spring-kafka):
(1)使用一个全新的"group.id"(就是之前没有谈中被任何消费者使用过); (2)指定"auto.offset.reset"参含纳山数的值为earliest;
注意:从kafka-0.9版本及以后,kafka的消费者组和offset信息就不存zookeeper了,而是存到broker服务器上,所以茄燃,如果你为某个消费者指定了一个消费者组名称(group.id),那么,一旦这个消费者启动,这个消费者组名和它要消费的那个topic的offset信息就会被记录在broker服务器上。
比如我们为消费者A指定了消费者组(group.id)为fg11,那么可以使用如下命令查看消费者组的消费情况:
bin/kafka-consumer-groups.sh --bootstrap-server 172.17.6.10:9092 --describe --group fg11
在kafka官网可以看到
对于auto.offset.reset的说明,通过在项目中配置选项,同时新建一个groupid就实现了从头开始消费。
auto.offset.reset具体含义:(注意版本不同,配置参数会有所不一致,具体参考官网)
3分钟带你彻底搞懂 Kafka
Kafka到底是个啥?用来干嘛的?
官方定义如下:
翻译过来,大致的意思就是,这是一个实时数据处理系统,可以横向扩展,并高可靠!
实时数据处理 ,从名字上看,很好理解,就是将数据进行实时处理,在现在流行的微服务开发中,最常用实时数据处理平台有 RabbitMQ、RocketMQ 等消息中间件。
这些中间件,最大的特点主要有两个:
在早期的 web 应用程序开发中,当请求量突然上来了时候,我们会将要处理的数据推送到一个队列通道中,然后另起一个线程来不断轮训拉取队列中的数据,从而加快程序的运行效率。
但是随着请求量不断的增大,并且队列通道的数据一致处于高负载,在这种情况下,应用程序的内存占用率会非常高,稍有不慎,会出现内存不足,造成程序内存溢出,从而导致服务不可用。
随着业务量的不断扩张,在一个应用程序内,使用这种模式已然无法满足需求,因此之后,就诞生了各种消息中间件,例如 ActiveMQ、RabbitMQ、RocketMQ等中间件。
采用这种模型,本质就是将要推送的数据,不在存放在当磨枯前应用程序的内存中,而是将数据存放到另一个专门负责数据处理的应用程序中,从而实现服务解耦。
消息中间件 :主要的职责就是保证能接受到消息,并将消息存储到磁盘,即使其他服务都挂了,数据也不会丢失,同时还可以对数据消费情况做好监控工作。
应用程序 :只需要将消息推送到消息中间件,然后启用一个线程来不断从消息中间件中拉取数据,进行消费确认即可!
引入消息中间件之后,整个服务开发会变得更加简单,各负其责。
Kafka 本质其实也是消息中间件的一种,Kafka 出自于 LinkedIn 公司,与 2010 年开源到 github。
LinkedIn 的开发团队,为了解决数据管道问题,起初采用了 ActiveMQ 来进行数据交换,大约是在 2010 年前后,那时的 ActiveMQ 还远远无法满足 LinkedIn 对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,LinkedIn 决定研发自己的消息传递系统, Kafka 由此诞生 。
在 LinkedIn 公司,Kafka 可以有效地处理每天数十亿条消息的指标和用户活动跟踪,其强大的处理能力,已经被业界所认可,并成为大数据流水线的首选技术。
先来看一张图, 下面这张图就是 kafka 生产与消费的核心架构模型 !
如果你看不懂这些概念没关系,我会带着大家一起梳理一遍!
简而言之,kafka 本质就是一个消息系统,与大多数的消息系统一样,主要的特点如下:
与 ActiveMQ、RabbitMQ、RocketMQ 不同的地方在于,它有一个**分区 Partition **的概念。
这个分区的意思就是说,如果你创建的 topic 有5个分区,当你一次性向 kafka 中推 1000 条数据时,这 1000 条数据默认会分培游哪配到 5 个分区中,其中每个分区存储 200 条数据。
这样做的目的,就是方便消费者从不同的分区拉取数据,假如你启动 5 个线程同时拉取数据,每个线程拉取一个分区,消费速度会非常非常快!
这是 kafka 与其他的消息系统最大的不同!
和其他的中间件一样,kafka 每次发送数据都是向 Leader 分区发送数据,并顺序写入到磁盘,然后 Leader 分区会将数据同步到各个从分区 Follower ,即使主分区挂了,也不会影响服务的正常运行。
那 kafka 是如何将数据写入到对应的分区呢?kafka中有以下几个原则:
与生产者一样,消费者主动的去kafka集群拉取消息时,也是从 Leader 分区去拉取数据。
这里我们需要重点了解一个名词: 消费组 !
考虑到多个消配码费者的场景,kafka 在设计的时候,可以由多个消费者组成一个消费组,同一个消费组者的消费者可以消费同一个 topic 下不同分区的数据,同一个分区只会被一个消费组内的某个消费者所消费,防止出现重复消费的问题!
但是不同的组,可以消费同一个分区的数据!
你可以这样理解,一个消费组就是一个客户端,一个客户端可以由很多个消费者组成,以便加快消息的消费能力。
但是,如果一个组下的消费者数量大于分区数量,就会出现很多的消费者闲置。
如果分区数量大于一个组下的消费者数量,会出现一个消费者负责多个分区的消费,会出现消费性能不均衡的情况。
因此,在实际的应用中,建议消费者组的 consumer 的数量与 partition 的数量保持一致!
光说理论可没用,下面我们就以 centos7 为例,介绍一下 kafka 的安装和使用。
kafka 需要 zookeeper 来保存服务实例的元信息,因此在安装 kafka 之前,我们需要先安装 zookeeper。
zookeeper 安装环境依赖于 jdk,因此我们需要事先安装 jdk
下载zookeeper,并解压文件包
创建数据、日志目录
配置zookeeper
重新配置 dataDir 和 dataLogDir 的存储路径
最后,启动 Zookeeper 服务
到官网 下载想要的版本,我这里下载是最新稳定版 2.8.0 。
按需修改配置文件 server.properties (可选)
server.properties 文件内容如下:
其中有四个重要的参数:
可根据自己需求修改对应的配置!
启动 kafka 服务
创建一个名为 testTopic 的主题,它只包含一个分区,只有一个副本:
运行 list topic 命令,可以看到该主题。
输出内容:
Kafka 附带一个命令行客户端,它将从文件或标准输入中获取输入,并将其作为消息发送到 Kafka 集群。默认情况下,每行将作为单独的消息发送。
运行生产者,然后在控制台中键入一些消息以发送到服务器。
输入两条内容并回车:
Kafka 还有一个命令行使用者,它会将消息转储到标准输出。
输出结果如下:
本文主要围绕 kafka 的架构模型和安装环境做了一些初步的介绍,难免会有理解不对的地方,欢迎网友批评、吐槽。
由于篇幅原因,会在下期文章中详细介绍 java 环境下 kafka 应用场景!
linux安装kafka
Apache官网
tar -xzf kafka_2.10-0.8.2.2.tgz
cd kafka_2.10-0.8.2.2
KAFKA需要启动两个服务:
1,启动zookeeper: bin/zookeeper-server-start.sh config/zookeeper.properties (能在执行指令时退出操作)
2,启动kafka: bin/kafka-server-start.sh config/server.properties
1,创建topic
bin/kafka-topics.sh --create --zookeeper 10.202.4.179:2181 --replication-factor 1 --partitions 1 --topic test
创建一个名为test的topic,只有一个副本,一个分区。
2,宴闷羡创建生产者,启动producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
topic要对应之前晌拍创建罩桐的topic
之后便可以发送消息
3,创建消费者,启动consumer
bin/kafka-console-consumer.sh --zookeeper 10.202.4.179:2181 --topic test --from-beginning
topic要对应之前创建的topic
在生产者发送消息后,消费者便可以收到对应的消息了
kafka官网上的安装文档在哪
kafka官网下载 kafka_2.10-0.8.2.1.tgz并复制到虚拟机Ubuntu
2.1解压到该路径
tar zxvf kafka_2.10-0.8.2.1.tgz
sudo mv kafka /home/chen-pc/kafka
启动和停止
启动Zookeeper server:
Shell代码
bin/zookeeper-server-start.sh config/zookeeper.properties
其中是为了能退出命令行
停止Kafka server
Shell代码
bin/kafka-server-stop.sh
出现如下:
INFO Shutting down. (kafka.log.LogManager)
INFO Shutdown complete. (kafka.log.LogManager)
INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
INFO Session: 0x154957ee6bc0000 closed (org.apache.zookeeper.ZooKeeper)
INFO EventThread shut down (org.apache.zookeeper.ClientCnxn)
INFO [Kafka Server 0], shut down completed (kafka.server.KafkaServer)
停止Zookeeper server:
Shell代码
bin/zookeeper-server-stop.sh
出现如下:
[1]+ Exit 130 bin/zookeeper-server-start.sh config/zookeeper.properties
4.创建启动、关闭kafka脚本
cd /home/chen-pc/kafka
创建启动脚本
vi kafkastart.sh #编辑,添加以下代码
#!/bin/sh
#启动zookeeper
/home/chen-pc/kafka/bin/zookeeper-server-start.sh /home/chen-pc/kafka/config/zookeeper.properties
#等3秒后执行
sleep 3
#启敬轮动kafka
/home/chen-pc/kafka/bin/kafka-server-start.sh /home/chen-pc/kafka/config/server.properties
保存退出
vi kafkastop.sh #编告稿旅辑,添加以下代码
#!/bin/sh
#关闭zookeeper
/home/chen-pc/kafka/bin/zookeeper-server-stop.sh /home/chen-pc/kafka/config/zookeeper.properties
sleep 3
#关闭kafka
/home/chen-pc/kafka/bin/kafka-server-stop.sh /home/chen-pc/kafka/config/server.properties
添加脚本执行权限
chmod +x kafkastart.sh
chmod +x kafkastop.sh
以后启动kafka
sh /home/chen-pc/kafka/kafkastart.sh
Jps查看进程
关闭kafka
sh /home/chen-pc/kafka/kafkastop.sh
至此,Linux下Kafka单机安装配置完成。
4. 单节点kafka
sh /home/chen-pc/kafka/kafkastart.sh
Kafka创建topic
cd kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Kafka 删除topic
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
查看topic
/home/chen-pc/kafka/bin/袜凳kafka-topics.sh --list --zookeeper localhost:2181
启动consumer producer,并在producer启动后的console输入一些信息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Kafka安装部署成功
关于kafka官网和kafka the definitive guide的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。