springboot集成kafka(springboot集成kafka二)
本篇文章给大家谈谈springboot集成kafka,以及springboot集成kafka二对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
Springboot集成Kafka
最近陆续收到不少朋友反馈,他们在计划要在springboot项目引入kafka中间件。在网上找过很多资料,但都比较零散,按照其说明进行操作后让项目裂拦局正常跑起来仍是比较坎坷,听说我对kafka比较了解,希望给予一起分享。刚好最近因为疫情,实际相对于平常稍宽松,也想借此写点东西,一来作为自己的总结,二来可以给予有需要的朋友一些引导,针对此文期望对各位遇到问题的朋友有一定的帮助。
1. 安装JDK ,具体版本可以根据项目实际情况选择肆让,目前使用最多的为jdk8
2. 安装Zookeeper ,具体版本可以根据项目实际情况选择,本项目使用的是3.5.8
3. 安装Kafka ,具体版本可以根据项目实际情况选择,本项目使用的是3.5.1
4. 安装Kafka Manage (非必要:安装主要是了对kafka项目操作提供图形化界面操作),具体版本可以根据项目实际情况选择,本项目使用的是1.3.3.7
parent
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-parent/artifactId
version2.3.12.RELEASE/version
relativePath/ !-- lookup parent from repository --
/parent
dependencies
!--springboot web依赖 --
dependency
groupIdorg.springframework.boot/groupId
artifactIdspring-boot-starter-web/artifactId
/dependency
!--kafka依赖 --
dependency
groupIdorg.springframework.kafka/groupId
artifactIdspring-kafka/artifactId
/dependency
/dependencies
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer: #生产者
# 发生错误后,消息重发的次数。重试1次,此值需要结合业务场景,重试与否各有千秋(重试,好处:尽可能的确保生产者写入block成功;坏处:有可能时带有顺序写入的数据打乱顺序
#比如:依次写入数据 1/2/3,但写入1时因网络异常,进行了重写,结果落到block的数据成了2/3/1)
retries: 1
#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可衡银以使用的内存大小,按照字节数计算。模式时16k
batch-size: 16384 #16k
# 设置生产者内存缓冲区的大小
buffer-memory: 33554432
acks: 1
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
#group-id: default-group
# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D.此属性只有在enable-auto-commit:true时生效
auto-commit-interval: 1S
enable-auto-commit: false
# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
auto-offset-reset: earliest
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringSerializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringSerializer
listener:
# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# RECORD
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
# BATCH
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
# TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
# COUNT
# TIME | COUNT 有一个条件满足时提交
# COUNT_TIME
# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
# MANUAL
# 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
# MANUAL_IMMEDIATE
ack-mode: manual_immediate
# 在侦听器容器中运行的线程数
concurrency: 5
bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
-- 127.0.0.1:2181 zookeeper 服务器地址
-- replication-factor partitions 副本数量
--partitions partition数量
点击【Cluster】【Add Cluster】打开如下添加集群的配置界面:
输入集群的名字(如Kafka-Cluster-1)和 Zookeeper 服务器地址(如localhost:2181),选择最接近的Kafka版本(如0.10.1.0)
package com.charlie.cloudconsumer.service.impl.kafka;
import com.charlie.cloudconsumer.common.utils.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import java.util.UUID;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : kafka消息生产端,为实践业务提供向kafka block发现消息的API
*/
@Component
@Slf4j
public class QueueProducer {
@Autowired
private KafkaTemplatekafkaTemplate;
public void sendQueue(String topic,Object msgContent) {
String obj2String =JSON.toJSONString(msgContent);
log.info("kafka service 准备发送消息为:{}",obj2String);
//发送消息
ListenableFuturefuture =kafkaTemplate.send(topic,UUID.randomUUID().toString(),obj2String);
future.addCallback(new ListenableFutureCallback() {
//消息发送成功
@Override
public void onSuccess(SendResult stringObjectSendResult) {
log.info("[kafka service-生产成功]topic:{},结果{}",topic, stringObjectSendResult.toString());
}
//消息发送失败
@Override
public void onFailure(Throwable throwable) {
//发送失败的处理,本处只是记录了错误日志,可结合实际业务做处理
log.info("[kafka service-生产失败]topic:{},失败原因{}",topic, throwable.getMessage());
}
});
}
}
package com.charlie.cloudconsumer.service.impl.kafka;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.stereotype.Component;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : 消费端实际的业务处理对象
*/
@Component //添加此注解的原因是因为消费端在项目启动时就会初始化,消费端需要用到此类,故也让此类在项目启动时进行注册
public class QueueDataProcess {
public boolean doExec(Object obj) {
// todu 具体的业务逻辑
if (ObjectUtils.isNotEmpty(obj)) {
return true;
}else {
return false;
}
}
}
package com.charlie.cloudconsumer.service.impl.kafka;
import com.charlie.cloudconsumer.common.utils.JSON;
import com.charlie.cloudconsumer.model.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.Optional;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : kafka消息消费端,负责消费特定topic消息
*/
@Component
@Slf4j
@SuppressWarnings("all")
public class QueueConsumer {
@Autowired
private QueueDataProcess queueDataProcess;
/**
*
*/
@KafkaListener(topics ="test", groupId ="consumer")
public void doConsumer(ConsumerRecord record,Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic) {
Optional message =Optional.ofNullable(record.value());
if (message.isPresent()) {
try {
Object msg =message.get();
log.info("[kafka-消费] doConsumer 消费了: Topic:" + topic +",Message:" +msg);
boolean res =queueDataProcess.doExec(JSON.parseObject(msg.toString(),Order.class));
if (res) {
ack.acknowledge();
}
}catch (Exception ex) {
log.error("[kafka-消费异常] doConsumer Error {} ",ExceptionUtils.getFullStackTrace(ex));
}
}
}
}
package com.charlie.cloudconsumer.controller;
import com.alibaba.fastjson.JSON;
import com.charlie.cloudconsumer.common.utils.AjaxResult;
import com.charlie.cloudconsumer.common.utils.BuildResponseUtils;
import com.charlie.cloudconsumer.model.Order;
import com.charlie.cloudconsumer.service.impl.kafka.QueueProducer;
import org.apache.commons.lang3.ObjectUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
* @Author: charlie
* @CreateTime: 2022/4/9
* @Description : kafka消息发送控制器,负责接受用户的发送的队列消息
*/
@RestController
@RequestMapping(value ="/kafka",produces =MediaType.APPLICATION_JSON_VALUE)
public class KafkaController {
@Autowired
private QueueProducer queueProducer;
@RequestMapping(value = "/send",method = RequestMethod.POST)
public AjaxResult? sendMsg(@RequestBody Order order) {
AjaxResult? ajaxResult= null;
if (ObjectUtils.isNotEmpty(order)) {
queueProducer.sendQueue("test",order);
ajaxResult = BuildResponseUtils.success(0,"发送消息:"+ JSON.toJSONString(order) + "成功!");
} else {
ajaxResult = BuildResponseUtils.success(1,"发送消息:"+ JSON.toJSONString(order) + "失败!");
}
return ajaxResult;
}
}
SpringBoot整合kafka
此处简单记录一下 SpringBoot 和 Kafka 的凯枣整合。
1、 spring.kafka.consumer.enable-auto-commit 修改成 false
2、 spring.kafka.listener.ack-mode 修改成
|- manual : 表示手动提交,但是测试下来发现是批量提交
|- manual_immediate : 表示手动提交,当调用 Acknowledgment#acknowledge 之后立马提交。
1、消费的发送使毕闷用 KafkaTemplate 。
2、根据发盯数拆送的结果知道,消息发送成功还是失败。
KafkaListener :
topic : 表示需要监听的队列名称
groupId : 表示消费者组的id
1、
SpringBoot集成Kafka,实现简单的收发消息
在kafka的 config 目录下找到 server.properties 配置文件
把 listeners 和蚂让 advertised.listeners 两处配置的注释去掉,可以根据需要配置连接的服务器 外网IP 和 端口号 ,我这里演示选择的是本地 localhost 和默认端口 9092
KafkaTemplate 这个类包装了个生产者 Producer ,来提供方便的发送数据到 kafka 的主题 topic 里面。
send() 方法的源码, KafkaTemplate 类中还重载了很多 send() 方法,有需要可以看看源码
通过 KafkaTemplate 模板类发送数据。
kafkaTemplate.send(String topic, K key, V data) ,第一个入参是主题,第二个入参是发送的对象,第三个入参是发送的数据。通过 @KafkaListener 注解配置用户监听 topics
bootstrap-servers :kafka服务器地址(可以多个)
consumer.group-id :指定一个默认的组名
不指定的话会报
1. earliest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费
2. latest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据
3. none : topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset ,则抛出异常
这个属性也是必须配置的,不然也是会报错的
在使用Kafka发送接收消息时,生产者 producer 端需要序列化,消费者 consumer 端需要反序列化,由于网络传输过来的是 byte[] ,只有反序列化后才能得到生产者发送的真实的消息内容。这样消息才能进行网络传输
consumer.key-deserializer 和 consumer.value-deserializer 是消费者 key/value 反序列化
producer.key-deserializer 和 producer.value-deserializer 是生产者 key/value 序列化
StringDeserializer 是内置的字符串反序列化历物亏方式
StringSerializer 是内置的字符串序列化方式
在 org.apache.kafka.common.serialization 源码包中还提供了多种类型的序列化和反序列化方式
要自定义序列化肢神方式,需要实现接口 Serializer
要自定义反序列化方式,需要实现接口 Deserializer
详细可以参考
这是 Kafka 的消费者 Consumer 的配置信息,每个消费者都会输出该配置信息
访问 ,就可以看到控制台打印消息了
[img]关于springboot集成kafka和springboot集成kafka二的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。