下面我将详细讲解“Spring Boot集成Kafka的示例代码”的完整攻略:
1. 准备工作
首先,我们需要在本地安装好 Kafka,然后创建一个 Topic。
2. 添加依赖
在 Spring Boot 项目中,我们需要使用以下两个依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.4.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.6.0</version>
</dependency>
3. 配置 Kafka
在项目的配置文件中,我们需要配置 Kafka 的相关信息,包括:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=testGroup
spring.kafka.consumer.auto-offset-reset=earliest
4. 发送消息
以下是发送消息的示例代码:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
5. 接收消息
以下是接收消息的示例代码:
@KafkaListener(topics = { "testTopic" }, groupId = "testGroup")
public void listen(String message) throws Exception {
System.out.println("Received Message in group - testGroup: " + message);
}
6. 示例1:简单的发送和接收
我们可以通过以下代码进行测试:
sendMessage("testTopic", "Hello Kafka");
Thread.sleep(1000);
在控制台中,我们可以看到如下输出:
Received Message in group - testGroup: Hello Kafka
7. 示例2:发送 JSON 格式的消息
如果消息是 JSON 格式的,我们可以通过以下代码进行发送:
@Autowired
private ObjectMapper objectMapper;
public void sendJsonMessage(String topic, Object object) throws JsonProcessingException {
kafkaTemplate.send(topic, objectMapper.writeValueAsString(object));
}
在接收端,我们可以通过添加 @Payload
注释来将 JSON 转换为对象:
@KafkaListener(topics = { "testTopic" }, groupId = "testGroup")
public void listen(@Payload TestObject object) throws Exception {
System.out.println("Received Message in group - testGroup: " + object);
}
另外,需要在配置文件中添加以下配置:
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
这样,我们就可以在发送端发送 JSON 格式的消息,并在接收端将其转换为对象进行处理了。
以上就是“Spring Boot集成Kafka的示例代码”的完整攻略,希望能够帮助到你。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Spring Boot集成Kafka的示例代码 - Python技术站