# Spring Boot 配置 Kafka## 简介随着分布式系统和微服务架构的普及,消息中间件成为现代应用开发中的重要组成部分。Apache Kafka 是一个高性能、高吞吐量的消息队列系统,广泛应用于日志收集、实时数据处理等领域。Spring Boot 作为目前最流行的 Java 框架之一,与 Kafka 的结合能够快速实现高效的消息驱动架构。本文将详细介绍如何在 Spring Boot 项目中配置和使用 Kafka,包括依赖引入、配置文件设置以及代码实现等步骤。---## 1. 引入 Kafka 依赖在 Spring Boot 项目中,首先需要通过 Maven 或 Gradle 引入 Kafka 相关依赖。### Maven 配置在 `pom.xml` 文件中添加以下依赖:```xml
org.springframework.kafkaspring-kafka2.9.0
```### Gradle 配置在 `build.gradle` 文件中添加如下内容:```gradle
dependencies {implementation 'org.springframework.kafka:spring-kafka:2.9.0'
}
```---## 2. 配置 Kafka 属性在 Spring Boot 中,Kafka 的配置通常通过 `application.properties` 或 `application.yml` 文件完成。### application.properties 示例```properties
# Kafka 集群地址
spring.kafka.bootstrap-servers=localhost:9092# 生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer# 消费者配置
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```### application.yml 示例```yaml
spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```---## 3. 编写 Kafka 生产者生产者负责向 Kafka 主题发送消息。### 创建 Kafka 配置类```java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaProducerConfig {@Beanpublic ProducerFactory producerFactory() {Map configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}
```### 使用 KafkaTemplate 发送消息```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaMessageProducer {private static final String TOPIC = "test-topic";@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send(TOPIC, message);System.out.println("Sent message: " + message);}
}
```---## 4. 编写 Kafka 消费者消费者负责从 Kafka 主题接收消息。### 创建 Kafka 配置类```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;
import java.util.Map;@EnableKafka
@Configuration
public class KafkaConsumerConfig {@Beanpublic ConsumerFactory consumerFactory() {Map props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}
}
```### 接收并处理消息```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaMessageConsumer {@KafkaListener(topics = "test-topic", groupId = "my-group")public void listen(String message) {System.out.println("Received message: " + message);}
}
```---## 5. 测试 Kafka 功能启动 Kafka 服务器后,在 Spring Boot 应用中注入 `KafkaMessageProducer` 和 `KafkaMessageConsumer`,分别调用 `sendMessage` 方法发送消息,并观察控制台输出是否正确接收到消息。---## 总结通过本文的学习,您已经掌握了如何在 Spring Boot 项目中配置和使用 Kafka。从依赖引入到配置文件编写,再到生产者和消费者的代码实现,整个流程清晰易懂。Kafka 的高效性和灵活性使其成为构建分布式系统的理想选择,而 Spring Boot 则让其集成变得更加简单快捷。希望本文能为您的实际开发提供帮助!
Spring Boot 配置 Kafka
简介随着分布式系统和微服务架构的普及,消息中间件成为现代应用开发中的重要组成部分。Apache Kafka 是一个高性能、高吞吐量的消息队列系统,广泛应用于日志收集、实时数据处理等领域。Spring Boot 作为目前最流行的 Java 框架之一,与 Kafka 的结合能够快速实现高效的消息驱动架构。本文将详细介绍如何在 Spring Boot 项目中配置和使用 Kafka,包括依赖引入、配置文件设置以及代码实现等步骤。---
1. 引入 Kafka 依赖在 Spring Boot 项目中,首先需要通过 Maven 或 Gradle 引入 Kafka 相关依赖。
Maven 配置在 `pom.xml` 文件中添加以下依赖:```xml
org.springframework.kafkaspring-kafka2.9.0
```
Gradle 配置在 `build.gradle` 文件中添加如下内容:```gradle
dependencies {implementation 'org.springframework.kafka:spring-kafka:2.9.0'
}
```---
2. 配置 Kafka 属性在 Spring Boot 中,Kafka 的配置通常通过 `application.properties` 或 `application.yml` 文件完成。
application.properties 示例```properties
Kafka 集群地址
spring.kafka.bootstrap-servers=localhost:9092
生产者配置
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
消费者配置
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
```
application.yml 示例```yaml
spring:kafka:bootstrap-servers: localhost:9092producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializer
```---
3. 编写 Kafka 生产者生产者负责向 Kafka 主题发送消息。
创建 Kafka 配置类```java
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.util.HashMap;
import java.util.Map;@Configuration
public class KafkaProducerConfig {@Beanpublic ProducerFactory producerFactory() {Map configProps = new HashMap<>();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);return new DefaultKafkaProducerFactory<>(configProps);}@Beanpublic KafkaTemplate kafkaTemplate() {return new KafkaTemplate<>(producerFactory());}
}
```
使用 KafkaTemplate 发送消息```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;@Service
public class KafkaMessageProducer {private static final String TOPIC = "test-topic";@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String message) {kafkaTemplate.send(TOPIC, message);System.out.println("Sent message: " + message);}
}
```---
4. 编写 Kafka 消费者消费者负责从 Kafka 主题接收消息。
创建 Kafka 配置类```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;
import java.util.Map;@EnableKafka
@Configuration
public class KafkaConsumerConfig {@Beanpublic ConsumerFactory consumerFactory() {Map props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);return new DefaultKafkaConsumerFactory<>(props);}@Beanpublic ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());return factory;}
}
```
接收并处理消息```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;@Service
public class KafkaMessageConsumer {@KafkaListener(topics = "test-topic", groupId = "my-group")public void listen(String message) {System.out.println("Received message: " + message);}
}
```---
5. 测试 Kafka 功能启动 Kafka 服务器后,在 Spring Boot 应用中注入 `KafkaMessageProducer` 和 `KafkaMessageConsumer`,分别调用 `sendMessage` 方法发送消息,并观察控制台输出是否正确接收到消息。---
总结通过本文的学习,您已经掌握了如何在 Spring Boot 项目中配置和使用 Kafka。从依赖引入到配置文件编写,再到生产者和消费者的代码实现,整个流程清晰易懂。Kafka 的高效性和灵活性使其成为构建分布式系统的理想选择,而 Spring Boot 则让其集成变得更加简单快捷。希望本文能为您的实际开发提供帮助!