下面将详细讲解如何在Spring Boot 项目中集成 Kafka 消费者,并实现手动启动、停止操作。
步骤一:添加Kafka依赖
在 maven 的 pom 文件中添加 Kafka 相关依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
步骤二:创建Kafka消费配置类
在 Spring Boot 项目中创建 KafkaConsumerConfig 类,作为 Kafka 消费者的配置类,通过注解 @Configuration 声明该类是配置类:
@Configuration
public class KafkaConsumerConfig {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
}
ConsumerFactory
类是 Kafka 消费者的工厂类,创建出指定类型的消费者对象;ConcurrentKafkaListenerContainerFactory
类则是用来创建监听器容器的,其中包含了许多配置项,比如 ConsumerFactory
、并发数等。
步骤三:创建Kafka消费监听器
在 Spring Boot 项目中创建 KafkaConsumerListener 类,用来监听和处理 Kafka 消息:
@Service
public class KafkaConsumerListener {
private final Logger logger = LoggerFactory.getLogger(getClass());
@KafkaListener(topics = {"test-topic"})
public void onMessage(String message) {
logger.info("Received message: {}", message);
}
}
通过 @KafkaListener
注解指定监听的 Topic,并指定处理接收到消息的方法 onMessage
。
步骤四:手动控制消费者启动和停止
为了可以手动控制消费者的启动和停止,需要创建一个 bean 对象来保存 ConcurrentMessageListenerContainer
类型对象,该对象可以实现“启动、停止、暂停和恢复”等操作。
@Component
public class KafkaConsumerManager {
@Autowired
private KafkaConsumerListener listener;
@Autowired
private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;
private ConcurrentMessageListenerContainer<String, String> container;
private final Logger logger = LoggerFactory.getLogger(getClass());
/**
* 启动
*/
public void start() {
container = kafkaListenerContainerFactory.createContainer("test-topic");
container.setupMessageListener(listener);
container.start();
logger.info("Kafka consumer started");
}
/**
* 停止
*/
public void stop() {
if (container != null) {
container.stop();
container = null;
logger.info("Kafka consumer stopped");
}
}
}
在上述 KafkaConsumerManager 类中,需要注入 KafkaConsumerListener 对象和 Kafka 监听器容器工厂对象,以及创建容器对象的方法,容器对象需要实现监听器容器的基本操作,包括启动、停止、暂定和恢复等。
示例一:手动启动和停止消费者
在 Spring Boot 项目中,我们可以直接注入 KafkaConsumerManager bean,然后调用它的 start
和 stop
方法启动和停止消费者:
@RestController
public class KafkaController {
@Autowired
private KafkaConsumerManager consumerManager;
@GetMapping("/start")
public String start() {
// 启动消费者
consumerManager.start();
return "Kafka consumer started ";
}
@GetMapping("/stop")
public String stop() {
// 停止消费者
consumerManager.stop();
return "Kafka consumer stopped";
}
}
示例二:通过启动命令行参数启动和停止消费者
我们也可以通过启动命令行参数来控制 Kafka 消费者的启动和停止。先在 application.properties 文件中添加如下两个属性:
kafka.consumer.enabled=true
kafka.consumer.start=false
然后在 Spring Boot 项目启动类中接收并解析启动命令参数,根据不同参数值控制消费者的启动和停止:
@SpringBootApplication
public class KafkaApplication implements ApplicationRunner {
@Autowired
private KafkaConsumerManager consumerManager;
public static void main(String[] args) {
SpringApplication.run(KafkaApplication.class, args);
}
@Override
public void run(ApplicationArguments args) throws Exception {
boolean kafkaConsumerEnabled = Boolean.parseBoolean(
args.getOptionValues("kafka.consumer.enabled").get(0));
boolean kafkaConsumerStart = Boolean.parseBoolean(
args.getOptionValues("kafka.consumer.start").get(0));
if (kafkaConsumerEnabled && kafkaConsumerStart) {
consumerManager.start();
} else {
consumerManager.stop();
}
}
}
其中,ApplicationRunner
类可以在 Spring Boot 项目启动完成后,执行一些初始化操作,这里我们通过获取启动参数来控制消费者的启动。可以通过命令行执行 java -jar ****.jar --kafka.consumer.enabled=true --kafka.consumer.start=true
来启动消费者,kafka.consumer.enabled
表示是否开启消费者,kafka.consumer.start
表示启动或停止消费者。
到这里,我们就完成了 Spring Boot 项目集成 Kafka 消费者,同时实现手动控制 Kafka 消费者启动和停止的操作。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:springboot集成kafka消费手动启动停止操作 - Python技术站