关于Kafka消费者订阅方式

下面我来为您详细讲解关于Kafka消费者订阅方式的完整攻略。

Kafka消费者订阅方式

在 Kafka 中,消费者可以通过不同的方式从主题(Topic)中获取消息,以下是三种常见的订阅方式:

1. 静态订阅方式

使用静态方式订阅主题的消费者需要在代码中显式指定要消费的主题和分区。消费者只能消费指定分区中的消息,无法动态的分配和重新分配分区。

Java 客户端为例进行说明,可以使用以下代码对指定主题、指定分区的消息进行消费:

public void consumeTopic(String topic, int partition) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition(topic, partition);
    consumer.assign(Arrays.asList(tp));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }
    consumer.close();
}

2. 动态订阅方式

使用动态订阅方式的消费者可以根据订阅的主题动态分配和重新分配分区。在这种订阅方式下,消费者会自动加入消费者组(Consumer Group),并与其他消费者共享消息主题中的分区,同时消费者也可以随时退出消费组。

以 Java 客户端为例进行说明,可以使用以下代码实现动态订阅方式:

public void consumeDynamic(String topic) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }
    consumer.close();
}

3. 正则表达式订阅方式

使用正则表达式订阅方式的消费者可以通过正则表达式匹配主题的名称获取对应主题中的所有分区的消息。在这种方式下,消费者也会加入消费组并共享分区,也可以随时退出消费组。

以 Java 客户端为例进行说明,可以使用以下代码实现正则表达式订阅方式:

public void consumeRegex(String topicPattern) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Pattern.compile(topicPattern));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }
    consumer.close();
}

示例

以下是两个示例,展示了不同订阅方式下的消费者的使用方法。

示例1

在这个示例中,我们使用动态订阅方式消费 topic1 主题中的消息,从自动提交位移改为手动提交位移:

public void consumeDynamicManual() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");
    // 关闭自动提交位移
    props.put("enable.auto.commit", "false");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("topic1"));

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(),
                        record.key(), record.value());
            }
            // 手动提交位移
            consumer.commitSync();
        }
    } finally {
        consumer.close();
    }
}

示例2

在这个示例中,我们使用正则表达式订阅方式消费 topic* 主题中的消息:

public void consumeRegex() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Pattern.compile("topic.*"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());
        }
    }
    consumer.close();
}

以上就是关于Kafka消费者订阅方式的完整攻略,希望能帮助您更好地理解Kafka消费者订阅的相关知识。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:关于Kafka消费者订阅方式 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Spring Security自定义认证器的实现代码

    下面是Spring Security自定义认证器的实现的完整攻略,包含了两个示例。 1. 自定义认证器简介 Spring Security是一个强大的安全框架,可以帮助我们实现各种安全功能。其中认证是Spring Security最基本的功能之一,它可以防止未经授权的用户访问受保护的资源,保护应用程序的安全。 Spring Security默认提供了基于用户…

    Java 2023年5月20日
    00
  • Java 编程中十个处理异常的建议

    Java 编程中十个处理异常的建议 对于 Java 程序员来说,异常处理是一项必不可少的技能。如何合理地处理异常,提高代码的健壮性和可维护性,是每个程序员都应该重视的问题。这里提供十个处理异常的建议,帮助大家更好地应对各种异常情况。 1. 避免捕获过于宽泛的异常 Java 中存在许多种异常类型,如 NullPointerException、ArrayInde…

    Java 2023年5月27日
    00
  • IDEA插件开发之环境搭建过程图文详解

    首先,要进行IDEA插件开发,需要搭建相应的开发环境。下面是我准备的完整攻略: 环境准备 Java环境 IDEA插件开发需要Java的支持,所以需要先安装Java环境。如果还没有安装,可以在Java官网上下载对应版本的Java开发包,并按照官方文档进行安装操作。 IntelliJ IDEA安装 下载并安装IntelliJ IDEA开发环境。建议下载最新版本。…

    Java 2023年5月26日
    00
  • Spring Security安全框架之记住我功能

    标题:Spring Security安全框架之记住我功能详解 什么是记住我功能 记住我功能是指,在用户登录成功后,用户的身份认证信息会保持在客户端的cookie中,以便用户下次访问同一站点时不需要再次登录。 Spring Security中如何实现记住我功能 要在Spring Security中实现记住我功能,需要进行以下几个步骤: 1.在spring se…

    Java 2023年6月3日
    00
  • Java 添加Word目录的2种方法示例代码详解

    针对你提出的问题,我来进行详细讲解。 标题 首先,我们要为这篇攻略添加一个合适的标题,比如“Java 添加Word目录的2种方法示例代码详解”。 介绍 在正式讲解之前,我们需要先给读者介绍一下本篇攻略的背景和目的。这里我们可以写一段简短的介绍: 本篇攻略将为大家介绍如何在Java中添加Word目录的两种方法,并提供相应的示例代码进行演示。其中,第一种方法使用…

    Java 2023年5月19日
    00
  • Java使用ScriptEngine动态执行代码(附Java几种动态执行代码比较)

    Java使用ScriptEngine动态执行代码(附Java几种动态执行代码比较) 在Java中,我们有多种方法可以动态执行代码,包括使用ScriptEngine引擎、使用Java Compiler API、使用字节码增强框架等。其中,使用ScriptEngine引擎是最常见的一种方法。 ScriptEngine引擎 ScriptEngine是Java SE…

    Java 2023年5月23日
    00
  • Hibernate save() saveorupdate()的用法第1/2页

    Hibernate save() 和 saveOrUpdate() 方法 save() 和 saveOrUpdate() 方法是 Hibernate 中常用的操作数据的方法之一,两种方法都可以用来保存一个对象到数据库中。它们的不同之处在于在不同的情况下它们的行为表现不同。 save() 方法 当给定一个新的对象时,使用 save() 方法将该对象保存到数据库…

    Java 2023年6月15日
    00
  • Maven setting.xml配置文件详解

    下面是Maven setting.xml配置文件详解的完整攻略。 什么是Maven的setting.xml配置文件? Maven的setting.xml配置文件是Maven构建系统的配置文件之一,它可以对Maven构建过程中的各种参数进行设置,比如Maven的本地仓库路径、代理服务器地址、编译插件、发布仓库等等。设置这些参数可以让我们的Maven构建过程更加…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部