Spring Kafka @KafkaListener详解与使用过程
简介
Spring Kafka 为 Kafka 提供了 Producer 和 Consumer 的封装,提供了方便的API让我们在Spring Boot项目中使用Kafka。其中 @KafkaListener 的注解为我们编写 Kafka Consumer 提供便利。
使用步骤
使用 Spring Kafka @KafkaListener 实现 Kafka Consumer 的步骤如下:
- 引入 Maven 依赖
在 pom.xml 文件中添加如下依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.2</version>
</dependency>
- 编写 Kafka Consumer
编写一个 Consumer 类,使用 @KafkaListener 注解标注方法,用于监听特定的 topic 。方法中使用 ConsumerRecord 对象来接收消息。
@Component
public class MyConsumer {
@KafkaListener(topics = "mytopic")
public void onMessage(ConsumerRecord<String, String> record) {
// 处理消息
String message = record.value();
// do something
}
}
- 配置 Kafka 消费者
配置 Kafka 消费者属性,根据需要设置一些消费者的参数,比如bootstrap.servers, group.id等等。
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
- 编写Kafka Producer
编写一个 Producer 类,使用 KafkaTemplate 实现消息发送,代码如下:
@Service
public class MyProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public MyProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
- 发送消息
@Service
public class MyService {
private final MyProducer myProducer;
public MyService(MyProducer myProducer) {
this.myProducer = myProducer;
}
public void sendMessage(String topic, String message) {
myProducer.sendMessage(topic, message);
}
}
- 运行应用程序,监听消息
在应用程序启动时,Spring 容器会自动扫描带有 @KafkaListener 注解的方法,并启动相应的消费者线程监听 topic 的消息。消息到达时,onMessage()方法会被回调。
例子
下面简单介绍两个使用 KafkaListener 注解的实例。
实例1
发送消息到指定 topic ,并监听该 topic 的消息
@RestController
@RequestMapping("/api/kafka")
public class KafkaController {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@PostMapping("/send/{topic}")
public String sendMessage(@PathVariable String topic, @RequestParam String message) {
kafkaTemplate.send(topic, message);
return "Message sent successfully to topic: " + topic;
}
}
@Component
public class MyConsumer {
@KafkaListener(topics = "mytopic")
public void onMessage(ConsumerRecord<String, String> record) {
String message = record.value();
System.out.println("接收到的消息:" + message);
}
}
当消息发送成功后,Consumer 的 onMessage 方法会监听到消息,输出包含消息内容的日志。
实例2
监听多个 topic,使用groupId来协同处理消息。
@Component
public class MyConsumer {
@KafkaListener(id = "myGroup", topics = {"topic1", "topic2"})
public void onMessage(ConsumerRecord<String, String> record) {
// 处理消息
String topic = record.topic();
String message = record.value();
System.out.println("接收到 " + topic + " 消息:" + message);
}
}
通过在注解中使用id属性来指定groupId,即可让多个 Consumer 实例共同协作处理消息,实现消息的高可用性。
总结
使用 Spring Kafka 的 @KafkaListener 注解,即可快速编写 Kafka Consumer,实现消息的消费。在实际开发过程中,可根据需求配置 Spring Kafka 的各项参数,以实现对消息的更加细粒度地处理和控制。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:spring kafka @KafkaListener详解与使用过程 - Python技术站