下面是关于Spring Boot整合Kafka+注解方式的完整攻略。
1. 引入依赖
首先,我们需要在Maven或Gradle中引入Spring Boot和Kafka的依赖。在Maven中,需要在pom.xml中引入以下依赖:
<!-- Spring Boot -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.1</version>
</dependency>
2. 配置Kafka
在application.properties中添加如下Kafka配置:
# Kafka
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
其中,bootstrap-servers表示Kafka的地址,group-id表示消费者的分组ID。
3. 消费者
我们可以使用@KafkaListener注解来创建一个消费者。以下是一个消费者示例:
@Service
public class KafkaConsumer {
@KafkaListener(topics = "my-topic", groupId = "my-group")
public void listen(String message) {
System.out.println("Received Message: " + message);
}
}
在此示例中,我们使用@KafkaListener注解来监听my-topic主题,并使用my-group作为消费者分组ID。当消息到达时,listen方法将被调用,并打印出消息内容。
4. 生产者
我们可以使用KafkaTemplate来创建一个生产者。以下是一个生产者示例:
@RestController
public class KafkaController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping("/publish/{message}")
public void publish(@PathVariable String message) {
kafkaTemplate.send("my-topic", message);
}
}
在此示例中,我们使用KafkaTemplate的send方法将消息发送到my-topic主题。
5. 示例
在本示例中,我们将创建一个简单的应用程序,其中包含一个生产者和两个消费者。
首先,我们需要创建一个Spring Boot应用程序,并引入上文所述的依赖。
然后,我们需要创建一个TopicController:
@RestController
public class TopicController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping("/topics/{name}")
public void createTopic(@PathVariable String name) {
NewTopic newTopic = new NewTopic(name, 1, (short) 1);
CreateTopicsResult res = adminClient.createTopics(Collections.singleton(newTopic));
res.values().get(name).get();
}
@DeleteMapping("/topics/{name}")
public void deleteTopic(@PathVariable String name) {
DeleteTopicsResult res = adminClient.deleteTopics(Arrays.asList(name));
res.all().get();
}
}
在此示例中,我们使用KafkaAdminClient来动态创建和删除主题。createTopic方法将创建一个名为name的主题,deleteTopic方法将删除名为name的主题。
接下来,我们创建一个生产者:
@RestController
public class ProducerController {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@PostMapping("/produce/{topic}/{message}")
public void produceMessage(@PathVariable String topic, @PathVariable String message) {
kafkaTemplate.send(topic, message);
}
}
在该示例中,我们使用KafkaTemplate的send方法将消息发送到指定的主题。
最后,我们创建两个消费者,如下所示:
@Service
public class FirstConsumer {
@KafkaListener(topics = "my-topic")
public void listen(String message) {
System.out.println("First Consumer Received Message: " + message);
}
}
@Service
public class SecondConsumer {
@KafkaListener(topics = "my-topic")
public void listen(String message) {
System.out.println("Second Consumer Received Message: " + message);
}
}
在此示例中,我们使用@KafkaListener注解来监听my-topic主题,并打印出收到的消息内容。
综上所述,这是关于Spring Boot整合Kafka+注解方式的完整攻略,其中包含了Kafka的配置、消费者和生产者的创建以及一个完整的示例。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:关于spring boot整合kafka+注解方式 - Python技术站