springboot集成kafka(springboot集成kafka集群)

SpringBoot 集成 Kafka

简介

SpringBoot 是一个流行的 Java 框架,用于简化应用程序的开发和配置。Kafka 是一个分布式流处理平台,用于实时处理大数据。本文将指导您如何将 Kafka 集成到 SpringBoot 应用程序中。

配置 Kafka 依赖项

在您的 SpringBoot 项目中,添加以下依赖项到 pom.xml 文件中:```xml org.springframework.bootspring-boot-starter-kafka ```

创建 KafkaProducer

要向 Kafka 主题发送消息,需要创建一个 KafkaProducer。在 SpringBoot 中,可以通过 @KafkaListener 注解自动创建 KafkaProducer。```java @KafkaListener(topics = "test") public void receive(String message) {// 处理传入的消息 } ```

创建 KafkaConsumer

要从 Kafka 主题接收消息,需要创建一个 KafkaConsumer。在 SpringBoot 中,可以通过 @KafkaListener 注解自动创建 KafkaConsumer。```java @KafkaListener(topics = "test") public void consume(String message) {// 处理传入的消息 } ```

配置 Kafka 属性

您可以通过 application.properties 或 application.yml 文件配置 Kafka 属性。以下是几个常用的属性:

bootstrap.servers:

Kafka 集群的引导服务器地址列表。

key.serializer:

用于对消息键进行序列化的序列化器类。

value.serializer:

用于对消息值进行序列化的序列化器类。

示例

让我们创建一个简单的 SpringBoot 应用程序,向名为 "test" 的 Kafka 主题发送和接收消息:```java // Producer @SpringBootApplication public class ProducerApplication {public static void main(String[] args) { SpringApplication.run(ProducerApplication.class, args); } }@RestController public class ProducerController {@Autowiredprivate KafkaTemplate kafkaTemplate;@PostMapping("/send")public void send(@RequestBody String message) {kafkaTemplate.send("test", message);} }// Consumer @SpringBootApplication public class ConsumerApplication {public static void main(String[] args) { SpringApplication.run(ConsumerApplication.class, args); } }@KafkaListener(topics = "test") public class ConsumerListener {@KafkaHandlerpublic void receive(String message) {System.out.println(message);} } ```

运行应用程序

您可以在两个单独的终端窗口中运行 producer 和 consumer 应用程序:```bash mvn spring-boot:run -Dspring-boot.run.profiles=producermvn spring-boot:run -Dspring-boot.run.profiles=consumer ```现在,您可以使用 producer 应用程序发送消息,consumer 应用程序将接收并打印这些消息。

标签列表