下面我来给你详细讲解“SpringBoot Kafka 整合使用及安装教程”的完整攻略。
1. 安装Kafka
Kafka是一个开源的分布式流处理平台,它由Scala和Java编写而成。首先我们需要安装Kafka:
- 访问Kafka官网:http://kafka.apache.org/downloads
- 选择Kafka的版本,推荐使用最新版本
- 解压下载好的文件
- 使用如下命令启动Kafka服务:
bin/kafka-server-start.sh config/server.properties
2. 创建一个Kafka Topic
在使用Kafka前,我们需要创建一个Kafka Topic。
- 在Kafka的安装目录下运行如下命令创建一个名为test的Topic:
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
3. 编写SpringBoot项目代码
- 在你的SpringBoot项目中添加依赖项:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.4.RELEASE</version>
</dependency>
- 创建Kafka的配置文件application.yml,添加如下配置:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: myGroup
auto-offset-reset: earliest
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
- 编写 Kafka 发送消息的代码:
```
@Autowired
private KafkaTemplate
public void sendMessage(String topic, String message) {
ListenableFuture
future.addCallback(new ListenableFutureCallback<>() {
@Override
public void onSuccess(SendResult
logger.info("Producer send message success with topic:{},partition:{},offset:{}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
logger.error("Producer send message error: {}", ex.getMessage());
}
});
}
```
- 编写 Kafka 接收消息的代码:
@KafkaListener(topics = "test")
public void listen(ConsumerRecord<?, ?> record) {
logger.info("topic:{}, groupId:{}, message:{}, offset:{}, partition:{}", record.topic(), record.key(),
record.value(), record.offset(), record.partition());
}
4. 运行示例
- 启动Kafka服务
- 启动SpringBoot应用程序
- 调用发送消息的接口
```
@RestController
public class KafkaController {
@Autowired
private KafkaService kafkaService;
@GetMapping("/send")
public String send() {
kafkaService.sendMessage("test", "hello");
return "success";
}
}
```
- 查看控制台输出的消息
2021-05-27 10:47:42.527 INFO 24760 --- [ kafka-binder-13] com.example.demo.service.KafkaService : Producer send message success with topic:test,partition:0,offset:0
2021-05-27 10:47:48.877 INFO 24760 --- [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] com.example.demo.service.KafkaService : topic:test, groupId:null, message:hello, offset:0, partition:0
以上就是关于“SpringBoot Kafka 整合使用及安装教程”的完整攻略,希望对你有所帮助。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:SpringBoot Kafka 整合使用及安装教程 - Python技术站