## Spring Boot 整合 Kafka 实现消息消费### 简介Apache Kafka 是一个分布式流处理平台,常用于构建高吞吐量、低延迟的消息传递系统。Spring Boot 为 Kafka 提供了强大的支持,可以方便地集成到应用程序中,实现消息的生产和消费。本文将重点介绍如何使用 Spring Boot 消费 Kafka 消息。### 1. 添加依赖在 Spring Boot 项目的 `pom.xml` 文件中添加以下依赖:```xml
org.springframework.kafkaspring-kafka
```### 2. 配置 Kafka 属性在 `application.properties` 或 `application.yml` 文件中配置 Kafka 相关属性,例如:```properties
# Kafka broker 地址
spring.kafka.bootstrap-servers=localhost:9092# 消费者组 ID
spring.kafka.consumer.group-id=my-group# 自动提交偏移量
spring.kafka.consumer.auto-offset-reset=earliest
```- `spring.kafka.bootstrap-servers`:Kafka broker 的地址列表,多个地址用逗号分隔。
- `spring.kafka.consumer.group-id`:消费者组 ID,用于标识一组消费者。
- `spring.kafka.consumer.auto-offset-reset`:当没有初始偏移量或当前偏移量无效时,该如何处理。常用的值包括:- `earliest`:从最早的可用偏移量开始消费。- `latest`:从最新的偏移量开始消费。### 3. 创建 Kafka 监听器使用 `@KafkaListener` 注解创建一个 Kafka 消息监听器:```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class KafkaConsumer {@KafkaListener(topics = "my-topic")public void listen(String message) {System.out.println("Received message: " + message);}
}
```- `@KafkaListener` 注解用于指定要监听的主题。
- `topics` 属性指定要监听的主题名称。
- `listen()` 方法是消息处理方法,当有消息到达时会被调用。### 4. 启动应用程序启动 Spring Boot 应用程序,消费者就会开始监听指定的 Kafka 主题。### 5. 高级配置#### 5.1. 手动提交偏移量默认情况下,消费者会自动提交偏移量。如果需要手动控制偏移量提交,可以将 `spring.kafka.consumer.enable-auto-commit` 属性设置为 `false`,并在消息处理方法中使用 `Acknowledgment` 对象手动提交偏移量:```java
@KafkaListener(topics = "my-topic")
public void listen(String message, Acknowledgment acknowledgment) {System.out.println("Received message: " + message);acknowledgment.acknowledge();
}
```#### 5.2. 处理不同类型的消息可以使用 `@KafkaListener` 注解的 `groupId` 属性将消费者分组,每个组都会收到主题的所有消息:```java
@KafkaListener(topics = "my-topic", groupId = "group-a")
public void listenGroupA(String message) {// ...
}@KafkaListener(topics = "my-topic", groupId = "group-b")
public void listenGroupB(String message) {// ...
}
```### 总结本文介绍了如何使用 Spring Boot 消费 Kafka 消息,包括添加依赖、配置 Kafka 属性、创建 Kafka 监听器以及一些高级配置。Spring Boot 提供了简单易用的 API,可以快速构建可靠的 Kafka 消息消费应用。