下面我来为您详细讲解关于Kafka消费者订阅方式的完整攻略。
Kafka消费者订阅方式
在 Kafka 中,消费者可以通过不同的方式从主题(Topic)中获取消息,以下是三种常见的订阅方式:
1. 静态订阅方式
使用静态方式订阅主题的消费者需要在代码中显式指定要消费的主题和分区。消费者只能消费指定分区中的消息,无法动态的分配和重新分配分区。
以 Java 客户端为例进行说明,可以使用以下代码对指定主题、指定分区的消息进行消费:
public void consumeTopic(String topic, int partition) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
TopicPartition tp = new TopicPartition(topic, partition);
consumer.assign(Arrays.asList(tp));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
}
consumer.close();
}
2. 动态订阅方式
使用动态订阅方式的消费者可以根据订阅的主题动态分配和重新分配分区。在这种订阅方式下,消费者会自动加入消费者组(Consumer Group),并与其他消费者共享消息主题中的分区,同时消费者也可以随时退出消费组。
以 Java 客户端为例进行说明,可以使用以下代码实现动态订阅方式:
public void consumeDynamic(String topic) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
}
consumer.close();
}
3. 正则表达式订阅方式
使用正则表达式订阅方式的消费者可以通过正则表达式匹配主题的名称获取对应主题中的所有分区的消息。在这种方式下,消费者也会加入消费组并共享分区,也可以随时退出消费组。
以 Java 客户端为例进行说明,可以使用以下代码实现正则表达式订阅方式:
public void consumeRegex(String topicPattern) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Pattern.compile(topicPattern));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
}
}
consumer.close();
}
示例
以下是两个示例,展示了不同订阅方式下的消费者的使用方法。
示例1
在这个示例中,我们使用动态订阅方式消费 topic1 主题中的消息,从自动提交位移改为手动提交位移:
public void consumeDynamicManual() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
// 关闭自动提交位移
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic1"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
// 手动提交位移
consumer.commitSync();
}
} finally {
consumer.close();
}
}
示例2
在这个示例中,我们使用正则表达式订阅方式消费 topic* 主题中的消息:
public void consumeRegex() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Pattern.compile("topic.*"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value());
}
}
consumer.close();
}
以上就是关于Kafka消费者订阅方式的完整攻略,希望能帮助您更好地理解Kafka消费者订阅的相关知识。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:关于Kafka消费者订阅方式 - Python技术站