使用SpringBoot和Kafka进行消息传输时,可以使用@KafkaListener注解来监听指定的topic,然而在一些情况下需要动态指定多个topic。下面是在SpringBoot中实现动态指定多个topic的攻略:
- 使用ContainerProperties的方法
需要在代码中手动创建一个KafkaMessageListenerContainer容器,并在其中设置需要监听的topic。具体代码如下:
@Component
public class KafkaMessageListener {
@Autowired
private KafkaProperties kafkaProperties;
private ConsumerFactory<String, String> consumerFactory;
@PostConstruct
public void init() {
consumerFactory = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
@KafkaListener(id = "group1", containerFactory = "batchFactory")
public void listen(List<String> messages) {
messages.forEach(System.out::println);
}
public void dynamicListen(String... topics) {
ContainerProperties containerProperties = new ContainerProperties(topics);
containerProperties.setMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> data) {
System.out.println(data.value());
}
});
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
container.start();
}
}
其中,使用了KafkaMessageListener作为容器的监听器。@PostConstruct注解的方法init()是KafkaMessageListener的初始化方法,获取kafka相关属性并生成ConsumerFactory,在KafkaMessageListener中使用了@KafkaListener注解监听一个名为"group1"的消息,并输出消息内容。
而动态指定多个topic的方法是,定义dynamicListen()方法,该方法中我们手动创建KafkaMessageListenerContainer容器,内部设置需要监听的topics和对应的监听器。在容器启动之后,程序便可以自动监听多个topic并进行相应的操作了。
- 使用@KafkaListener注解参数的方法
@Component
public class KafkaMessageListener {
@Autowired
private KafkaProperties kafkaProperties;
private ConsumerFactory<String, String> consumerFactory;
@PostConstruct
public void init() {
consumerFactory = new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
}
@KafkaListener(id = "group1", topics = "topic1")
public void listen1(List<String> messages) {
messages.forEach(System.out::println);
}
@KafkaListener(id = "group2", topics = "topic2")
public void listen2(List<String> messages) {
messages.forEach(System.out::println);
}
public void dynamicListen(String... topics) {
StringBuilder sb = new StringBuilder();
for (String topic: topics) {
sb.append(topic).append(",");
}
String topicStr = sb.toString().substring(0, sb.length()-1);
String groupId = UUID.randomUUID().toString();
Map<String, Object> props = kafkaProperties.buildConsumerProperties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
ConsumerFactory<String, String> consumerFactory =
new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new StringDeserializer());
ContainerProperties containerProps = new ContainerProperties(topics);
MessageListener<String, String> messageListener = new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println(record.value());
}
};
ConcurrentMessageListenerContainer<String, String> listenerContainer =
new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
listenerContainer.getContainerProperties().setGroupId(groupId);
listenerContainer.setupMessageListener(messageListener);
listenerContainer.start();
}
}
该方法比前一种方法更加灵活,可以在同一个类中使用多个@KafkaListener注解方法,每个方法监听不同的topic。同时,如果需要动态指定多个topic,则可以在dynamicListen()方法中,将需要监听的topic实现成一个字符串,以","分割,方法内部使用ClassNotFoundException捕获异常,在抛出异常的情况下使用反射机制调用@KafkaListener注解方法,动态生成@KafkaListener注解并在ContainerProperties中设置对应参数。最后将监听器容器启动即可。
两种方法都能够实现动态指定多个topic,使用方法根据实际场景进行选择即可。下面给出两个使用示例。
示例1:使用ContainerProperties的方法
@Autowired
private KafkaMessageListener kafkaMessageListener;
@Test
public void testDynamicListen() throws InterruptedException {
String[] topics = {"test1", "test2", "test3"};
kafkaMessageListener.dynamicListen(topics);
TimeUnit.MINUTES.sleep(10);//等待10分钟
}
示例2:使用@KafkaListener注解参数的方法
@Autowired
private KafkaMessageListener kafkaMessageListener;
@Test
public void testDynamicListen() throws InterruptedException {
String[] topics = {"test1", "test2", "test3"};
kafkaMessageListener.dynamicListen(topics);
TimeUnit.MINUTES.sleep(10);//等待10分钟
}
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:springboot+kafka中@KafkaListener动态指定多个topic问题 - Python技术站