下面我来详细讲解SpringBoot集成Kafka配置工具类的详细代码。
1. 配置maven依赖
首先,我们需要在pom.xml文件中添加以下依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.5.5</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.0</version>
</dependency>
这些依赖将让我们能够在SpringBoot项目中使用Kafka。
2. 编写Kafka配置类
我们需要创建一个用于配置Kafka的类。这个类需要加上@Configuration注解,表示这个类是一个配置类,并且需要加上@EnableKafka注解,启用Kafka功能。同时,我们还需要设置Kafka的一些配置,如bootstrap.servers、key.serializer、value.serializer等。
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
3. 发送数据到Kafka
我们可以在代码中使用KafkaTemplate发送数据到Kafka。在发送数据时,我们需要指定topic和发送的消息内容。示例代码如下:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
在实际使用中,我们可以在controller层中调用sendMessage方法发送消息。
4. 接收Kafka消息
我们需要创建一个用于接收Kafka消息的类。这个类需要加上@Component注解,表示这是一个组件。同时,我们还需要加上@KafkaListener注解,指定需要监听的topic。
@Component
public class KafkaReceiver {
@KafkaListener(topics = "test_topic")
public void receiveMessage(String message) {
// 处理消息
}
}
在实际使用中,当有消息到达指定的topic时,receiveMessage方法会被自动调用。
示例1:发送消息到Kafka
@RestController
public class TestController {
@Autowired
private KafkaSender kafkaSender;
@GetMapping("/send")
public String sendToKafka(String message) {
kafkaSender.sendMessage("test_topic", message);
return "ok";
}
}
这个controller方法接收一个字符串参数message,然后调用kafkaSender的sendMessage方法将消息发送到test_topic这个topic中。
示例2:从Kafka接收消息
@Component
public class KafkaReceiver {
@KafkaListener(topics = "test_topic")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
这个组件会监听test_topic这个topic,当有消息到达时,receiveMessage方法会被自动调用,并输出接收到的消息内容。
这就是SpringBoot集成Kafka配置工具类的完整攻略,希望对你有所帮助!
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:SpringBoot集成Kafka 配置工具类的详细代码 - Python技术站