使用spring boot 整合kafka,延迟启动消费者

下面是使用Spring Boot整合Kafka,延迟启动消费者的详细攻略,由以下步骤组成:

  1. 添加Kafka依赖

在Spring Boot项目中,需要在pom.xml文件中添加Kafka的依赖,可以通过以下方式添加:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.1</version>
</dependency>
  1. 创建Kafka配置类

接下来我们需要创建一个配置类,用于配置Kafka:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(),
                new StringDeserializer(),
                new StringDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true); //设置批量消费
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
}

在配置类中,我们将消费者的相关配置属性注入,同时定义一个Map对象并在consumerConfigs方法中进行初始化。然后定义一个ConsumerFactory对象来构建DefaultKafkaConsumerFactory对象,传入Kafka相关的属性和StringDeserializer实例。最后在kafkaListenerContainerFactory方法中设置了ConcurrentKafkaListenerContainerFactory对象来实现我们的批量消费或其他消费需要。

  1. 创建Kafka消费者

现在我们需要创建一个Kafka消费者,用于消费Kafka中的消息:

@Service
public class KafkaConsumerService {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);

    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void listen(ConsumerRecord<String, String> record) {
        LOGGER.info("收到消息:{}", record.value());
    }
}

在这里,我们使用@KafkaListener注解来监听spring.kafka.consumer.topic主题,当有新的消息到达时会触发listen方法,其中ConsumerRecord对象包含了消息的元数据和消息内容。

  1. 延迟启动Kafka消费者

由于Kafka消费者默认启动时就会开始消费,如果我们想要延迟消费的开始时间,需要按照以下步骤进行配置:

  • application.yml文件中的spring.kafka.listener.type属性设置为batch
  • KafkaConsumerService类中添加@ConditionalOnProperty注解来判断是否需要延迟启动消费者。
  • KafkaConsumerService类中定义一个计数器,用于延迟消费者的启动,直到计数器减为0时启动消费。我们可以使用CountDownLatch来实现这个功能。
@Service
public class KafkaConsumerService {

    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);

    /**
     * 计数器,用于延迟消费者启动
     */
    private CountDownLatch latch = new CountDownLatch(1);

    /**
     * 添加一个带有@ConditionalOnProperty注解的构造方法
     * 用于判断application.yml配置中是否需要延迟消费者启动
     */
    public KafkaConsumerService(@Value("${spring.kafka.consumer.delayed-start.enabled:false}") boolean delayedStartEnabled) {
        if (delayedStartEnabled) {
            new Thread(() -> {
                try {
                    LOGGER.warn("消费者延迟启动中...");
                    Thread.sleep(5000); //延迟5秒钟
                } catch (InterruptedException e) {
                    LOGGER.error("线程休眠异常:", e);
                }
                latch.countDown();
                LOGGER.warn("消费者启动...");
            }).start();
        } else {
            latch.countDown();
        }
    }

    /**
     * 监听kafka消息,使用@KafkaListener注解即可
     */
    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void listen(ConsumerRecord<String, String> record) {
        try {
            //等待计数器归零,即延迟秒数到了才真正开始消费
            latch.await();
            LOGGER.info("收到消息:{}", record.value());
        } catch (InterruptedException e) {
            LOGGER.error("线程休眠异常:", e);
        }
    }
}

通过以上代码,我们创建了一个计数器latch,用于延迟消费者的启动,并判断了application.yml文件中是否需要延迟消费的开启。如果需要,则使用CountDownLatch进行计数,在延迟结束后将计数器减为0,从而触发消费者的启动。

至此,我们就成功地使用Spring Boot整合Kafka,并且实现了延迟启动消费者的功能。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:使用spring boot 整合kafka,延迟启动消费者 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Java使用DateTimeFormatter实现格式化时间

    下面是针对“Java使用DateTimeFormatter实现格式化时间”的完整攻略: 引言 在Java中,有时我们需要将时间转换为特定格式以便与其他系统交互。这时我们可以使用Java 8引入的DateTimeFormatter类进行格式化。该类提供了一些预定义的格式模式,也允许用户定义自己的格式模式。 步骤 1. 创建一个LocalDateTime对象 D…

    Java 2023年5月20日
    00
  • 使用Ajax模仿百度搜索框的自动提示功能实例

    关于“使用Ajax模仿百度搜索框的自动提示功能实例”的完整攻略,我将提供以下详细说明: 1. 编写HTML结构和CSS样式 首先,需要在HTML中添加一个输入框和用于显示自动提示的容器。输入框需要设置一个ID,并且需要引入相关的CSS样式,例如下面这样: <input type="text" id="searchbar&q…

    Java 2023年6月15日
    00
  • Spring Boot 单元测试JUnit的实践

    下面是关于“Spring Boot 单元测试JUnit的实践”的完整攻略: 一、为什么需要进行单元测试 单元测试是指对程序中的最小可测试单元进行检查和验证,确保每个单元都可以独立地并且正确地工作。而JUnit是Java中广泛使用的单元测试框架之一。 在实际开发中,进行单元测试可以帮助我们及时发现程序中的错误和bug,提高代码的健壮性和可靠性,同时也可以避免因…

    Java 2023年5月19日
    00
  • Java的Struts框架报错“ActionServletMappingException”的原因与解决办法

    当使用Java的Struts框架时,可能会遇到“ActionServletMappingException”错误。这个错误通常由以下原因之一起: ServletMapping配置错误:如果配置文件中没有正确ServletMapping,则可能会现此错误。在这种情况下,需要检查文件以解决此问题。 ServletMapping无效:如果ServletMappin…

    Java 2023年5月5日
    00
  • javaWeb项目部署到阿里云服务Linux系统的详细步骤

    下面是Java Web项目部署到阿里云服务Linux系统的详细步骤攻略: 一、购买阿里云ECS云服务器 首先,在阿里云官网注册账号并购买云服务器ECS,可以根据需求购买不同配置的云服务器。购买完成后,需要进行初始化和配置,设置登录密码,绑定公网IP等。 二、安装Java环境 登录云服务器ECS,可使用Windows的远程桌面连接或使用SSH客户端连接到云服务…

    Java 2023年5月19日
    00
  • MySQL 处理大数据表的 3 种方案,写的太好了,建议收藏!!

    作者:马佩 链接:https://juejin.cn/post/7146016771936354312 场景 当我们业务数据库表中的数据越来越多,如果你也和我遇到了以下类似场景,那让我们一起来解决这个问题 数据的插入,查询时长较长 后续业务需求的扩展 在表中新增字段 影响较大 表中的数据并不是所有的都为有效数据 需求只查询时间区间内的 评估表数据体量 我们可…

    Java 2023年4月17日
    00
  • Java元空间的作用是什么?

    Java元空间是Java虚拟机运行时数据区的一部分,它主要是用来存储类的元数据信息和静态变量。相较于传统的Java堆,Java元空间不再是一个连续的内存区域,而是使用本地内存或者操作系统提供的内存。下面,我将从以下几个方面进行详细讲解Java元空间的作用及相关攻略: Java元空间为什么会被引入? 在Java虚拟机中,类的元数据和静态变量原本是存放在永久代中…

    Java 2023年5月11日
    00
  • Java常用函数式接口总结

    Java常用函数式接口总结 Java已经从JDK 8开始支持函数式编程,因此添加了许多的函数式接口,包括常用的Function、Predicate、Consumer等等。本文将对Java中常用的函数式接口进行总结,并给出相应的使用示例。 Function Function接口定义了一个输入参数类型,返回一个结果类型的方法,通常用于将一个类型的值转换为另一个类…

    Java 2023年5月26日
    00
合作推广
合作推广
分享本页
返回顶部