springkafka(springkafka自动创建topic)

本篇文章给大家谈谈springkafka,以及springkafka自动创建topic对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。

本文目录一览:

spring kafka 参数说明

# kafka

spring.kafka.bootstrap-servers=10.125.70.41:9092,10.125.70.35:9092,10.125.70.36:9092

#client-id

spring.kafka.client-id=group1

生产者参数

# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。

# acks=1 : 只要集群的首领节点收到消息,态悔生产者就会收到一个来自服务器成功响应。

# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应

spring.kafka.producer.acks=1

#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。

spring.kafka.producer.batch-size=16384

# 发生错误后,消息重发的次数。

spring.kafka.producer.retries=3

# 设置生枣中产者内存缓冲区的大小。

spring.kafka.producer.buffer-memory=33554432

spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

消费者参数

# 自动提交的时间间隔

spring.kafka.consumer.auto-commit-interval=1000

# offset的消费位置

spring.kafka.consumer.auto-offset-reset=latest

# 是否自动提交

spring.kafka.consumer.enable-auto-commit=false

# 最大拉取间隔时间

spring.kafka.consumer.max.poll.interval.ms =600000

# 会话超时时间

spring.kafka.consumer.session.timeout.ms =10000

spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# 消费组名称

spring.kafka.consumer.groupId=dmsdecision

# 最大拉取条数

spring.kafka.consumer.max-poll-records=30

# 心跳时间

spring.kafka.consumer.heartbeat-interval=3000

# kafka spring.kafka.properties.parsefileContainerFactory_concurrency监听线程数未设置时,本参数生效

spring.kafka.listener.concurrency=30

#MANUAL 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交

#MANUAL_IMMEDIATE 手动调用Acknowledgment.acknowledge()后立即提交

#RECORD 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交帆岩正

#BATCH 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交

#TIME 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交

#COUNT 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交

#COUNT_TIME TIME或COUNT 有一个条件满足时提交

# ack_mode为COUNT/COUNT_TIME 时配置

spring.kafka.listener.ack-mode=manual_immediate

# ack_mode为COUNT/COUNT_TIME 时配置

spring.kafka.listener.ack-count=

# ack_mode为/COUNT_TIME 时配置

spring.kafka.listener.ack-time=

# poll拉取数据超时时间

spring.kafka.listener.poll-timeout=

[img]

Springboot集成Kafka

最近陆续收到不少朋友反馈,他们在计划要在springboot项目引入kafka中间件。在网上找过很多资料,但都比较零散,按照其说明进行操作后让项目裂拦局正常跑起来仍是比较坎坷,听说我对kafka比较了解,希望给予一起分享。刚好最近因为疫情,实际相对于平常稍宽松,也想借此写点东西,一来作为自己的总结,二来可以给予有需要的朋友一些引导,针对此文期望对各位遇到问题的朋友有一定的帮助。

1. 安装JDK ,具体版本可以根据项目实际情况选择肆让,目前使用最多的为jdk8

2. 安装Zookeeper ,具体版本可以根据项目实际情况选择,本项目使用的是3.5.8

3. 安装Kafka  ,具体版本可以根据项目实际情况选择,本项目使用的是3.5.1

4. 安装Kafka Manage   (非必要:安装主要是了对kafka项目操作提供图形化界面操作),具体版本可以根据项目实际情况选择,本项目使用的是1.3.3.7

parent

groupIdorg.springframework.boot/groupId

artifactIdspring-boot-starter-parent/artifactId

version2.3.12.RELEASE/version

relativePath/ !-- lookup parent from repository --

/parent

dependencies

!--springboot web依赖 --

dependency

groupIdorg.springframework.boot/groupId

artifactIdspring-boot-starter-web/artifactId

/dependency

!--kafka依赖 --

dependency

groupIdorg.springframework.kafka/groupId

artifactIdspring-kafka/artifactId

/dependency

/dependencies

spring:

  kafka:

    bootstrap-servers: 127.0.0.1:9092

    producer: #生产者

      # 发生错误后,消息重发的次数。重试1次,此值需要结合业务场景,重试与否各有千秋(重试,好处:尽可能的确保生产者写入block成功;坏处:有可能时带有顺序写入的数据打乱顺序

      #比如:依次写入数据 1/2/3,但写入1时因网络异常,进行了重写,结果落到block的数据成了2/3/1)

      retries: 1

      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可衡银以使用的内存大小,按照字节数计算。模式时16k

      batch-size: 16384 #16k

      # 设置生产者内存缓冲区的大小

      buffer-memory: 33554432

      acks: 1

      # 键的序列化方式

      key-serializer: org.apache.kafka.common.serialization.StringSerializer

      # 值的序列化方式

      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:

      #group-id: default-group

      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D.此属性只有在enable-auto-commit:true时生效

      auto-commit-interval: 1S

      enable-auto-commit: false

      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:

      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)

      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录

      auto-offset-reset: earliest

      # 键的反序列化方式

      key-deserializer: org.apache.kafka.common.serialization.StringSerializer

      # 值的反序列化方式

      value-deserializer: org.apache.kafka.common.serialization.StringSerializer

    listener:

      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交

      # RECORD

      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交

      # BATCH

      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交

      # TIME

      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交

      # COUNT

      # TIME | COUNT 有一个条件满足时提交

      # COUNT_TIME

      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交

      # MANUAL

      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种

      # MANUAL_IMMEDIATE

      ack-mode: manual_immediate

      # 在侦听器容器中运行的线程数

      concurrency: 5

bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test

 -- 127.0.0.1:2181  zookeeper 服务器地址

--  replication-factor partitions 副本数量

--partitions partition数量

点击【Cluster】【Add Cluster】打开如下添加集群的配置界面:

输入集群的名字(如Kafka-Cluster-1)和 Zookeeper 服务器地址(如localhost:2181),选择最接近的Kafka版本(如0.10.1.0)

package com.charlie.cloudconsumer.service.impl.kafka;

import com.charlie.cloudconsumer.common.utils.JSON;

import lombok.extern.slf4j.Slf4j;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.core.KafkaTemplate;

import org.springframework.kafka.support.SendResult;

import org.springframework.util.concurrent.ListenableFuture;

import org.springframework.util.concurrent.ListenableFutureCallback;

import java.util.UUID;

/**

* @Author: charlie

* @CreateTime: 2022/4/9

* @Description : kafka消息生产端,为实践业务提供向kafka block发现消息的API

*/

@Component

@Slf4j

public class QueueProducer {

@Autowired

    private KafkaTemplatekafkaTemplate;

public void sendQueue(String topic,Object msgContent) {

String obj2String =JSON.toJSONString(msgContent);

log.info("kafka service 准备发送消息为:{}",obj2String);

//发送消息

        ListenableFuturefuture =kafkaTemplate.send(topic,UUID.randomUUID().toString(),obj2String);

future.addCallback(new ListenableFutureCallback() {

//消息发送成功

            @Override

            public void onSuccess(SendResult stringObjectSendResult) {

log.info("[kafka service-生产成功]topic:{},结果{}",topic, stringObjectSendResult.toString());

}

//消息发送失败

            @Override

            public void onFailure(Throwable throwable) {

//发送失败的处理,本处只是记录了错误日志,可结合实际业务做处理

                log.info("[kafka service-生产失败]topic:{},失败原因{}",topic, throwable.getMessage());

}

});

}

}

package com.charlie.cloudconsumer.service.impl.kafka;

import org.apache.commons.lang3.ObjectUtils;

import org.springframework.stereotype.Component;

/**

* @Author: charlie

* @CreateTime: 2022/4/9

* @Description : 消费端实际的业务处理对象

*/

@Component //添加此注解的原因是因为消费端在项目启动时就会初始化,消费端需要用到此类,故也让此类在项目启动时进行注册

public class QueueDataProcess {

public boolean doExec(Object obj) {

// todu 具体的业务逻辑

        if (ObjectUtils.isNotEmpty(obj)) {

return true;

}else {

return false;

}

}

}

package com.charlie.cloudconsumer.service.impl.kafka;

import com.charlie.cloudconsumer.common.utils.JSON;

import com.charlie.cloudconsumer.model.Order;

import lombok.extern.slf4j.Slf4j;

import org.apache.commons.lang.exception.ExceptionUtils;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.kafka.annotation.KafkaListener;

import org.springframework.kafka.support.Acknowledgment;

import org.springframework.kafka.support.KafkaHeaders;

import org.springframework.messaging.handler.annotation.Header;

import org.springframework.stereotype.Component;

import java.util.Optional;

/**

* @Author: charlie

* @CreateTime: 2022/4/9

* @Description : kafka消息消费端,负责消费特定topic消息

*/

@Component

@Slf4j

@SuppressWarnings("all")

public class QueueConsumer {

@Autowired

    private QueueDataProcess queueDataProcess;

/**

*

*/

    @KafkaListener(topics ="test", groupId ="consumer")

public void doConsumer(ConsumerRecord record,Acknowledgment ack,@Header(KafkaHeaders.RECEIVED_TOPIC)String topic) {

Optional message =Optional.ofNullable(record.value());

if (message.isPresent()) {

try {

Object msg =message.get();

log.info("[kafka-消费] doConsumer 消费了: Topic:" + topic +",Message:" +msg);

boolean res =queueDataProcess.doExec(JSON.parseObject(msg.toString(),Order.class));

if (res) {

ack.acknowledge();

}

}catch (Exception ex) {

log.error("[kafka-消费异常] doConsumer Error {} ",ExceptionUtils.getFullStackTrace(ex));

}

}

}

}

package com.charlie.cloudconsumer.controller;

import com.alibaba.fastjson.JSON;

import com.charlie.cloudconsumer.common.utils.AjaxResult;

import com.charlie.cloudconsumer.common.utils.BuildResponseUtils;

import com.charlie.cloudconsumer.model.Order;

import com.charlie.cloudconsumer.service.impl.kafka.QueueProducer;

import org.apache.commons.lang3.ObjectUtils;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.RequestBody;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RequestMethod;

import org.springframework.web.bind.annotation.RestController;

/**

* @Author: charlie

* @CreateTime: 2022/4/9

* @Description : kafka消息发送控制器,负责接受用户的发送的队列消息

*/

@RestController

@RequestMapping(value ="/kafka",produces =MediaType.APPLICATION_JSON_VALUE)

public class KafkaController {

    @Autowired

    private QueueProducer queueProducer;

    @RequestMapping(value = "/send",method = RequestMethod.POST)

    public  AjaxResult? sendMsg(@RequestBody Order order) {

        AjaxResult? ajaxResult= null;

        if (ObjectUtils.isNotEmpty(order)) {

          queueProducer.sendQueue("test",order);

            ajaxResult = BuildResponseUtils.success(0,"发送消息:"+ JSON.toJSONString(order) + "成功!");

        } else {

            ajaxResult = BuildResponseUtils.success(1,"发送消息:"+ JSON.toJSONString(order) + "失败!");

        }

        return ajaxResult;

    }

}

SpringBoot集成Kafka,实现简单的收发消息

在kafka的 config 目录下找到 server.properties 配置文件

把 listeners 和蚂让 advertised.listeners 两处配置的注释去掉,可以根据需要配置连接的服务器 外网IP 和 端口号 ,我这里演示选择的是本地 localhost 和默认端口 9092

KafkaTemplate 这个类包装了个生产者 Producer ,来提供方便的发送数据到 kafka 的主题 topic 里面。

send() 方法的源码, KafkaTemplate 类中还重载了很多 send() 方法,有需要可以看看源码

通过 KafkaTemplate 模板类发送数据。

kafkaTemplate.send(String topic, K key, V data) ,第一个入参是主题,第二个入参是发送的对象,第三个入参是发送的数据。通过 @KafkaListener 注解配置用户监听 topics

bootstrap-servers :kafka服务器地址(可以多个)

consumer.group-id :指定一个默认的组名

不指定的话会报

1. earliest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,从头开始消费

2. latest :当各分区下有已提交的 offset 时,从提交的 offset 开始消费;无提交的 offset 时,消费新产生的该分区下的数据

3. none : topic 各分区都存在已提交的 offset 时,从 offset 后开始消费;只要有一个分区不存在已提交的 offset ,则抛出异常

这个属性也是必须配置的,不然也是会报错的

在使用Kafka发送接收消息时,生产者 producer 端需要序列化,消费者 consumer 端需要反序列化,由于网络传输过来的是 byte[] ,只有反序列化后才能得到生产者发送的真实的消息内容。这样消息才能进行网络传输

consumer.key-deserializer 和 consumer.value-deserializer 是消费者 key/value 反序列化

producer.key-deserializer 和 producer.value-deserializer 是生产者 key/value 序列化

StringDeserializer 是内置的字符串反序列化历物亏方式

StringSerializer 是内置的字符串序列化方式

在 org.apache.kafka.common.serialization 源码包中还提供了多种类型的序列化和反序列化方式

要自定义序列化肢神方式,需要实现接口 Serializer

要自定义反序列化方式,需要实现接口 Deserializer

详细可以参考

这是 Kafka 的消费者 Consumer 的配置信息,每个消费者都会输出该配置信息

访问 ,就可以看到控制台打印消息了

Spring自带Kafka消息异常处理

方法onMessage会执行注解@KafkaListener指定的方法

报异常会catch到,继续下一条消息,如果设置的是自动提交offset,则kafka认升神为此消仔森息消费成功

异吵戚亏常处理的实现类,打印日志,仅此而已

springboot整合kafka实现消息推送

本篇文章主要介绍的是springboot整合清首kafka。

1.使用docker安装kafka,移步

创建一个名为springboot-kafka-common的微服务,打包方式为jar,存放一些公共配置和公共类,如util等

1.配置pom文件

pom文件中以父工程作为父依赖,就不需要额外引入依赖了

2.新建一个user实体类

3.创建application-common.yml配置文件,主要添加闭或kafka的公共配置

1.pom文件配置

2.application.yml配置文件,配置端口,设置微服务名称,引入公共服务模块中的application-common.yml

3.controller层

创建UserController

4.service层

创建UserService

创建UserServiceImpl

5.创建启动类

1.pom文件

2.创建yml配置文件

3.创建consumer消费者类

4.启动类

启动producer和consumer两个服务模块

访问producer微服务中的接口

会发现consumer微服务中的控制台打印了producer中创建并推送过来的的user实体轿正伍

本文GitHub源码:

CSDN:

博客园:

个人博客: `

SpringBoot整合kafka

此处简单记录一下 SpringBoot 和 Kafka 的凯枣整合。

1、 spring.kafka.consumer.enable-auto-commit 修改成 false

2、 spring.kafka.listener.ack-mode 修改成

            |- manual : 表示手动提交,但是测试下来发现是批量提交

            |- manual_immediate : 表示手动提交,当调用 Acknowledgment#acknowledge 之后立马提交。

1、消费的发送使毕闷用 KafkaTemplate 。

2、根据发盯数拆送的结果知道,消息发送成功还是失败。

KafkaListener :

      topic : 表示需要监听的队列名称

      groupId : 表示消费者组的id

1、

关于springkafka和springkafka自动创建topic的介绍到此就结束了,不知道你从中找到你需要的信息了吗 ?如果你还想了解更多这方面的信息,记得收藏关注本站。

标签列表