下面我将为您详细讲解spring boot整合spring-kafka实现发送接收消息的攻略。
一、集成Spring-Kafka依赖
在pom.xml文件中添加spring-kafka的依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
二、编写Producer
在spring boot应用中,我们需要编写一个Producer类来发送消息。代码示例如下:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
在上面的代码中,我们使用@Autowired注解将KafkaTemplate注入到生产者类中,然后编写sendMessage方法来发送消息。
三、编写Consumer
接下来,我们需要编写一个Consumer类来接收消息。代码示例如下:
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = "${kafka.topic}")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
在上面的代码中,我们使用@KafkaListener注解来监听指定的主题,一旦有消息到达,就会调用listen方法来处理消息。
四、配置Kafka
在application.properties文件中添加以下Kafka配置:
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=my-group
kafka.topic=my-topic
在上面的代码中,我们配置了Kafka的地址、消费者组和主题。
五、使用示例
假设我们现在要发送一条消息,代码示例如下:
@Autowired
private KafkaProducer kafkaProducer;
...
kafkaProducer.sendMessage("my-topic", "Hello, world!");
假设我们现在要接收来自“my-topic”主题的消息,代码示例如下:
@Autowired
private KafkaConsumer kafkaConsumer;
...
// wait for a while to receive the message
Thread.sleep(5000);
六、完整实例代码
在最后,我提供完整的Spring-Boot整合Spring-Kafka的示例代码:
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@SpringBootApplication
public class SpringKafkaDemoApplication {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext context = SpringApplication.run(SpringKafkaDemoApplication.class, args);
// send a message
KafkaProducer kafkaProducer = context.getBean(KafkaProducer.class);
kafkaProducer.sendMessage("my-topic", "Hello, world!");
// wait for a while to receive the message
Thread.sleep(5000);
context.close();
}
@Component
public static class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
@Component
public static class KafkaConsumer {
@KafkaListener(topics = "${kafka.topic}")
public void listen(String message) {
System.out.println("Received message: " + message);
}
}
}
希望以上内容能够对您有所帮助。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:spring boot整合spring-kafka实现发送接收消息实例代码 - Python技术站