下面就为您详细讲解“Springboot整合kafka的示例代码”的完整攻略。
1. Springboot整合kafka的前置知识
在开始编写Springboot整合kafka的示例代码前,需要了解下面几个知识点:
- Apache Kafka的基本概念:Broker、Topic、Partition、Producer、Consumer等。
- Kafka消息的格式化和序列化:Kafka默认使用字节数组作为消息格式,但是我们一般使用JSON或其他格式的消息。因此,我们需要使用序列化和反序列化器来将消息转化为字节数组或Java对象。
- Kafka消息的消费和生产:Kafka消息的消费和生产指的是如何使用Kafka的Producer和Consumer API来进行消息的生产和消费。
2. 创建Springboot项目
首先,我们需要创建一个Springboot项目。可以使用IDEA或Eclipse等工具创建。
3. 引入kafka依赖
在项目的pom.xml文件中引入kafka依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.4.2.RELEASE</version>
</dependency>
4. Kafka Producer示例代码
我们先来看看如何编写Kafka Producer的示例代码。Kafka Producer用于将消息发送到Broker上。
4.1 创建消息类
我们定义一个消息类Message,包含两个字段,id和content。
public class Message {
private Long id;
private String content;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
@Override
public String toString() {
return "Message{" +
"id=" + id +
", content='" + content + '\'' +
'}';
}
}
4.2 生产者配置
在Springboot项目中,我们可以通过在application.yml文件中配置生产者相关的配置信息:
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
acks: all
- bootstrap-servers:Kafka Broker的地址。
- acks:消息的确认模式。
4.3 编写Producer代码
@Service
public class KafkaProducerService {
private static final String TOPIC = "test-topic";
@Autowired
private KafkaTemplate<String, Message> kafkaTemplate;
public void sendMessage(Message message) {
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send(TOPIC, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
@Override
public void onSuccess(SendResult<String, Message> result) {
System.out.println("成功发送消息到:" + TOPIC + ",partition:" + result.getRecordMetadata().partition()
+ ",offset:" + result.getRecordMetadata().offset());
}
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息到:" + TOPIC + " 失败,原因:" + ex.getMessage());
}
});
}
}
- KafkaTemplate用于发送消息。
- ListenableFutureCallback用于处理异步发送的结果。
4.4 Kafka Producer示例代码
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaProducerTest {
@Autowired
private KafkaProducerService producerService;
@Test
public void sendMessage() {
Message message = new Message();
message.setId(1L);
message.setContent("测试消息一");
producerService.sendMessage(message);
}
}
5. Kafka Consumer示例代码
接下来,我们来编写Kafka Consumer的示例代码。Kafka Consumer用于消费Broker上的消息。
5.1 创建消息监听器
我们需要创建一个消息监听器,用于监听指定Topic上的消息。在这里,我们将监听器指定为一个Spring的Component:
@Component
public class KafkaConsumerListener {
private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerListener.class);
@KafkaListener(topics = {"test-topic"})
public void onMessage(ConsumerRecord<String, Message> record) {
logger.info("接收到消息:{}", record.value());
}
}
注意:@KafkaListener注解用于指定监听的Topic。
5.2 消费者配置
在Springboot项目中,我们也可以通过在application.yml文件中配置消费者的相关配置信息:
spring:
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: test-group
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "*"
- group-id:消费者的GroupId。
- auto-offset-reset:当没有初始偏移量时,应该从什么地方开始消费。earliest表示从最早的消息开始消费,latest表示从最新的消息开始消费。
- key-deserializer:key的反序列化器。
- value-deserializer:value的反序列化器。
在这里,我们定义消费者的key为String,value为JSON格式的Message消息。
5.3 Kafka Consumer示例代码
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaConsumerTest {
@Autowired
private KafkaProducerService producerService;
@Test
public void sendMessage() throws InterruptedException {
Message message = new Message();
message.setId(1L);
message.setContent("测试消息一");
producerService.sendMessage(message);
Thread.sleep(1000);
}
}
我们在这里发送一条消息到指定的Topic里面,然后等待1秒钟。在这个时间里,消息监听器会接收到我们发送的消息并进行输出。
6. 结论
通过以上示例,我们可以看到如何使用Springboot来整合Kafka。在实现的过程中,需要注意到以下几个点:
- Kafka Producer的发送语法为async-send或sync-send方式,在这里我们使用了异步的方式。
- Kafka Consumer的监听方式需要指定Topic的名称。
- Kafka Consumer需要配置对key和value的反序列化方式。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Springboot整合kafka的示例代码 - Python技术站