springboot集成kafka消费手动启动停止操作

下面将详细讲解如何在Spring Boot 项目中集成 Kafka 消费者,并实现手动启动、停止操作。

步骤一:添加Kafka依赖

在 maven 的 pom 文件中添加 Kafka 相关依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>

步骤二:创建Kafka消费配置类

在 Spring Boot 项目中创建 KafkaConsumerConfig 类,作为 Kafka 消费者的配置类,通过注解 @Configuration 声明该类是配置类:

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}

ConsumerFactory 类是 Kafka 消费者的工厂类,创建出指定类型的消费者对象;ConcurrentKafkaListenerContainerFactory 类则是用来创建监听器容器的,其中包含了许多配置项,比如 ConsumerFactory、并发数等。

步骤三:创建Kafka消费监听器

在 Spring Boot 项目中创建 KafkaConsumerListener 类,用来监听和处理 Kafka 消息:

@Service
public class KafkaConsumerListener {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = {"test-topic"})
    public void onMessage(String message) {
        logger.info("Received message: {}", message);
    }
}

通过 @KafkaListener 注解指定监听的 Topic,并指定处理接收到消息的方法 onMessage

步骤四:手动控制消费者启动和停止

为了可以手动控制消费者的启动和停止,需要创建一个 bean 对象来保存 ConcurrentMessageListenerContainer 类型对象,该对象可以实现“启动、停止、暂停和恢复”等操作。

@Component
public class KafkaConsumerManager {

    @Autowired
    private KafkaConsumerListener listener;

    @Autowired
    private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;

    private ConcurrentMessageListenerContainer<String, String> container;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * 启动
     */
    public void start() {
        container = kafkaListenerContainerFactory.createContainer("test-topic");
        container.setupMessageListener(listener);
        container.start();
        logger.info("Kafka consumer started");
    }

    /**
     * 停止
     */
    public void stop() {
        if (container != null) {
            container.stop();
            container = null;
            logger.info("Kafka consumer stopped");
        }
    }
}

在上述 KafkaConsumerManager 类中,需要注入 KafkaConsumerListener 对象和 Kafka 监听器容器工厂对象,以及创建容器对象的方法,容器对象需要实现监听器容器的基本操作,包括启动、停止、暂定和恢复等。

示例一:手动启动和停止消费者

在 Spring Boot 项目中,我们可以直接注入 KafkaConsumerManager bean,然后调用它的 startstop 方法启动和停止消费者:

@RestController
public class KafkaController {

    @Autowired
    private KafkaConsumerManager consumerManager;

    @GetMapping("/start")
    public String start() {
        // 启动消费者
        consumerManager.start();
        return "Kafka consumer started ";
    }

    @GetMapping("/stop")
    public String stop() {
        // 停止消费者
        consumerManager.stop();
        return "Kafka consumer stopped";
    }
}

示例二:通过启动命令行参数启动和停止消费者

我们也可以通过启动命令行参数来控制 Kafka 消费者的启动和停止。先在 application.properties 文件中添加如下两个属性:

kafka.consumer.enabled=true
kafka.consumer.start=false

然后在 Spring Boot 项目启动类中接收并解析启动命令参数,根据不同参数值控制消费者的启动和停止:

@SpringBootApplication
public class KafkaApplication implements ApplicationRunner {

    @Autowired
    private KafkaConsumerManager consumerManager;

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        boolean kafkaConsumerEnabled = Boolean.parseBoolean(
            args.getOptionValues("kafka.consumer.enabled").get(0));
        boolean kafkaConsumerStart = Boolean.parseBoolean(
            args.getOptionValues("kafka.consumer.start").get(0));
        if (kafkaConsumerEnabled && kafkaConsumerStart) {
            consumerManager.start();
        } else {
            consumerManager.stop();
        }
    }
}

其中,ApplicationRunner 类可以在 Spring Boot 项目启动完成后,执行一些初始化操作,这里我们通过获取启动参数来控制消费者的启动。可以通过命令行执行 java -jar ****.jar --kafka.consumer.enabled=true --kafka.consumer.start=true 来启动消费者,kafka.consumer.enabled 表示是否开启消费者,kafka.consumer.start 表示启动或停止消费者。

到这里,我们就完成了 Spring Boot 项目集成 Kafka 消费者,同时实现手动控制 Kafka 消费者启动和停止的操作。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:springboot集成kafka消费手动启动停止操作 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Java 实战项目之疫情防控管理系统详解

    Java 实战项目之疫情防控管理系统详解 1. 项目介绍 该项目是一个基于Java的疫情防控管理系统。通过该系统,用户可以实现疫情病例的查询、疫情防控信息的发布和员工健康信息的管理等功能。 2. 技术栈 2.1 前端技术栈 HTML/CSS/JavaScript jQuery Bootstrap 2.2 后端技术栈 Java Spring/Spring MV…

    Java 2023年5月23日
    00
  • Java限流实现的几种方法详解

    Java限流实现的几种方法详解 什么是限流 限流是指在高并发的情况下,为了保护应用系统的稳定性和可用性,通过对请求进行控制和限制,使得系统在单位时间内能够处理的请求数量达到峰值或者控制在峰值以下,以避免系统崩溃或者服务不可用。 为什么需要限流 在高并发的场景中,一旦请求量超出系统的承受范围,就会导致服务的不可用,或者服务响应变慢,最终影响到用户体验。此时,通…

    Java 2023年5月19日
    00
  • SpringSecurity 表单登录的实现

    下面是“SpringSecurity 表单登录的实现”的完整攻略: 什么是SpringSecurity? SpringSecurity 是一种基于 Spring 的安全框架,可以为 web 应用程序提供身份验证(Authentication)、授权(Authorization)和其他安全性功能。SpringSecurity 可以轻松集成到现有的 Spring…

    Java 2023年6月3日
    00
  • 浅谈Java自定义注解相关知识

    浅谈Java自定义注解相关知识 什么是Java注解 Java注解(Annotation),是Java SE 5.0中新增的一个特性,也是Java语言中一个重要的元编程工具。注解是对程序代码进行一些特殊标记的一种形式化语言机制,用于在源代码中嵌入元数据信息,以及为编译器、虚拟机、运行期系统等软件提供提示和解释性信息,以达到代码分析、配置文件生成,甚至是程序编译…

    Java 2023年5月27日
    00
  • Spring Security 自定义授权服务器实践记录

    Spring Security 自定义授权服务器实践记录 介绍 Spring Security是一个功能非常强大的安全框架,可以用于处理各种身份认证和授权问题。其中,授权服务器是Spring Security的重要组成部分,用于为客户端颁发访问令牌,同时对请求进行验证和授权。本文将详细介绍如何使用Spring Security自定义授权服务器,并给出两个示例…

    Java 2023年5月20日
    00
  • Java实现文件读取和写入过程解析

    Java实现文件读取和写入过程解析 在Java中,读取和写入文件是非常常见的操作,本文将详细介绍Java实现文件读取和写入的过程,并提供两个示例进行演示。 文件读取 文件读取可以使用Java标准库中提供的java.io包中的FileReader和BufferedReader类实现。 FileReader类用于读取字符文件,BufferedReader类可以优…

    Java 2023年5月20日
    00
  • springmvc+spring+mybatis实现用户登录功能(下)

    本文将详细讲解如何使用SpringMVC、Spring和MyBatis框架实现用户登录功能。本文将分为两部分,本文是第二部分,主要介绍如何使用MyBatis框架实现用户登录功能。 使用MyBatis框架实现用户登录功能 MyBatis是一种优秀的持久层框架,它可以帮助我们更加方便地操作数据库。在本节中,我们将使用MyBatis框架实现用户登录功能。 步骤一:…

    Java 2023年5月17日
    00
  • 启动tomcat时 错误: 代理抛出异常 : java.rmi.server.ExportException: Port already in use: 1099的解决办法

    如果在启动Tomcat时出现“错误: 代理抛出异常: java.rmi.server.ExportException: Port already in use: 1099”的错误,意味着端口1099已经被占用了。一般是因为其他应用程序或Tomcat已经在使用该端口,导致Tomcat无法使用。 解决该问题的方法有两种,一种是找到占用了端口1099的应用程序并关…

    Java 2023年5月27日
    00
合作推广
合作推广
分享本页
返回顶部