下面是Spring Boot配置双Kafka全过程的攻略:
1. 添加Kafka依赖
在pom.xml文件中添加以下Kafka依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.7.1</version>
</dependency>
2. 配置Kafka
2.1 配置单个Kafka
在application.yml文件中添加以下Kafka配置:
spring.kafka.bootstrap-servers: kafka1:9092
spring.kafka.consumer.group-id: my-group
spring.kafka.producer.retries: 0
其中,spring.kafka.bootstrap-servers
指定了Kafka地址,spring.kafka.consumer.group-id
指定了消费者组的ID,spring.kafka.producer.retries
指定了消息发送失败时的重试次数。
2.2 配置双Kafka
如果需要同时配置两个Kafka,则可以添加一个新的Kafka配置文件,例如:
spring:
kafka:
consumer:
bootstrap-servers: kafka1:9092
group-id: my-group1
producer:
bootstrap-servers: kafka2:9092
其中,spring.kafka.consumer
和spring.kafka.producer
均包含对应的属性配置,bootstrap-servers
指定了Kafka地址,group-id
指定了消费者组的ID。
3. 创建producer和consumer
3.1 创建单个Kafka producer和consumer
可以通过@Autowired
注解来自动注入KafkaTemplate
和KafkaListenerContainerFactory
,例如:
@Service
public class KafkaService {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@KafkaListener(topics = "my-topic")
public void listen(String message) {
System.out.println("Received message: " + message);
}
public void sendMessage(String message) {
kafkaTemplate.send("my-topic", message);
}
}
其中,KafkaTemplate
用于发送消息,@KafkaListener
注解用于监听消息,String
指定了消息的key和value类型。
3.2 创建双Kafka producer和consumer
对于使用了双Kafka的项目,需要通过@Qualifier
注解指定对应的KafkaTemplate和KafkaListenerContainerFactory,例如:
@Service
public class KafkaService {
@Autowired
@Qualifier("kafkaTemplate1")
private KafkaTemplate<String, String> kafkaTemplate1;
@Autowired
@Qualifier("kafkaListenerContainerFactory2")
private KafkaListenerContainerFactory<?> kafkaListenerContainerFactory2;
@KafkaListener(topics = "my-topic1")
public void listen1(String message) {
System.out.println("Received message from kafka1: " + message);
}
@KafkaListener(topics = "my-topic2", containerFactory = "kafkaListenerContainerFactory2")
public void listen2(String message) {
System.out.println("Received message from kafka2: " + message);
}
public void sendMessageToKafka1(String message) {
kafkaTemplate1.send("my-topic1", message);
}
public void sendMessageToKafka2(String message) {
kafkaTemplate2.send("my-topic2", message);
}
}
其中,@Qualifier
注解用于指定对应的KafkaTemplate和KafkaListenerContainerFactory,listen1
方法使用了默认的KafkaListenerContainerFactory,而listen2
方法使用了kafkaListenerContainerFactory2
。
4. 示例
4.1 示例1:使用单个Kafka
发送消息:
@Autowired
private KafkaService kafkaService;
@GetMapping("/send")
public String sendMessage() {
kafkaService.sendMessage("hello world");
return "message sent";
}
接收消息:
@Component
public class KafkaReceiver {
@KafkaListener(topics = "my-topic")
public void receiveMessage(String message) {
System.out.println("Received message: " + message);
}
}
4.2 示例2:使用双Kafka
发送消息:
@Autowired
private KafkaService kafkaService;
@GetMapping("/send1")
public String sendMessageToKafka1() {
kafkaService.sendMessageToKafka1("hello world from kafka1");
return "message sent to kafka1";
}
@GetMapping("/send2")
public String sendMessageToKafka2() {
kafkaService.sendMessageToKafka2("hello world from kafka2");
return "message sent to kafka2";
}
接收消息:
@Component
public class KafkaReceiver {
@KafkaListener(topics = "my-topic1")
public void receiveMessageFromKafka1(String message) {
System.out.println("Received message from kafka1: " + message);
}
@KafkaListener(topics = "my-topic2", containerFactory = "kafkaListenerContainerFactory2")
public void receiveMessageFromKafka2(String message) {
System.out.println("Received message from kafka2: " + message);
}
}
以上就是Spring Boot配置双Kafka全过程的攻略,希望能对你有所帮助。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:springboot之配置双kafka全过程 - Python技术站