springboot+kafka中@KafkaListener动态指定多个topic问题

使用SpringBoot和Kafka进行消息传输时,可以使用@KafkaListener注解来监听指定的topic,然而在一些情况下需要动态指定多个topic。下面是在SpringBoot中实现动态指定多个topic的攻略:

  1. 使用ContainerProperties的方法

需要在代码中手动创建一个KafkaMessageListenerContainer容器,并在其中设置需要监听的topic。具体代码如下:

@Component
public class KafkaMessageListener {
    @Autowired
    private KafkaProperties kafkaProperties;

    private ConsumerFactory<String, String> consumerFactory;

    @PostConstruct
    public void init() {
        consumerFactory = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    @KafkaListener(id = "group1", containerFactory = "batchFactory")
    public void listen(List<String> messages) {
        messages.forEach(System.out::println);
    }

    public void dynamicListen(String... topics) {
        ContainerProperties containerProperties = new ContainerProperties(topics);
        containerProperties.setMessageListener(new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> data) {
                System.out.println(data.value());
            }
        });

        KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        container.start();
    }

}

其中,使用了KafkaMessageListener作为容器的监听器。@PostConstruct注解的方法init()是KafkaMessageListener的初始化方法,获取kafka相关属性并生成ConsumerFactory,在KafkaMessageListener中使用了@KafkaListener注解监听一个名为"group1"的消息,并输出消息内容。

而动态指定多个topic的方法是,定义dynamicListen()方法,该方法中我们手动创建KafkaMessageListenerContainer容器,内部设置需要监听的topics和对应的监听器。在容器启动之后,程序便可以自动监听多个topic并进行相应的操作了。

  1. 使用@KafkaListener注解参数的方法
@Component
public class KafkaMessageListener {
    @Autowired
    private KafkaProperties kafkaProperties;

    private ConsumerFactory<String, String> consumerFactory;

    @PostConstruct
    public void init() {
        consumerFactory = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }

    @KafkaListener(id = "group1", topics = "topic1")
    public void listen1(List<String> messages) {
        messages.forEach(System.out::println);
    }

    @KafkaListener(id = "group2", topics = "topic2")
    public void listen2(List<String> messages) {
        messages.forEach(System.out::println);
    }

    public void dynamicListen(String... topics) {
        StringBuilder sb = new StringBuilder();
        for (String topic: topics) {
            sb.append(topic).append(",");
        }
        String topicStr = sb.toString().substring(0, sb.length()-1);
        String groupId = UUID.randomUUID().toString();

        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        ConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());

        ContainerProperties containerProps = new ContainerProperties(topics);

        MessageListener<String, String> messageListener = new MessageListener<String, String>() {
            @Override
            public void onMessage(ConsumerRecord<String, String> record) {
                System.out.println(record.value());
            }
        };

        ConcurrentMessageListenerContainer<String, String> listenerContainer =
                new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
        listenerContainer.getContainerProperties().setGroupId(groupId);
        listenerContainer.setupMessageListener(messageListener);
        listenerContainer.start();
    }

}

该方法比前一种方法更加灵活,可以在同一个类中使用多个@KafkaListener注解方法,每个方法监听不同的topic。同时,如果需要动态指定多个topic,则可以在dynamicListen()方法中,将需要监听的topic实现成一个字符串,以","分割,方法内部使用ClassNotFoundException捕获异常,在抛出异常的情况下使用反射机制调用@KafkaListener注解方法,动态生成@KafkaListener注解并在ContainerProperties中设置对应参数。最后将监听器容器启动即可。

两种方法都能够实现动态指定多个topic,使用方法根据实际场景进行选择即可。下面给出两个使用示例。

示例1:使用ContainerProperties的方法

@Autowired
private KafkaMessageListener kafkaMessageListener;

@Test
public void testDynamicListen() throws InterruptedException {
    String[] topics = {"test1", "test2", "test3"};
    kafkaMessageListener.dynamicListen(topics);
    TimeUnit.MINUTES.sleep(10);//等待10分钟
}

示例2:使用@KafkaListener注解参数的方法

@Autowired
private KafkaMessageListener kafkaMessageListener;

@Test
public void testDynamicListen() throws InterruptedException {
    String[] topics = {"test1", "test2", "test3"};
    kafkaMessageListener.dynamicListen(topics);
    TimeUnit.MINUTES.sleep(10);//等待10分钟
}

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:springboot+kafka中@KafkaListener动态指定多个topic问题 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Springboot集成mybatis与jsp过程详解

    下面详细讲解Springboot集成mybatis与jsp的过程。 环境配置 首先需要安装Java虚拟机和Maven,可以去官网下载安装。 建立一个Springboot工程,可以使用Eclipse、IntelliJ IDEA等开发工具,也可以在https://start.spring.io/官网上生成一个基本的Springboot项目。 添加依赖包 在pom…

    Java 2023年5月19日
    00
  • docker inspect 操作详解

    “docker inspect”命令用于获取Docker容器、镜像或其他相关对象的详细信息。以下是“docker inspect”的详细操作攻略。 1. 命令格式 Docker命令通常采用以下格式: docker inspect [OPTIONS] NAME|ID [NAME|ID…] 其中,OPTIONS是可选参数,NAME|ID是Docker对象的名…

    Java 2023年6月15日
    00
  • 高分面试分析jvm如何实现多态

    针对“高分面试分析jvm如何实现多态”的问题,我们可以采用以下步骤进行解答: 1. 简要介绍多态的概念 多态是面向对象程序设计中的一个重要概念,它指的是在运行时确定对象类型,而非编译时确定类型。在多态的实现过程中,一个对象根据不同的上下文环境表现出不同的行为,实现了代码的灵活性和可扩展性。在Java中,多态的实现至少需要使用到继承、虚函数(也称为动态绑定或者…

    Java 2023年5月26日
    00
  • SpringBoot连接MYSQL数据库并使用JPA进行操作

    下面是关于“SpringBoot连接MYSQL数据库并使用JPA进行操作”的完整攻略。 准备工作 在开始操作前,需要先进行一些准备工作: 安装MySQL数据库 安装Java SDK 安装SpringBoot框架 安装JPA 连接MYSQL数据库 首先,在SpringBoot的配置文件(application.properties)中添加MYSQL数据库的配置…

    Java 2023年5月20日
    00
  • Java创建多线程服务器流程

    创建多线程服务器是Java网络编程的重要部分,具有很高的实用价值。以下是实现Java创建多线程服务器的完整攻略。 过程 第一步:创建ServerSocket对象 ServerSocket类是Java语言提供的Socket接口,用于管理服务器端的网络地址和端口号等信息。创建ServerSocket对象的代码如下: ServerSocket server = n…

    Java 2023年5月26日
    00
  • Java如何调用TSC打印机进行打印详解

    关于Java如何调用TSC打印机进行打印,一般可以通过以下步骤来实现: 1. 前置条件 确认TSC打印机已经按照相应的通信协议和驱动程序与计算机进行连接和配置 了解打印指令,并准备好需要打印的内容 2. 使用TSC指令打印 2.1 建立连接 在Java中使用TSC指令打印,需要借助于TSC封装好的指令集,具体步骤如下: 导入TSC指令集jar包 xml &l…

    Java 2023年5月26日
    00
  • Arthas排查Kubernetes中应用频繁挂掉重启异常

    以下是 Arthas 排查 Kubernetes 中应用频繁挂掉重启异常的完整攻略。 确认场景 首先,需要确认场景。用户反馈应用经常挂掉重启,需要排查问题。该应用运行在 Kubernetes 集群中。需要确定:是所有的节点都有相同的问题,还是只有某个节点有问题。同时,需要定位是否是应用级别的问题。 安装 Arthas 因为需要使用到 Arthas 工具,所以…

    Java 2023年5月20日
    00
  • Springboot的spring-boot-maven-plugin导入失败的解决方案

    在使用Springboot开发时,可能会出现使用spring-boot-maven-plugin插件导入失败的情况。下面是解决方案的完整攻略: 1. 确认maven配置文件 在使用spring-boot-maven-plugin插件时,首先需要确认你的maven配置文件是否正确。在你的maven配置文件(settings.xml)中添加以下配置: <p…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部