关于Kafka消费者订阅方式

下面我来为您详细讲解关于Kafka消费者订阅方式的完整攻略。

Kafka消费者订阅方式

在 Kafka 中,消费者可以通过不同的方式从主题(Topic)中获取消息,以下是三种常见的订阅方式:

1. 静态订阅方式

使用静态方式订阅主题的消费者需要在代码中显式指定要消费的主题和分区。消费者只能消费指定分区中的消息,无法动态的分配和重新分配分区。

Java 客户端为例进行说明,可以使用以下代码对指定主题、指定分区的消息进行消费:

public void consumeTopic(String topic, int partition) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition(topic, partition);
    consumer.assign(Arrays.asList(tp));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }
    consumer.close();
}

2. 动态订阅方式

使用动态订阅方式的消费者可以根据订阅的主题动态分配和重新分配分区。在这种订阅方式下,消费者会自动加入消费者组(Consumer Group),并与其他消费者共享消息主题中的分区,同时消费者也可以随时退出消费组。

以 Java 客户端为例进行说明,可以使用以下代码实现动态订阅方式:

public void consumeDynamic(String topic) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList(topic));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }
    consumer.close();
}

3. 正则表达式订阅方式

使用正则表达式订阅方式的消费者可以通过正则表达式匹配主题的名称获取对应主题中的所有分区的消息。在这种方式下,消费者也会加入消费组并共享分区,也可以随时退出消费组。

以 Java 客户端为例进行说明,可以使用以下代码实现正则表达式订阅方式:

public void consumeRegex(String topicPattern) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Pattern.compile(topicPattern));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",
                    record.offset(), record.key(), record.value());
        }
    }
    consumer.close();
}

示例

以下是两个示例,展示了不同订阅方式下的消费者的使用方法。

示例1

在这个示例中,我们使用动态订阅方式消费 topic1 主题中的消息,从自动提交位移改为手动提交位移:

public void consumeDynamicManual() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");
    // 关闭自动提交位移
    props.put("enable.auto.commit", "false");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("topic1"));

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                        record.topic(), record.partition(), record.offset(),
                        record.key(), record.value());
            }
            // 手动提交位移
            consumer.commitSync();
        }
    } finally {
        consumer.close();
    }
}

示例2

在这个示例中,我们使用正则表达式订阅方式消费 topic* 主题中的消息:

public void consumeRegex() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "test-group");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Pattern.compile("topic.*"));

    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %d, offset = %d, key = %s, value = %s%n",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), record.value());
        }
    }
    consumer.close();
}

以上就是关于Kafka消费者订阅方式的完整攻略,希望能帮助您更好地理解Kafka消费者订阅的相关知识。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:关于Kafka消费者订阅方式 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • java中的前++和后++的区别示例代码详解

    Java中的前++和后++的区别示例代码详解 在Java语言中,++运算符可以表示自增运算符,即对于一个变量,它的值可以通过++运算符来自增1,但是++运算符又可以分为前++和后++两种形式,他们的区别在于运算符的位置。下面我们来详细讲解一下Java中的前++和后++的区别。 前++和后++的区别 前++:先自增,再引用该变量。 后++:先引用该变量,再自增…

    Java 2023年5月23日
    00
  • Java Apache Commons报错“ObjectCreationException”的原因与解决方法

    “ObjectCreationException”是Java的Apache Commons类库中的一个异常,通常由以下原因之一引起: 无效的对象:如果对象无效,则可能会出现此错误。在这种情况下,需要检查对象以解决此问题。 无效的配置:如果配置无效,则可能会出现此错误。在这种情况下,需要检查配置以解决此问题。 以下是两个实例: 例1 如果对象无效,则可以尝试检…

    Java 2023年5月5日
    00
  • 解决netty中spring对象注入失败的问题

    解决Netty中Spring对象注入失败的问题,一般存在两个方面的问题: 在Netty的handler中无法注入Spring的bean; 在Netty的线程中使用Spring的事务管理器会出现异常报错。 为了解决这两个问题,我们需要按照以下步骤进行: 步骤一:引入spring-boot-starter-netty 在Spring Boot项目中,通过添加sp…

    Java 2023年6月16日
    00
  • Java SpringSecurity+JWT实现登录认证

    一、什么是Java Spring Security和JWT? Java SpringSecurity是Spring框架中的一个安全工具,能够提供身份验证、授权、防止csrf攻击等功能; JWT(JSON Web Token)是一种用于身份验证的开放标准(RFC 7519),它使用JSON格式在网络之间安全地传递信息。JWT具有轻量级、开放性、易于使用和便于传…

    Java 2023年5月20日
    00
  • JavaSpringBoot报错“ServerErrorException”的原因和处理方法

    原因 “ServerErrorException” 错误通常是以下原因引起的: 服务器配置问题:如果您的服务器配置存在问题,则可能会出现此错误。在这种情况下,需要检查您的服务器配置并确保它们正确。 服务器资源问题:如果您的服务器资源存在问题,则可能会出现此错误。在这种情况下,需要检查您的服务器资源并确保它们正确。 服务器代码问题:如果您的服务器代码存在问题,…

    Java 2023年5月4日
    00
  • 用java等语言仿360首页拼音输入全模糊搜索和自动换肤

    实现360首页拼音输入全模糊搜索和自动换肤,可以分为以下几步: 1. 获取输入关键词 首先需要获取用户输入的关键词,可以使用HTML中的<input>标签,并添加keydown事件监听器,即当用户输入内容时触发。 <input type="text" id="search-input" placeho…

    Java 2023年6月15日
    00
  • 解决idea使用过程中让你觉得不爽的一些问题(小结)

    解决idea使用过程中让你觉得不爽的一些问题 IntelliJ IDEA 是一款非常强大的 Java 集成开发环境,但是在使用过程中会遇到一些让人不爽的问题。下面是解决这些问题的攻略。 问题一:IntelliJ IDEA 启动慢 解决办法: 删除项目中的 .idea 文件夹,清空缓存 在 IntelliJ IDEA 中,提供了清除缓存的功能,操作步骤是:点击…

    Java 2023年5月20日
    00
  • hibernate-validator改进校验框架validator v0.4使用

    来讲一下“hibernate-validator改进校验框架validator v0.4使用”的完整攻略。 什么是Hibernate-Validator? Hibernate-Validator 是一款校验框架。这个框架的初衷是为了在 JavaBean 层面上提供一套统一、可重用的验证机制,使得我们在对 JavaBean 进行数据验证时能够更加方便、快捷、灵…

    Java 2023年5月20日
    00
合作推广
合作推广
分享本页
返回顶部