下面是 Spring Boot 集成 Kafka 的实现示例。
1. 环境准备
在开始之前,我们需要做一些准备工作:
- 安装 JDK(版本大于等于 1.8.0)。
- 安装 Apache Kafka(版本大于等于 2.0.0)。
2. 集成 kafka
2.1 创建 Spring Boot 项目
首先需要创建一个新项目。打开你的 IDEA,选择 New > Project,然后选择 Spring Initializr。
根据下面的依赖添加:
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
2.2 添加 Kafka 配置
在 application.yml 文件中添加以下内容:
spring:
kafka:
bootstrap-servers:
- localhost:9092
consumer:
group-id: test-group
auto-offset-reset: earliest
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
2.3 创建消息接收器
创建一个 Kafka 消息接收器,并在其中创建一个消费者,用于接收 Kafka 中的消息。
@Component
public class KafkaReceiver {
@KafkaListener(topics = "hello")
public void processMessage(String message) {
System.out.println("Received message: " + message);
}
}
2.4 创建消息发送器
创建一个 Kafka 消息发送器,用于向 Kafka 中发送消息。
@Service
public class KafkaSender {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String message) {
kafkaTemplate.send("hello", message);
}
}
2.5 测试
在 main 函数中添加以下代码:
@SpringBootApplication
public class Application implements CommandLineRunner {
@Autowired
private KafkaSender kafkaSender;
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public void run(String... args) throws Exception {
kafkaSender.sendMessage("Hello World!");
}
}
启动程序,可以看到如下输出:
Received message: Hello World!
说明消息已经成功地从发送器发送到了接收器,并被正确打印出来了。
3. 添加 Kafka 生产者和消费者示例
3.1 创建 Kafka 生产者
@Service
public class KafkaProducerExample {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onSuccess(SendResult<String, String> result) {
System.out.printf("发送成功:topic=%s, partition=%d, offset=%d, message=%s%n",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset(),
message);
}
@Override
public void onFailure(Throwable ex) {
System.out.println("发送失败:" + ex.getMessage());
}
});
}
}
3.2 创建 Kafka 消费者
@Service
public class KafkaConsumerExample {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(ConsumerRecord<?, ?> record) {
System.out.printf("接收到消息:topic=%s, partition=%d, offset=%d, key=%s, value=%s%n",
record.topic(),
record.partition(),
record.offset(),
record.key(),
record.value());
}
}
3.3 测试发送和接收
在 main 函数中添加以下代码:
@Autowired
private KafkaProducerExample kafkaProducerExample;
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public void run(String... args) throws Exception {
kafkaProducerExample.sendMessage("my-topic", "Hello World!");
}
启动程序,可以看到如下输出:
接收到消息:topic=my-topic, partition=1, offset=2, key=null, value=Hello World!
发送成功:topic=my-topic, partition=1, offset=2, message=Hello World!
说明我们已经成功地将消息发送到了 Kafka 中,同时又能够从 Kafka 中正确地接收到这些消息。
以上就是 Spring Boot 集成 Kafka 的实现示例,希望对你有所帮助。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Spring Boot 集成 Kafkad的实现示例 - Python技术站