下面我将详细讲解如何在Spring Boot项目中集成Kafka消息中间件,包括以下内容:
- 环境准备
- Maven依赖配置
- Kafka配置
- 生产者代码示例
- 消费者代码示例
环境准备
在开始之前,我们需要确保本地环境中已经安装好了以下软件:
- Java JDK 1.8或更高版本
- Apache Kafka 2.1.0或更高版本
Maven依赖配置
在pom.xml文件中添加如下Maven依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.6.4</version>
</dependency>
Kafka配置
在application.properties文件中添加如下Kafka配置:
#Kafka
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.listener.poll-timeout=3000
spring.kafka.consumer.auto-offset-reset=earliest
生产者代码示例
首先,我们需要在Spring Boot应用程序中实例化一个KafkaTemplate:
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
接下来,我们可以通过KafkaTemplate对象向Kafka发送消息:
@Service
public class MyService {
private final KafkaTemplate<String, String> kafkaTemplate;
@Autowired
public MyService(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
消费者代码示例
首先,我们需要编写一个消费者监听器来处理从Kafka接收到的消息:
@Service
public class MyListener {
@KafkaListener(topics = "myTopic")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
我们还需要在Spring Boot应用程序中创建一个KafkaListenerContainerFactory,用于从Kafka接收消息并调用我们的监听器方法:
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.consumer.group-id}")
private String groupId;
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
@Bean
public MyListener listener() {
return new MyListener();
}
}
最后,我们需要在MyListener类上添加@KafkaListener注解,指定要监听的Kafka主题:
@Service
public class MyListener {
@KafkaListener(topics = "myTopic")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
以上就是在Spring Boot项目中集成Kafka消息中间件的完整攻略,同时包含生产者和消费者的示例代码。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Spring boot集成Kafka消息中间件代码实例 - Python技术站