springboot+kafka中@KafkaListener动态指定多个topic问题

使用SpringBoot和Kafka进行消息传输时,可以使用@KafkaListener注解来监听指定的topic,然而在一些情况下需要动态指定多个topic。下面是在SpringBoot中实现动态指定多个topic的攻略:

  1. 使用ContainerProperties的方法

需要在代码中手动创建一个KafkaMessageListenerContainer容器,并在其中设置需要监听的topic。具体代码如下:

@Component
public class KafkaMessageListener {
    @Autowired
    private KafkaProperties kafkaProperties;

    private ConsumerFactory<String, String> consumerFactory;

    @PostConstruct
    public void init() {
        consumerFactory = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    @KafkaListener(id = "group1", containerFactory = "batchFactory")
    public void listen(List<String> messages) {
        messages.forEach(System.out::println);
    }

    public void dynamicListen(String... topics) {
        ContainerProperties containerProperties = new ContainerProperties(topics);
        containerProperties.setMessageListener(new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> data) {
                System.out.println(data.value());
            }
        });

        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        container.start();
    }

}

其中,使用了KafkaMessageListener作为容器的监听器。@PostConstruct注解的方法init()是KafkaMessageListener的初始化方法,获取kafka相关属性并生成ConsumerFactory,在KafkaMessageListener中使用了@KafkaListener注解监听一个名为"group1"的消息,并输出消息内容。

而动态指定多个topic的方法是,定义dynamicListen()方法,该方法中我们手动创建KafkaMessageListenerContainer容器,内部设置需要监听的topics和对应的监听器。在容器启动之后,程序便可以自动监听多个topic并进行相应的操作了。

  1. 使用@KafkaListener注解参数的方法
@Component
public class KafkaMessageListener {
    @Autowired
    private KafkaProperties kafkaProperties;

    private ConsumerFactory<String, String> consumerFactory;

    @PostConstruct
    public void init() {
        consumerFactory = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    @KafkaListener(id = "group1", topics = "topic1")
    public void listen1(List<String> messages) {
        messages.forEach(System.out::println);
    }

    @KafkaListener(id = "group2", topics = "topic2")
    public void listen2(List<String> messages) {
        messages.forEach(System.out::println);
    }

    public void dynamicListen(String... topics) {
        StringBuilder sb = new StringBuilder();
        for (String topic: topics) {
            sb.append(topic).append(",");
        }
        String topicStr = sb.toString().substring(0, sb.length()-1);
        String groupId = UUID.randomUUID().toString();

        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        ConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());

        ContainerProperties containerProps = new ContainerProperties(topics);

        MessageListener<String, String> messageListener = new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                System.out.println(record.value());
            }
        };

        ConcurrentMessageListenerContainer<String, String> listenerContainer =
                new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
        listenerContainer.getContainerProperties().setGroupId(groupId);
        listenerContainer.setupMessageListener(messageListener);
        listenerContainer.start();
    }

}

该方法比前一种方法更加灵活,可以在同一个类中使用多个@KafkaListener注解方法,每个方法监听不同的topic。同时,如果需要动态指定多个topic,则可以在dynamicListen()方法中,将需要监听的topic实现成一个字符串,以","分割,方法内部使用ClassNotFoundException捕获异常,在抛出异常的情况下使用反射机制调用@KafkaListener注解方法,动态生成@KafkaListener注解并在ContainerProperties中设置对应参数。最后将监听器容器启动即可。

两种方法都能够实现动态指定多个topic,使用方法根据实际场景进行选择即可。下面给出两个使用示例。

示例1:使用ContainerProperties的方法

@Autowired
private KafkaMessageListener kafkaMessageListener;

@Test
public void testDynamicListen() throws InterruptedException {
    String[] topics = {"test1", "test2", "test3"};
    kafkaMessageListener.dynamicListen(topics);
    TimeUnit.MINUTES.sleep(10);//等待10分钟
}

示例2:使用@KafkaListener注解参数的方法

@Autowired
private KafkaMessageListener kafkaMessageListener;

@Test
public void testDynamicListen() throws InterruptedException {
    String[] topics = {"test1", "test2", "test3"};
    kafkaMessageListener.dynamicListen(topics);
    TimeUnit.MINUTES.sleep(10);//等待10分钟
}

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:springboot+kafka中@KafkaListener动态指定多个topic问题 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • JavaSpringBoot报错“PreconditionFailedException”的原因和处理方法

    原因 “PreconditionFailedException” 错误通常是以下原因引起的: 请求头问题:如果请求头中包含不受支持的条件,则可能会出现此错误。在这种情况下,需要检查请求头并确保它们正确。 控制器问题:如果控制器中存在问题,则可能会出现此错误。在这种情况下,需要检查控制器并确保它们正确。 解决办法 以下是解决 “PreconditionFail…

    Java 2023年5月4日
    00
  • Java常用类String的面试题汇总(java面试题)

    下面是整理Java常用类String的面试题汇总的详细攻略。 1. String类的概述 String类是Java中常用的类之一,是由JDK提供的一个不可变的final类,用于存储字符串数据,可以进行字符串的操作和处理。 2. 常见的String类面试题 2.1 如何比较两个字符串是否相等? 首先要了解的是,Java中有两种比较方式,一种是基本类型的比较(=…

    Java 2023年5月20日
    00
  • MySQL数据库8——数据库中函数的应用详解

    MySQL数据库8——数据库中函数的应用详解攻略 一、什么是函数 在MySQL数据库中,函数类似于程序中的函数,可以接受参数,执行一些操作,并返回结果。MySQL数据库已经内置了很多常用的函数,包括字符串、数值、日期和时间等方面的函数。 二、常见的函数 1. 字符串函数 字符串函数主要用于处理字符串类型的数据,下面列举了一些常见的字符串函数及其说明: CON…

    Java 2023年6月16日
    00
  • Window下安装Tomcat服务器的教程

    下面是详细的“Window下安装Tomcat服务器的教程”攻略: 环境准备 Tomcat服务器下载 首先,需要从官网下载Tomcat服务器的安装包。Tomcat官网地址:http://tomcat.apache.org/ 在页面选择“Downloads” -> “Tomcat 10” -> “64-bit Windows zip”进行下载。 Ja…

    Java 2023年5月19日
    00
  • Java中MyBatis Plus知识点总结

    下面我针对“Java中MyBatis Plus知识点总结”的完整攻略逐步讲解。 MyBatis Plus是什么? MyBatis Plus 是一款 MyBatis 增强工具,简化了 MyBatis 的使用流程,提供了很多实用的增强功能。相比 MyBatis,使用 MyBatis Plus 能够更加高效地进行数据持久化操作。 MyBatis Plus主要功能 …

    Java 2023年5月20日
    00
  • idea2020导入spring5.1的源码详细教程

    下面是“idea2020导入spring5.1的源码详细教程”的完整攻略: 1. 下载Spring5.1源码 访问Spring的官网,找到Spring Framework 5.1的下载链接,下载压缩包并解压到本地任意目录。 2. 导入源码到IDEA 打开IDEA,点击“Open”或者“Import Project”,选择Spring源码所在的目录,导入项目。…

    Java 2023年5月31日
    00
  • 深入解析Java编程中方法的参数传递

    深入解析Java编程中方法的参数传递 在Java编程中,方法是我们进行代码模块化的基本单位,而方法的参数传递是Java编程中比较基础但也比较重要的概念之一。本文将从以下几个方面深入解析Java编程中的方法参数传递。 Java方法参数是按值传递还是按引用传递? 这是一个比较基础的问题。实际上,在Java中,方法参数是按值传递的,而不是传递引用。 所谓“按值传递…

    Java 2023年5月26日
    00
  • java 实现反射 json动态转实体类–fastjson

    Java中的反射是一种可以在运行时动态获取类的信息的机制。而fastjson则是一种常用的Java JSON 库,它支持将JSON字符串快速地转换为Java对象,以及将Java对象快速地序列化为JSON字符串。下面将详细介绍如何使用Java反射结合fastjson实现JSON字符串到Java对象的转换。 1. 添加依赖接口 我们需要在项目中添加fastjso…

    Java 2023年5月26日
    00
合作推广
合作推广
分享本页
返回顶部