下面我就来详细讲解一下“Springboot 2.x集成kafka 2.2.0的示例代码”的完整攻略。
简介
Kafka 是一个高吞吐量的分布式消息队列系统,常被用于日志处理、消息系统等场景。Spring Boot 是目前流行的 Java Web 开发框架,具有简单、快速、方便等特点。本文将介绍如何在 Spring Boot 2.x 中集成 Kafka 2.2.0,实现消息的生产和消费。
环境
- Spring Boot 2.x
- Kafka 2.2.0
添加依赖
在 pom.xml 文件中,添加以下依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.8.RELEASE</version>
</dependency>
Kafka 配置
在 application.yml 配置文件中,添加 Kafka 服务的地址:
spring:
kafka:
bootstrap-servers: localhost:9092
简单的消息生产者示例
创建 KafkaProducer.java 类,实现消息的生产。代码如下:
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
// 消息发送成功
System.out.println("消息发送成功,topic:" + result.getRecordMetadata().topic() + ",partition:" + result.getRecordMetadata().partition() + ",offset:" + result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
// 消息发送失败
System.out.println("消息发送失败:" + ex.getMessage());
}
});
}
}
简单的消息消费者示例
创建 KafkaConsumer.java 类,实现消息的消费。代码如下:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupId}")
public void listen(ConsumerRecord<String, String> record) {
String message = record.value();
System.out.println("收到消息:" + message);
}
}
在 application.yml 配置文件中添加以下配置:
kafka:
topic: test
groupId: testGroup
在上面的代码中,使用 @KafkaListener 注解实现对指定主题的消息监听,该注解的 topic 属性指定主题,groupId 属性指定消费者组。
示例代码
完整的 Spring Boot 集成 Kafka 的示例代码可以在 https://github.com/swordfall/spring-boot-kafka-sample 中获取。
示例说明
-
在示例代码中,当生产者发送消息后,控制台将打印出发送结果的相关信息,包括主题、分区和偏移量等信息。
-
当消费者收到消息后,控制台将打印出收到的消息内容。
以上就是 Spring Boot 2.x 集成 Kafka 2.2.0 的示例代码的完整攻略,希望能对您有所帮助。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Springboot 2.x集成kafka 2.2.0的示例代码 - Python技术站