下面是“Spring Boot与Kafka集成的简单实例”的攻略:
一、前置条件
在开始本教程之前,你需要做如下准备:
- 安装Java 8或更高版本
- 安装Kafka并启动Kafka服务
- 安装Maven
二、创建Spring Boot工程
首先,我们需要创建一个Spring Boot工程。这里我们使用Spring Initializr来创建一个最小化的Spring Boot工程。
打开http://start.spring.io/,按照如下要求进行设置:
- 选择“Gradle Project”或“Maven Project”
- 输入“Group”和“Artifact”
- 添加以下的依赖:
Spring Boot DevTools
Spring Web
Spring Kafka
最后,点击“Generate”按钮,下载生成的工程并导入到你的IDE中。
三、编写Kafka生产者和消费者
在这里,我们将编写两个Spring Bean,一个是Kafka生产者、一个是Kafka消费者。这两个Bean的实现代码如下:
1. KafkaProducer.java
package com.example.demo;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public KafkaProducer(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendMessage(String message) {
kafkaTemplate.send("test", message);
System.out.printf("Sent message: %s\n", message);
}
}
2. KafkaConsumer.java
package com.example.demo;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Service
public class KafkaConsumer {
@KafkaListener(topics = {"test"}, groupId = "group_id")
public void consume(String message) {
System.out.printf("Consumed message: %s\n", message);
}
}
3. 配置Kafka连接信息
我们需要在application.properties
文件中配置Kafka连接信息,如下所示:
spring.kafka.bootstrap-servers=localhost:9092
四、编写Restful API接口
在这里,我们将编写一个Restful API接口,通过该接口,用户可向Kafka发送消息。
package com.example.demo;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class KafkaController {
private final KafkaProducer kafkaProducer;
public KafkaController(KafkaProducer kafkaProducer) {
this.kafkaProducer = kafkaProducer;
}
@GetMapping("/send/{message}")
public String sendMessage(@PathVariable("message") String message) {
kafkaProducer.sendMessage(message);
return "Message sent successfully!";
}
}
五、运行示例
在完成上述步骤之后,我们可以通过以下两个示例来测试我们的应用程序:
1. 示例1:发送消息
启动服务后,我们可以通过以下命令向Kafka发送消息:
curl localhost:8080/send/hello
这会向test
主题发送消息hello
。
2. 示例2:接收消息
启动服务后,我们可以在控制台上看到Kafka消费者的输出,如下所示:
2021-09-16 18:36:44.755 INFO 12880 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.8.0
2021-09-16 18:36:44.755 INFO 12880 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: fa5b0436d1142915
2021-09-16 18:36:44.756 INFO 12880 --- [ntainer#0-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1631792204755
Consumed message: hello
六、总结
至此,我们已经完成了一个简单的Spring Boot与Kafka集成示例。通过本例,我们了解了如何创建一个Spring Boot工程,以及如何编写Kafka生产者和消费者,并通过Restful API接口向Kafka发送消息。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:spring boot与kafka集成的简单实例 - Python技术站