下面我来详细讲解“Springboot集成Kafka进行批量消费及踩坑点”的完整攻略。
一、前言
Kafka是一款分布式消息队列系统,由Apache在2011年引入,其主要包括了生产者、消费者等API,用于实现消息的发送和接收等操作。而Springboot则是目前流行的一种开发框架,它可以简化Java应用的开发过程。本文将探讨如何在Springboot中集成Kafka进行批量消费,同时也会分享一些踩坑的经验。
二、集成Kafka
1. 引入Kafka依赖
在pom.xml
中引入Kafka依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
2. 添加Kafka配置
在application.properties
中添加如下Kafka配置:
#Kafka相关配置
kafka.producer.bootstrap-servers=localhost:9092
kafka.consumer.bootstrap-servers=localhost:9092
kafka.consumer.groupId=group-1
kafka.consumer.enable.auto.commit=true
kafka.consumer.auto.commit.interval.ms=1000
kafka.consumer.session.timeout.ms=30000
kafka.consumer.max.poll.records=10
3. 创建Kafka生产者
在KafkaProducer
中,我们要注入KafkaTemplate:
@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String key, String message) throws Exception {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.println("发送消息成功," + result.getProducerRecord().topic() + "-" +
result.getProducerRecord().key() + ":" + result.getProducerRecord().value());
}
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送消息失败,原因:" + throwable.getMessage());
}
});
}
}
4. 创建Kafka消费者
在KafkaConsumer
中,我们要通过@KafkaListener
注解指定消费的topic和group,同时指定批量拉取消息的数量:
@Service
public class KafkaConsumer {
@KafkaListener(topics = "test-topic", groupId = "group-1", containerFactory = "kafkaListenerContainerFactory")
public void listen(List<ConsumerRecord<String, String>> messages) {
System.out.println("批量消费一次,消息数量:" + messages.size());
for (ConsumerRecord<String, String> message : messages) {
System.out.println("消费消息,topic:" + message.topic() + ",key:" + message.key() + ",value:" + message.value());
}
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(3000);
factory.setBatchListener(true); // 指定批量消费
return factory;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return properties;
}
}
在上述代码中,我们将factory.setBatchListener(true)
设置为true
,表示启用批量消费。
三、示例
1. 生产消息
在Controller
中编写生产消息的接口:
@RestController
public class TestController {
@Autowired
private KafkaProducer kafkaProducer;
@GetMapping("/send")
public String send() throws Exception {
String message = "Hello, Kafka!";
kafkaProducer.sendMessage("test-topic", "key-" + System.currentTimeMillis(), message);
return "success";
}
}
2. 消费消息
启动应用后,访问http://localhost:8080/send
接口生产一条消息,观察控制台输出。
四、踩坑点
1. Group ID重复
Kafka中的Group ID是用于标识消费者组的,同一个Group ID只允许存在一个消费者组。如果在多个消费者组中使用了同一个Group ID会导致消费者无法消费消息。因此,Group ID需要在多个应用中保持唯一。
2. Auto-commit被动触发
Kafka中的消费者组有自动提交和手动提交两种方式。在自动提交模式下,Kafka会定时自动提交已消费的消息偏移量,但是由于这个过程是异步的,因此可能会存在消费了但是消息偏移量未提交的情况。因此,对于需要保证消息不被重复消费的场景,建议使用手动提交模式。
3. 手动提交消息偏移量
在手动提交模式下,我们需要在消费消息后手动提交消息的偏移量,以避免重复消费。代码示例如下:
@KafkaListener(topics = "test-topic", groupId = "group-1")
public void listen(ConsumerRecord<String, String> message, Acknowledgment acknowledgment) {
System.out.println("消费消息,topic:" + message.topic() + ",key:" + message.key() + ",value:" + message.value());
acknowledgment.acknowledge(); // 手动提交消息偏移量
}
以上就是我关于“Springboot集成Kafka进行批量消费及踩坑点”的详细讲解,希望对你有所帮助。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Springboot集成Kafka进行批量消费及踩坑点 - Python技术站