This guide walks through example code for integrating Spring Boot with Kafka.
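1. Add the dependency
First, add the Kafka dependency to pom.xml (when the project uses Spring Boot's dependency management, the <version> element can be omitted so that a compatible release is selected automatically):
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.1.RELEASE</version>
</dependency>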
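2. Add the Kafka configuration
Add the Kafka settings to application.properties. The last entry is a custom property rather than a built-in Spring Boot one; the listener in step 4 reads the topic name from it through a placeholder:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group
spring.kafka.consumer.topic=test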
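For orientation, the two spring.kafka.* connection entries above correspond to the Kafka client settings bootstrap.servers and group.id. The sketch below builds the equivalent configuration map using only the JDK. The class name ConsumerConfigSketch and the method consumerConfig are made up for illustration; in a real application Spring Boot assembles this map for you.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration; not part of Spring or Kafka.
final class ConsumerConfigSketch {

    // Maps the two Spring Boot properties onto the Kafka client's own key names.
    static Map<String, Object> consumerConfig(String bootstrapServers, String groupId) {
        Map<String, Object> config = new LinkedHashMap<>();
        config.put("bootstrap.servers", bootstrapServers); // spring.kafka.bootstrap-servers
        config.put("group.id", groupId);                   // spring.kafka.consumer.group-id
        return config;
    }

    public static void main(String[] args) {
        System.out.println(consumerConfig("localhost:9092", "test_group"));
    }
}
```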
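3. Send messages
Create MessageProducer.java to send messages from code. The class below targets spring-kafka 2.x, where KafkaTemplate.send returns a ListenableFuture; from spring-kafka 3.0 onward it returns a CompletableFuture instead:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class MessageProducer {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Sends asynchronously; the callback reports the outcome once the broker responds.
    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                System.out.println("Failed to send message: " + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("Message sent: " + result.getRecordMetadata().topic() + "-"
                        + result.getRecordMetadata().partition());
            }
        });
    }
}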
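On spring-kafka 3.x, where send returns a CompletableFuture, the same success/failure handling is usually attached with whenComplete or handle. The following is a JDK-only sketch of that callback shape. It assumes a hypothetical fakeSend method standing in for kafkaTemplate.send and a plain String in place of SendResult, and it does not talk to a broker.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; fakeSend only imitates the asynchronous result of a real send.
final class SendCallbackSketch {

    // Stand-in for kafkaTemplate.send(topic, message): fails for an empty topic,
    // otherwise completes with "<topic>-<partition>" using a fixed partition 0.
    static CompletableFuture<String> fakeSend(String topic, String message) {
        if (topic == null || topic.isEmpty()) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("topic must not be empty"));
        }
        return CompletableFuture.completedFuture(topic + "-0");
    }

    // Attaches one callback that covers both the success and the failure path.
    static CompletableFuture<String> sendAndDescribe(String topic, String message) {
        return fakeSend(topic, message).handle((result, ex) ->
                ex == null
                        ? "Message sent: " + result
                        : "Failed to send message: " + ex.getMessage());
    }

    public static void main(String[] args) {
        System.out.println(sendAndDescribe("test", "Hello, Kafka!").join());
        System.out.println(sendAndDescribe("", "Hello, Kafka!").join());
    }
}
```

Using a single handle callback keeps both outcomes in one place, which mirrors the onSuccess/onFailure pair of the 2.x callback interface.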
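4. Receive messages
Create MessageConsumer.java:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {
    // The topic name is read from the spring.kafka.consumer.topic property.
    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void receive(ConsumerRecord<?, ?> record) {
        System.out.println("Received message: " + record.value());
    }
}
The @KafkaListener annotation registers the method as a listener for the given topics; Spring creates the listener container and invokes the method for each record it receives.
With this in place, once the application starts, MessageProducer can send messages to Kafka and MessageConsumer receives them.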
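The topics attribute of the listener uses a ${...} placeholder, which Spring resolves against the application's properties at startup; an unresolved placeholder is typically reported as an error. As a rough illustration only (Spring's real resolver also handles default values, nesting, and multiple property sources), the sketch below resolves a single placeholder against a java.util.Properties object. PlaceholderSketch and resolve are hypothetical names.

```java
import java.util.Properties;

// Hypothetical, heavily simplified model of placeholder resolution.
final class PlaceholderSketch {

    // Returns the property value for "${key}", or the input unchanged if it is a literal.
    static String resolve(String value, Properties props) {
        if (value.startsWith("${") && value.endsWith("}")) {
            String key = value.substring(2, value.length() - 1);
            String resolved = props.getProperty(key);
            if (resolved == null) {
                throw new IllegalArgumentException("Could not resolve placeholder '" + key + "'");
            }
            return resolved;
        }
        return value;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("spring.kafka.consumer.topic", "test");
        System.out.println(resolve("${spring.kafka.consumer.topic}", props));
    }
}
```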
Here is the complete example for closer study.
application.properties:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test_group
spring.kafka.consumer.topic=test
MessageProducer.java and MessageConsumer.java are identical to the classes shown in steps 3 and 4 above.
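KafkaDemoApplication.java:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication implements CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(KafkaDemoApplication.class);

    @Autowired
    private MessageProducer messageProducer;

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }

    // Runs once after the application context has started.
    @Override
    public void run(String... args) throws Exception {
        log.info("Sending a test message to topic 'test'");
        messageProducer.sendMessage("test", "Hello, Kafka!");
    }
}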
This is a simple example; adapt and extend it to suit your own requirements.
A second example is also provided for reference:
1. Add the dependency
<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.1.RELEASE</version>
</dependency>
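2. Add the Kafka configuration
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test_group1
      auto-offset-reset: earliest
      # Disabled because the listener in step 4 acknowledges records manually.
      enable-auto-commit: false
      # Custom property read by @KafkaListener through a placeholder.
      topic: test
    listener:
      # Required for the Acknowledgment parameter to be passed to the listener.
      ack-mode: manual
    producer:
      retries: 3
      batch-size: 4096
      buffer-memory: 409600
      compression-type: gzip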
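The auto-offset-reset setting only matters when the consumer group has no committed offset for a partition (or the committed offset is out of range). A simplified sketch of that decision follows, using hypothetical names (OffsetResetSketch, startingOffset) and plain long values in place of real broker metadata; the out-of-range case is left out for brevity.

```java
import java.util.OptionalLong;

// Hypothetical model of how a consumer picks its starting position on a partition.
final class OffsetResetSketch {

    // committed: the group's committed offset, if any
    // beginning/end: first available offset and next offset to be written
    // policy: "earliest", "latest", or "none"
    static long startingOffset(OptionalLong committed, long beginning, long end, String policy) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // a committed offset always wins
        }
        switch (policy) {
            case "earliest":
                return beginning;
            case "latest":
                return end;
            default:
                throw new IllegalStateException(
                        "No committed offset and reset policy is '" + policy + "'");
        }
    }

    public static void main(String[] args) {
        System.out.println(startingOffset(OptionalLong.empty(), 0L, 100L, "earliest"));
        System.out.println(startingOffset(OptionalLong.of(42L), 0L, 100L, "earliest"));
    }
}
```

With earliest, as configured above, a brand-new group reads the topic from its first available record; with latest it would only see records produced after it subscribed.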
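3. Send messages
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class KafkaProducerService {
    private static final Logger log = LoggerFactory.getLogger(KafkaProducerService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                // Passing the exception as the last argument also logs the stack trace.
                log.error("Failed to send message: {}", ex.getMessage(), ex);
            }

            @Override
            public void onSuccess(SendResult<String, String> result) {
                // Logs topic, partition, and offset of the stored record.
                log.info("Message sent: {}-{}-{}", result.getRecordMetadata().topic(),
                        result.getRecordMetadata().partition(), result.getRecordMetadata().offset());
            }
        });
    }
}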
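4. Receive messages
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerService {
    private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);

    // The Acknowledgment parameter requires spring.kafka.listener.ack-mode=manual
    // (or manual_immediate) and enable-auto-commit=false.
    @KafkaListener(topics = "${spring.kafka.consumer.topic}")
    public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        try {
            log.info("Consumed message: {}-{}-{}-{}", record.topic(), record.partition(), record.offset(), record.value());
            acknowledgment.acknowledge();
        } catch (Exception ex) {
            log.error("Failed to consume message: {}", ex.getMessage(), ex);
        }
    }
}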
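With manual acknowledgment, the committed position advances only when acknowledge() is called. One consequence for the listener above: because the catch block swallows the exception, a failed record is left unacknowledged, yet acknowledging a later record on the same partition still moves the committed offset past it. The JDK-only sketch below models that behavior with a hypothetical ManualAckSketch class that tracks a single partition; it is a simplification and not how the Spring listener container is implemented.

```java
// Hypothetical model of committed-offset tracking for one partition.
final class ManualAckSketch {

    // Offset of the next record the group would read after a restart.
    private long committed = 0L;

    // Acknowledging the record at recordOffset commits recordOffset + 1.
    void acknowledge(long recordOffset) {
        committed = Math.max(committed, recordOffset + 1);
    }

    long committedOffset() {
        return committed;
    }

    public static void main(String[] args) {
        ManualAckSketch partition = new ManualAckSketch();
        partition.acknowledge(4); // record 4 processed successfully
        // record 5 fails and is never acknowledged
        partition.acknowledge(6); // record 6 processed successfully
        // The committed position is now 7, so record 5 would not be redelivered.
        System.out.println(partition.committedOffset());
    }
}
```

If skipped records are not acceptable, one option is to let the exception propagate instead of catching it, so that the container's error handling can deal with the failed record.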
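This second example is more complete: it covers the configuration file, the producer, and the consumer, and can serve as a reference.
Hopefully this guide is useful; trying the Spring Boot and Kafka integration hands-on is the best way to deepen your understanding.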