springboot集成kafka(springboot集成kafka yml配置)
本篇文章给大家谈谈springboot集成kafka,以及springboot集成kafka yml配置对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
- 1、SpringBoot整合kafka
- 2、Kafka(四)集群之kafka
- 3、springboot整合kafka实现消息推送
- 4、SpringBoot集成Kafka,实现简单的收发消息
- 5、Springboot集成Kafka
SpringBoot整合kafka
此处简单记录一下 SpringBoot 和 Kafka 的凯枣整合。
1、 spring.kafka.consumer.enable-auto-commit 修改成 false
2、 spring.kafka.listener.ack-mode 修改成
|- manual : 表示手动提交,但是测试下来发现是批量提交
|- manual_immediate : 表示手动提交,当调用 Acknowledgment#acknowledge 之后立马提交。
1、消费的发送使毕闷用 KafkaTemplate 。
2、根据发盯数拆送的结果知道,消息发送成功还是失败。
KafkaListener :
topic : 表示需要监听的队列名称
groupId : 表示消费者组的id
1、
Kafka(四)集群之kafka
在章节二( )中,我们部署了单机的kafka,现在我们部署一套集群模式的kafka。
这里我准备了三台虚拟机:
192.168.184.134
192.168.184.135
192.168.184.136
每台机器部署一个zk和kafka。
上一章节中zk集群已经神中部署完毕。
在章节二中,134这台机器已经有kafka存在了,我们在另外两台机器上安装kafka:
在上面的文件中有几个关键点,我们一一进行配置,我会对配置中的说明翻译:
以下这两个listeners,advertised_listeners 是对外暴露的服务端口,真正建立连接用的是 listeners。
在内网中我们使用listenners就可以了,在docker等容器或云中使用advertised。游判山
下面这个是日志路径的配置
下面这个是个重点的东西,topic在磁盘上会分为多个partitions存储,相比单一文件存储,增加了并行性,在后续文章中会详细去讲解:
日志的保存时间:
以下是zookeeper的配置:
这里我们直接设置后台启动,三个节点都是如此:
这里面有个小坑,还记得之前我们搭建的单机环境吗?那时候默认的日志文件夹在/tmp/kafka-logs下面,生成了很多内容,导致我们134这个节点无法启动成功,报错如下:
解决这个问题只需要把/tmp/kafka-logs文件删除就好了。
看到日志出现这一句表明启动成功了:
下面我们验证下是否搭建成功了,首先使用kafkatool工机具连接看下:
我们在134节点创建一个topic:
查看topic列表:
在kafkatool中查看:
创建生产者:
创建消费者:
生成者发送冲游消息:
消费者接收消息:
到此为止,kafka的集群搭建已经完成了。在后面的文章我们会去学习如何在springboot中集成kafka。
[img]springboot整合kafka实现消息推送
本篇文章主要介绍的是springboot整合清首kafka。
1.使用docker安装kafka,移步
创建一个名为springboot-kafka-common的微服务,打包方式为jar,存放一些公共配置和公共类,如util等
1.配置pom文件
pom文件中以父工程作为父依赖,就不需要额外引入依赖了
2.新建一个user实体类
3.创建application-common.yml配置文件,主要添加闭或kafka的公共配置
1.pom文件配置
2.application.yml配置文件,配置端口,设置微服务名称,引入公共服务模块中的application-common.yml
3.controller层
创建UserController
4.service层
创建UserService
创建UserServiceImpl
5.创建启动类
1.pom文件
2.创建yml配置文件
3.创建consumer消费者类
4.启动类
启动producer和consumer两个服务模块
访问producer微服务中的接口
会发现consumer微服务中的控制台打印了producer中创建并推送过来的的user实体轿正伍
本文GitHub源码:
CSDN:
:
博客园:
个人博客: `
SpringBoot集成Kafka,实现简单的收发消息
在kafka的 config 目录下找到 server.properties 配置文件
把 listeners 和蚂让 advertised.listeners 两处配置的注释去掉,可以根据需要配置连接的服务器 外网IP 和 端口号 ,我这里演示选择的是本地 localhost 和默认端口 9092
KafkaTemplate 这个类包装了个生产者 Producer ,来提供方便的发送数据到 kafka 的主题 topic 里面。
send() 方法的源码, KafkaTemplate 类中还重载了很多 send() 方法,有需要可以看看源码
通过 KafkaTemplate 模板类发送数据。
kafkaTemplate.send(String topic, K key, V data) ,第一个入参是主题,第二个入参是发送的对象,第三个入参是发送的数据。通过 @KafkaListener 注解配置用户监听 topics
bootstrap-servers :kafka服务器地址(可以多个)
consumer.group-id :指定一个默认的组名
不指定的话会报
1. earliest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费
2. latest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据
3. none : topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset ,则抛出异常
这个属性也是必须配置的,不然也是会报错的
在使用Kafka发送接收消息时,生产者 producer 端需要序列化,消费者 consumer 端需要反序列化,由于网络传输过来的是 byte[] ,只有反序列化后才能得到生产者发送的真实的消息内容。这样消息才能进行网络传输
consumer.key-deserializer 和 consumer.value-deserializer 是消费者 key/value 反序列化
producer.key-deserializer 和 producer.value-deserializer 是生产者 key/value 序列化
StringDeserializer 是内置的字符串反序列化历物亏方式
StringSerializer 是内置的字符串序列化方式
在 org.apache.kafka.common.serialization 源码包中还提供了多种类型的序列化和反序列化方式
要自定义序列化肢神方式,需要实现接口 Serializer
要自定义反序列化方式,需要实现接口 Deserializer
详细可以参考
这是 Kafka 的消费者 Consumer 的配置信息,每个消费者都会输出该配置信息
访问 ,就可以看到控制台打印消息了
Springboot集成Kafka
最近陆续收到不少朋友反馈,他们在计划要在springboot项目引入kafka中间件。在网上找过很多资料,但都比较零散,按照其说明进行操作后让项目裂拦局正常跑起来仍是比较坎坷,听说我对kafka比较了解,希望给予一起分享。刚好最近因为疫情,实际相对于平常稍宽松,也想借此写点东西,一来作为自己的总结,二来可以给予有需要的朋友一些引导,针对此文期望对各位遇到问题的朋友有一定的帮助。
1. 安装JDK ,具体版本可以根据项目实际情况选择肆让,目前使用最多的为jdk8
2. 安装Zookeeper ,具体版本可以根据项目实际情况选择,本项目使用的是3.5.8
3. 安装Kafka ,具体版本可以根据项目实际情况选择,本项目使用的是3.5.1
4. 安装Kafka Manage (非必要:安装主要是了对kafka项目操作提供图形化界面操作),具体版本可以根据项目实际情况选择,本项目使用的是1.3.3.7
parent
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-parent/artifactId
version2.3.12.RELEASE/version
relativePath/ !-- lookup parent from repository --
/parent
dependencies
!--springboot web依赖 --
dependency
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-web/artifactId
/dependency
!--kafka依赖 --
dependency
groupIdorg.springframework.kafka/groupId
artifactIdspring-kafka/artifactId
/dependency
/dependencies
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer: #生产者
# 发生错误后,消息重发的次数。重试1次,此值需要结合业务场景,重试与否各有千秋(重试,好处:尽可能的确保生产者写入block成功;坏处:有可能时带有顺序写入的数据打乱顺序
#比如:依次写入数据 1/2/3,但写入1时因网络异常,进行了重写,结果落到block的数据成了2/3/1)
retries: 1
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可衡银以使用的内存大小,按照字节数计算。模式时16k
batch-size: 16384 #16k
# 设置生产者内存缓冲区的大小
buffer-memory: 33554432
acks: 1
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
#group-id: default-group
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D.此属性只有在enable-auto-commit:true时生效
auto-commit-interval: 1S
enable-auto-commit: false
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
# 在侦听器容器中运行的线程数
concurrency: 5
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
-- 127.0.0.1:2181 zookeeper 服务器地址
-- replication-factor partitions 副本数量
--partitions partition数量
点击【Cluster】【Add Cluster】打开如下添加集群的配置界面:
输入集群的名字(如Kafka-Cluster-1)和 Zookeeper 服务器地址(如localhost:2181),选择最接近的Kafka版本(如0.10.1.0)
package com.charlie.cloudconsumer.service.impl.kafka;
import com.charlie.cloudconsumer.common.utils.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.UUID;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : kafka消息生产端,为实践业务提供向kafka block发现消息的API
*/
@Component
@Slf4j
public class QueueProducer {
@Autowired
private KafkaTemplatekafkaTemplate;
public void sendQueue(String topic,Object msgContent) {
String obj2String =JSON.toJSONString(msgContent);
log.info("kafka service 准备发送消息为:{}",obj2String);
//发送消息
ListenableFuturefuture =kafkaTemplate.send(topic,UUID.randomUUID().toString(),obj2String);
future.addCallback(new ListenableFutureCallback() {
//消息发送成功
@Override
public void onSuccess(SendResult stringObjectSendResult) {
log.info("[kafka service-生产成功]topic:{},结果{}",topic, stringObjectSendResult.toString());
}
//消息发送失败
@Override
public void onFailure(Throwable throwable) {
//发送失败的处理,本处只是记录了错误日志,可结合实际业务做处理
log.info("[kafka service-生产失败]topic:{},失败原因{}",topic, throwable.getMessage());
}
});
}
}
package com.charlie.cloudconsumer.service.impl.kafka;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.stereotype.Component;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : 消费端实际的业务处理对象
*/
@Component //添加此注解的原因是因为消费端在项目启动时就会初始化,消费端需要用到此类,故也让此类在项目启动时进行注册
public class QueueDataProcess {
public boolean doExec(Object obj) {
// todu 具体的业务逻辑
if (ObjectUtils.isNotEmpty(obj)) {
return true;
}else {
return false;
}
}
}
package com.charlie.cloudconsumer.service.impl.kafka;
import com.charlie.cloudconsumer.common.utils.JSON;
import com.charlie.cloudconsumer.model.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : kafka消息消费端,负责消费特定topic消息
*/
@Component
@Slf4j
@SuppressWarnings("all")
public class QueueConsumer {
@Autowired
private QueueDataProcess queueDataProcess;
/**
*
*/
@KafkaListener(topics ="test", groupId ="consumer")
public void doConsumer(ConsumerRecord record,Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic) {
Optional message =Optional.ofNullable(record.value());
if (message.isPresent()) {
try {
Object msg =message.get();
log.info("[kafka-消费] doConsumer 消费了: Topic:" + topic +",Message:" +msg);
boolean res =queueDataProcess.doExec(JSON.parseObject(msg.toString(),Order.class));
if (res) {
ack.acknowledge();
}
}catch (Exception ex) {
log.error("[kafka-消费异常] doConsumer Error {} ",ExceptionUtils.getFullStackTrace(ex));
}
}
}
}
package com.charlie.cloudconsumer.controller;
import com.alibaba.fastjson.JSON;
import com.charlie.cloudconsumer.common.utils.AjaxResult;
import com.charlie.cloudconsumer.common.utils.BuildResponseUtils;
import com.charlie.cloudconsumer.model.Order;
import com.charlie.cloudconsumer.service.impl.kafka.QueueProducer;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : kafka消息发送控制器,负责接受用户的发送的队列消息
*/
@RestController
@RequestMapping(value ="/kafka",produces =MediaType.APPLICATION_JSON_VALUE)
public class KafkaController {
@Autowired
private QueueProducer queueProducer;
@RequestMapping(value = "/send",method = RequestMethod.POST)
public AjaxResult? sendMsg(@RequestBody Order order) {
AjaxResult? ajaxResult= null;
if (ObjectUtils.isNotEmpty(order)) {
queueProducer.sendQueue("test",order);
ajaxResult = BuildResponseUtils.success(0,"发送消息:"+ JSON.toJSONString(order) + "成功!");
} else {
ajaxResult = BuildResponseUtils.success(1,"发送消息:"+ JSON.toJSONString(order) + "失败!");
}
return ajaxResult;
}
}
关于springboot集成kafka和springboot集成kafka yml配置的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。