springboot集成kafka消费手动启动停止操作

yizhihongxing

下面将详细讲解如何在Spring Boot 项目中集成 Kafka 消费者,并实现手动启动、停止操作。

步骤一:添加Kafka依赖

在 maven 的 pom 文件中添加 Kafka 相关依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>

步骤二:创建Kafka消费配置类

在 Spring Boot 项目中创建 KafkaConsumerConfig 类,作为 Kafka 消费者的配置类,通过注解 @Configuration 声明该类是配置类:

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}

ConsumerFactory 类是 Kafka 消费者的工厂类,创建出指定类型的消费者对象;ConcurrentKafkaListenerContainerFactory 类则是用来创建监听器容器的,其中包含了许多配置项,比如 ConsumerFactory、并发数等。

步骤三:创建Kafka消费监听器

在 Spring Boot 项目中创建 KafkaConsumerListener 类,用来监听和处理 Kafka 消息:

@Service
public class KafkaConsumerListener {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = {"test-topic"})
    public void onMessage(String message) {
        logger.info("Received message: {}", message);
    }
}

通过 @KafkaListener 注解指定监听的 Topic,并指定处理接收到消息的方法 onMessage

步骤四:手动控制消费者启动和停止

为了可以手动控制消费者的启动和停止,需要创建一个 bean 对象来保存 ConcurrentMessageListenerContainer 类型对象,该对象可以实现“启动、停止、暂停和恢复”等操作。

@Component
public class KafkaConsumerManager {

    @Autowired
    private KafkaConsumerListener listener;

    @Autowired
    private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;

    private ConcurrentMessageListenerContainer<String, String> container;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * 启动
     */
    public void start() {
        container = kafkaListenerContainerFactory.createContainer("test-topic");
        container.setupMessageListener(listener);
        container.start();
        logger.info("Kafka consumer started");
    }

    /**
     * 停止
     */
    public void stop() {
        if (container != null) {
            container.stop();
            container = null;
            logger.info("Kafka consumer stopped");
        }
    }
}

在上述 KafkaConsumerManager 类中,需要注入 KafkaConsumerListener 对象和 Kafka 监听器容器工厂对象,以及创建容器对象的方法,容器对象需要实现监听器容器的基本操作,包括启动、停止、暂定和恢复等。

示例一:手动启动和停止消费者

在 Spring Boot 项目中,我们可以直接注入 KafkaConsumerManager bean,然后调用它的 startstop 方法启动和停止消费者:

@RestController
public class KafkaController {

    @Autowired
    private KafkaConsumerManager consumerManager;

    @GetMapping("/start")
    public String start() {
        // 启动消费者
        consumerManager.start();
        return "Kafka consumer started ";
    }

    @GetMapping("/stop")
    public String stop() {
        // 停止消费者
        consumerManager.stop();
        return "Kafka consumer stopped";
    }
}

示例二:通过启动命令行参数启动和停止消费者

我们也可以通过启动命令行参数来控制 Kafka 消费者的启动和停止。先在 application.properties 文件中添加如下两个属性:

kafka.consumer.enabled=true
kafka.consumer.start=false

然后在 Spring Boot 项目启动类中接收并解析启动命令参数,根据不同参数值控制消费者的启动和停止:

@SpringBootApplication
public class KafkaApplication implements ApplicationRunner {

    @Autowired
    private KafkaConsumerManager consumerManager;

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        boolean kafkaConsumerEnabled = Boolean.parseBoolean(
            args.getOptionValues("kafka.consumer.enabled").get(0));
        boolean kafkaConsumerStart = Boolean.parseBoolean(
            args.getOptionValues("kafka.consumer.start").get(0));
        if (kafkaConsumerEnabled && kafkaConsumerStart) {
            consumerManager.start();
        } else {
            consumerManager.stop();
        }
    }
}

其中,ApplicationRunner 类可以在 Spring Boot 项目启动完成后,执行一些初始化操作,这里我们通过获取启动参数来控制消费者的启动。可以通过命令行执行 java -jar ****.jar --kafka.consumer.enabled=true --kafka.consumer.start=true 来启动消费者,kafka.consumer.enabled 表示是否开启消费者,kafka.consumer.start 表示启动或停止消费者。

到这里,我们就完成了 Spring Boot 项目集成 Kafka 消费者,同时实现手动控制 Kafka 消费者启动和停止的操作。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:springboot集成kafka消费手动启动停止操作 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • java高级用法之JNA中的Structure

    下面详细讲解一下Java高级用法之JNA中的Structure: 什么是JNA? JNA全称为Java Native Access,它是一个开源的Java库,可以让Java程序无需写任何Native代码实现直接访问本地DLL、 shared libraries和C等 Native语言编写的动态库(so)等。 Structure在JNA中的作用 在JNA中,S…

    Java 2023年5月26日
    00
  • Java中的InterruptedException是什么?

    InterruptedException 是 Java 中的异常类,它主要发生在一个正在等待某个时间或资源的线程被其他线程中断时,用于通知该线程所等待的操作已经无法继续。本文将详细讲解 Java 中的 InterruptedException,包括其用法、常见场景和示例说明。 用法 InterruptedException 继承自 Exception 类,通…

    Java 2023年4月27日
    00
  • JS前端知识点总结之内置对象,日期对象和定时器相关操作

    下面是对于“JS前端知识点总结之内置对象,日期对象和定时器相关操作”的完整攻略。 内置对象 概述 JavaScript中提供了很多内置对象,包括但不限于: String Number Boolean Array Object Date RegExp Math 我们可以用它们提供的方法和属性,来轻松地实现一些功能,从而提高效率。 Date对象 Date对象可以…

    Java 2023年5月26日
    00
  • JavaWeb 实现验证码功能(demo)

    我来给你详细讲解“JavaWeb 实现验证码功能(demo)”的完整攻略。 什么是验证码 验证码是指在许多需要用户注册、登录或提交信息的网站,为防止机器人恶意注册、登录或提交信息而设置的一种图形验证码,需要用户正确填写之后才能通过相关验证。 如何实现JavaWeb验证码 实现JavaWeb验证码的步骤如下: 1.后端生成验证码图片 在后端使用Java的图片处…

    Java 2023年5月26日
    00
  • Spring Boot 优雅整合多数据源

    下面是 Spring Boot 优雅整合多数据源的完整攻略。 1. 背景 Spring Boot 为我们提供了非常便捷的开发方式,但在项目中使用多数据源时,代码会变得比较冗长和难以维护。所以,需要一种更加简洁优美的方式来整合多数据源。 2. 实现方式 Spring Boot 优雅整合多数据源的方式,主要是通过使用 Spring 自带的 AbstractRou…

    Java 2023年5月20日
    00
  • SpringBoot浅析安全管理之Spring Security配置

    SpringBoot浅析安全管理之Spring Security配置 Spring Security是一个强大的安全框架,可以为Spring应用程序提供身份验证、授权、加密和会话管理等功能。在本文中,我们将介绍如何使用Spring Security配置安全管理,并提供两个示例。 步骤一:添加Spring Security依赖 我们需要在pom.xml文件中添…

    Java 2023年5月15日
    00
  • Java如何实现数字逆序

    当我们需要将一个整数的数位逆序后输出时,我们可以使用Java语言实现该功能,具体实现方法如下: 步骤一:将整数转换为字符串 首先,我们需要将整数转换为字符串类型,以便于我们对其进行操作。Java中,可以使用toString()方法将整数转换为字符串,示例如下: int num = 12345; String strNum = Integer.toString…

    Java 2023年5月26日
    00
  • Java父线程(或是主线程)等待所有子线程退出的实例

    Java父线程(或是主线程)等待所有子线程退出的实例,可以通过使用Thread的join()方法实现。 join()方法的功能是等待该线程执行结束,即阻塞等待该线程结束,然后再继续执行下面的代码。我们可以利用该方法等待所有子线程执行结束,从而达到等待所有子线程退出的目的。 下面是一个完整的示例代码: public class MainThread { pub…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部