Kafka中消息队列的两种模式讲解
Apache Kafka是一个开源的分布式流处理平台,其主要功能是异步处理、发布和订阅消息。在Kafka中,消息队列的模式分为两种:点对点模式和发布/订阅模式。
点对点模式
点对点模式通常用于一个消息只能被一个消费者消费的场景,即一条消息只会被消费一次。这种模式中,消息被发送到Kafka中的一个队列中,在队列中等待消费者来消费。
示例代码如下:
// 生产者
// 创建Producer实例
Producer<String, String> producer = new KafkaProducer<>(props);
// 将消息发送到指定主题和分区
producer.send(new ProducerRecord<>("my-topic", "Hello World"));
producer.close();
// 消费者
// 创建Consumer实例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
consumer.close();
上述代码中,我们创建了一个Producer实例来将消息发送到Kafka中的一个主题(my-topic)。然后,我们创建了一个Consumer实例来订阅这个主题,并消费其中的消息。由于消息在消费之后会被删除,因此同一条消息不会被不同的消费者重复消费。
发布/订阅模式
发布/订阅模式通常用于一个消息可以被多个消费者消费的场景。在这种模式下,消息被发送到一个主题中,多个消费者订阅这个主题并接收其中的消息。
示例代码如下:
// 生产者
// 创建Producer实例
Producer<String, String> producer = new KafkaProducer<>(props);
// 将消息发送到指定主题和分区
producer.send(new ProducerRecord<>("my-topic", "Hello World"));
producer.close();
// 消费者
// 创建Consumer实例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
consumer1.subscribe(Arrays.asList("my-topic"));
KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
consumer2.subscribe(Arrays.asList("my-topic"));
// 消费消息
while (true) {
ConsumerRecords<String, String> records1 = consumer1.poll(100);
for (ConsumerRecord<String, String> record : records1) {
System.out.printf("consumer1: offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
ConsumerRecords<String, String> records2 = consumer2.poll(100);
for (ConsumerRecord<String, String> record : records2) {
System.out.printf("consumer2: offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
consumer1.close();
consumer2.close();
上述代码中,我们创建了一个Producer实例来将消息发送到Kafka中的一个主题(my-topic)。然后,我们创建了两个Consumer实例来订阅这个主题,并消费其中的消息。由于消息可以被多个消费者订阅,因此同一条消息可以被多个消费者同时接收。
总结
在Kafka中,消息队列的模式分为点对点模式和发布/订阅模式。点对点模式适合于一个消息只能被一个消费者消费的场景,而发布/订阅模式适合于一个消息可以被多个消费者消费的场景。通过上述示例代码,我们可以更好地理解和使用Kafka的消息队列模式。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Kafka中消息队列的两种模式讲解 - Python技术站