下面是使用Spring Boot整合Kafka,延迟启动消费者的详细攻略,由以下步骤组成:
- 添加Kafka依赖
在Spring Boot项目中,需要在pom.xml
文件中添加Kafka的依赖,可以通过以下方式添加:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.1</version>
</dependency>
- 创建Kafka配置类
接下来我们需要创建一个配置类,用于配置Kafka:
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
new StringDeserializer(),
new StringDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); //设置批量消费
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
}
在配置类中,我们将消费者的相关配置属性注入,同时定义一个Map
对象并在consumerConfigs
方法中进行初始化。然后定义一个ConsumerFactory
对象来构建DefaultKafkaConsumerFactory
对象,传入Kafka相关的属性和StringDeserializer
实例。最后在kafkaListenerContainerFactory
方法中设置了ConcurrentKafkaListenerContainerFactory
对象来实现我们的批量消费或其他消费需要。
- 创建Kafka消费者
现在我们需要创建一个Kafka消费者,用于消费Kafka中的消息:
@Service
public class KafkaConsumerService {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);
@KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void listen(ConsumerRecord<String, String> record) {
LOGGER.info("收到消息:{}", record.value());
}
}
在这里,我们使用@KafkaListener
注解来监听spring.kafka.consumer.topic
主题,当有新的消息到达时会触发listen
方法,其中ConsumerRecord
对象包含了消息的元数据和消息内容。
- 延迟启动Kafka消费者
由于Kafka消费者默认启动时就会开始消费,如果我们想要延迟消费的开始时间,需要按照以下步骤进行配置:
- 将
application.yml
文件中的spring.kafka.listener.type
属性设置为batch
。 - 在
KafkaConsumerService
类中添加@ConditionalOnProperty
注解来判断是否需要延迟启动消费者。 - 在
KafkaConsumerService
类中定义一个计数器,用于延迟消费者的启动,直到计数器减为0时启动消费。我们可以使用CountDownLatch
来实现这个功能。
@Service
public class KafkaConsumerService {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);
/**
* 计数器,用于延迟消费者启动
*/
private CountDownLatch latch = new CountDownLatch(1);
/**
* 添加一个带有@ConditionalOnProperty注解的构造方法
* 用于判断application.yml配置中是否需要延迟消费者启动
*/
public KafkaConsumerService(@Value("${spring.kafka.consumer.delayed-start.enabled:false}") boolean delayedStartEnabled) {
if (delayedStartEnabled) {
new Thread(() -> {
try {
LOGGER.warn("消费者延迟启动中...");
Thread.sleep(5000); //延迟5秒钟
} catch (InterruptedException e) {
LOGGER.error("线程休眠异常:", e);
}
latch.countDown();
LOGGER.warn("消费者启动...");
}).start();
} else {
latch.countDown();
}
}
/**
* 监听kafka消息,使用@KafkaListener注解即可
*/
@KafkaListener(topics = "${spring.kafka.consumer.topic}")
public void listen(ConsumerRecord<String, String> record) {
try {
//等待计数器归零,即延迟秒数到了才真正开始消费
latch.await();
LOGGER.info("收到消息:{}", record.value());
} catch (InterruptedException e) {
LOGGER.error("线程休眠异常:", e);
}
}
}
通过以上代码,我们创建了一个计数器latch
,用于延迟消费者的启动,并判断了application.yml
文件中是否需要延迟消费的开启。如果需要,则使用CountDownLatch
进行计数,在延迟结束后将计数器减为0,从而触发消费者的启动。
至此,我们就成功地使用Spring Boot整合Kafka,并且实现了延迟启动消费者的功能。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:使用spring boot 整合kafka,延迟启动消费者 - Python技术站