我来分步骤详细讲解下“spring 整合kafka监听消费的配置过程”的攻略。
引入Kafka依赖
在 pom.xml
中引入Kafka依赖,常用的包括 spring-kafka
、kafka-clients
等,具体如下:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.2</version>
</dependency>
配置Kafka消费者监听
在 application.yml
中添加Kafka的配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: gp-order
auto-offset-reset: earliest
max-poll-records: 100
enable-auto-commit: true
其中:
- bootstrap-servers
:Kafka集群地址,可以配置多个
- group-id
:消费者组id,同一组内消费者共享同一份消息
- auto-offset-reset
:消费者在没有初始偏移量或者当前偏移量不存在的情况下,该如何处理,earliest
为从最早的偏移量开始消费,latest
为只从最新的消息开始消费
- max-poll-records
:每次拉取消息的数量,默认是 500
- enable-auto-commit
:是否自动提交偏移量
编写消费者监听器
消费者监听器监听到kafka消息后需要完成具体的消费逻辑,代码示例如下:
@Component
public class OrderListener {
@KafkaListener(topics = "order")
public void consume(ConsumerRecord<String, String> record) {
// 获取kafka消息key和value
String key = record.key();
String value = record.value();
// do something...
}
}
其中:
- @KafkaListener
:注解用于标识这是一个消费者监听器,监听 order
主题下的消息
- ConsumerRecord
:Kafka消息对象,包含消息key、value、分区信息等
示例一:监听字符串消息
以下示例展示如何实现监听字符串消息,代码示例:
@Component
public class StringListener {
@KafkaListener(topics = "string-message")
public void consume(String message) {
System.out.println("接收到字符串消息:" + message);
// do something...
}
}
其中:
- @KafkaListener
:注解用于标识这是一个消费者监听器,监听 string-message
主题下的消息
- String
:监听方法的参数类型,表示接收到的是字符串消息
示例二:监听Java对象消息
以下示例展示如何实现监听Java对象消息,代码示例:
@Component
public class OrderListener {
@KafkaListener(topics = "order")
public void consume(OrderDto order) {
System.out.println("接收到订单消息:" + order);
// do something...
}
}
其中:
- @KafkaListener
:注解用于标识这是一个消费者监听器,监听 order
主题下的消息
- OrderDto
:监听方法的参数类型,表示接收到的是OrderDto类型对象消息
以上就是“spring 整合kafka监听消费的配置过程”的完整攻略和两个示例。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:spring 整合kafka监听消费的配置过程 - Python技术站