springboot集成kafka消费手动启动停止操作

下面将详细讲解如何在Spring Boot 项目中集成 Kafka 消费者,并实现手动启动、停止操作。

步骤一:添加Kafka依赖

在 maven 的 pom 文件中添加 Kafka 相关依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.7.RELEASE</version>
</dependency>

步骤二:创建Kafka消费配置类

在 Spring Boot 项目中创建 KafkaConsumerConfig 类,作为 Kafka 消费者的配置类,通过注解 @Configuration 声明该类是配置类:

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}

ConsumerFactory 类是 Kafka 消费者的工厂类,创建出指定类型的消费者对象;ConcurrentKafkaListenerContainerFactory 类则是用来创建监听器容器的,其中包含了许多配置项,比如 ConsumerFactory、并发数等。

步骤三:创建Kafka消费监听器

在 Spring Boot 项目中创建 KafkaConsumerListener 类,用来监听和处理 Kafka 消息:

@Service
public class KafkaConsumerListener {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @KafkaListener(topics = {"test-topic"})
    public void onMessage(String message) {
        logger.info("Received message: {}", message);
    }
}

通过 @KafkaListener 注解指定监听的 Topic,并指定处理接收到消息的方法 onMessage

步骤四:手动控制消费者启动和停止

为了可以手动控制消费者的启动和停止,需要创建一个 bean 对象来保存 ConcurrentMessageListenerContainer 类型对象,该对象可以实现“启动、停止、暂停和恢复”等操作。

@Component
public class KafkaConsumerManager {

    @Autowired
    private KafkaConsumerListener listener;

    @Autowired
    private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory;

    private ConcurrentMessageListenerContainer<String, String> container;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * 启动
     */
    public void start() {
        container = kafkaListenerContainerFactory.createContainer("test-topic");
        container.setupMessageListener(listener);
        container.start();
        logger.info("Kafka consumer started");
    }

    /**
     * 停止
     */
    public void stop() {
        if (container != null) {
            container.stop();
            container = null;
            logger.info("Kafka consumer stopped");
        }
    }
}

在上述 KafkaConsumerManager 类中,需要注入 KafkaConsumerListener 对象和 Kafka 监听器容器工厂对象,以及创建容器对象的方法,容器对象需要实现监听器容器的基本操作,包括启动、停止、暂定和恢复等。

示例一:手动启动和停止消费者

在 Spring Boot 项目中,我们可以直接注入 KafkaConsumerManager bean,然后调用它的 startstop 方法启动和停止消费者:

@RestController
public class KafkaController {

    @Autowired
    private KafkaConsumerManager consumerManager;

    @GetMapping("/start")
    public String start() {
        // 启动消费者
        consumerManager.start();
        return "Kafka consumer started ";
    }

    @GetMapping("/stop")
    public String stop() {
        // 停止消费者
        consumerManager.stop();
        return "Kafka consumer stopped";
    }
}

示例二:通过启动命令行参数启动和停止消费者

我们也可以通过启动命令行参数来控制 Kafka 消费者的启动和停止。先在 application.properties 文件中添加如下两个属性:

kafka.consumer.enabled=true
kafka.consumer.start=false

然后在 Spring Boot 项目启动类中接收并解析启动命令参数,根据不同参数值控制消费者的启动和停止:

@SpringBootApplication
public class KafkaApplication implements ApplicationRunner {

    @Autowired
    private KafkaConsumerManager consumerManager;

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        boolean kafkaConsumerEnabled = Boolean.parseBoolean(
            args.getOptionValues("kafka.consumer.enabled").get(0));
        boolean kafkaConsumerStart = Boolean.parseBoolean(
            args.getOptionValues("kafka.consumer.start").get(0));
        if (kafkaConsumerEnabled && kafkaConsumerStart) {
            consumerManager.start();
        } else {
            consumerManager.stop();
        }
    }
}

其中,ApplicationRunner 类可以在 Spring Boot 项目启动完成后,执行一些初始化操作,这里我们通过获取启动参数来控制消费者的启动。可以通过命令行执行 java -jar ****.jar --kafka.consumer.enabled=true --kafka.consumer.start=true 来启动消费者,kafka.consumer.enabled 表示是否开启消费者,kafka.consumer.start 表示启动或停止消费者。

到这里,我们就完成了 Spring Boot 项目集成 Kafka 消费者,同时实现手动控制 Kafka 消费者启动和停止的操作。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:springboot集成kafka消费手动启动停止操作 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • java的Hibernate框架报错“SessionException”的原因和解决方法

    当使用Java的Hibernate框架时,可能会遇到“SessionException”错误。这个错误通常是由于以下原因之一引起的: 会话已关闭:如果您尝试在会话关闭后使用会话,则可能会出现此错误。在这种情况下,需要确保在使用会话之前打开会话,并在使用完毕后关闭会话。 事务已回滚:如果您尝试在事务已回滚后使用会话,则可能会出现此错误。在这种情况下,需要确保在…

    Java 2023年5月4日
    00
  • Java实现从字符串中找出数字字符串的方法小结

    Java实现从字符串中找出数字字符串的方法小结 有时候我们需要从一个字符串中提取数字串,可以使用Java中的正则表达式来实现。 正则表达式 正则表达式是一种用来描述字符串模式的语言。可以用来匹配、查找等操作。 匹配数字 用正则表达式来匹配数字的方式有以下几种: \d:表示匹配任意数字字符(0-9)的字符 [0-9]:表示匹配0-9中的任意一个数字字符 Jav…

    Java 2023年5月27日
    00
  • 非常实用的Tomcat启动脚本实现方法

    非常实用的Tomcat启动脚本实现方法 在Linux环境下,通常我们会使用启动脚本的方式来启动Tomcat。而对于初学者来说,编写一个完整可靠的启动脚本不是一件容易的事情。本文将介绍一种非常实用的Tomcat启动脚本实现方法,帮助大家快速实现Tomcat的自动启动、关闭、查看状态等操作。 Step 1: 创建启动脚本 首先,我们需要创建一个启动脚本。为了让脚…

    Java 2023年5月19日
    00
  • Mybatis批量更新三种方式的实现

    首先我们可以从三种方式的实现入手进行讲解。 方式一:使用foreach标签 使用foreach标签是MyBatis中批量更新的最常用也是最简单的方式。通过foreach标签,可以将多个更新操作一次性提交到数据库中,实现批量更新的效果。 具体实现步骤如下: 在mapper配置文件中定义批量更新的SQL语句,语句中要使用到foreach标签。 <updat…

    Java 2023年5月20日
    00
  • 利用Hadoop实现求共同好友的示例详解

    利用Hadoop实现求共同好友需要以下几个步骤: 划分好友关系 拆分好友关系,生成单向二元组 合并具有相同好友的二元组 在合并结果中找到共同好友 下面的示例中,我们假设有三个人A、B、C,他们之间的好友关系如下所示: A的好友:B、C、D B的好友:A、C、E C的好友:A、B、D、E 使用Hadoop来实现求A和B的共同好友和A和C的共同好友。 划分好友关…

    Java 2023年5月20日
    00
  • JAVA如何获取工程下的文件

    在Java中,我们可以使用相对路径或绝对路径的方式来获取工程下的文件。以下是详细的攻略: 使用相对路径获取工程下的文件 使用 File 对象的相对路径构造方法 可以通过创建 File 对象并传递相对路径来获取工程下的文件。如下所示,获取工程根目录下的 test.txt 文件: File file = new File("test.txt"…

    Java 2023年5月20日
    00
  • JSP 开发之Spring Boot 动态创建Bean

    针对“JSP 开发之Spring Boot 动态创建Bean”,我会提供下面的完整攻略。 一、简介 在Spring Boot开发中,我们可以通过定义Java类来定义Bean,但有些场景下我们需要在程序运行时动态创建Bean,这就需要使用Spring Boot的动态Bean创建特性。 二、动态创建Bean 下面是Spring Boot创建Bean的示例代码: …

    Java 2023年6月15日
    00
  • Spring开发核心之AOP的实现与切入点持久化

    Spring开发核心之AOP的实现与切入点持久化 什么是AOP 将一个大的功能划分为小的功能单元,然后将这些小的功能进行组合,就构成了一个完整的大功能。在划分功能单元的时候,要考虑到它们的通用性。这种技术称为模块化设计,也称为面向切面编程(AOP) AOP的实现 Spring中AOP的实现主要是通过动态代理的方式来实现的。Spring可以为普通的类以及接口生…

    Java 2023年5月20日
    00
合作推广
合作推广
分享本页
返回顶部