Kafka中消息队列的两种模式讲解

Kafka中消息队列的两种模式讲解

Apache Kafka是一个开源的分布式流处理平台,其主要功能是异步处理、发布和订阅消息。在Kafka中,消息队列的模式分为两种:点对点模式和发布/订阅模式。

点对点模式

点对点模式通常用于一个消息只能被一个消费者消费的场景,即一条消息只会被消费一次。这种模式中,消息被发送到Kafka中的一个队列中,在队列中等待消费者来消费。

示例代码如下:

// 生产者
// 创建Producer实例
Producer<String, String> producer = new KafkaProducer<>(props);

// 将消息发送到指定主题和分区
producer.send(new ProducerRecord<>("my-topic", "Hello World"));

producer.close();

// 消费者
// 创建Consumer实例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

// 消费消息
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

consumer.close();

上述代码中,我们创建了一个Producer实例来将消息发送到Kafka中的一个主题(my-topic)。然后,我们创建了一个Consumer实例来订阅这个主题,并消费其中的消息。由于消息在消费之后会被删除,因此同一条消息不会被不同的消费者重复消费。

发布/订阅模式

发布/订阅模式通常用于一个消息可以被多个消费者消费的场景。在这种模式下,消息被发送到一个主题中,多个消费者订阅这个主题并接收其中的消息。

示例代码如下:

// 生产者
// 创建Producer实例
Producer<String, String> producer = new KafkaProducer<>(props);

// 将消息发送到指定主题和分区
producer.send(new ProducerRecord<>("my-topic", "Hello World"));

producer.close();

// 消费者
// 创建Consumer实例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer1 = new KafkaConsumer<>(props);
consumer1.subscribe(Arrays.asList("my-topic"));

KafkaConsumer<String, String> consumer2 = new KafkaConsumer<>(props);
consumer2.subscribe(Arrays.asList("my-topic"));

// 消费消息
while (true) {
    ConsumerRecords<String, String> records1 = consumer1.poll(100);
    for (ConsumerRecord<String, String> record : records1) {
        System.out.printf("consumer1: offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }

    ConsumerRecords<String, String> records2 = consumer2.poll(100);
    for (ConsumerRecord<String, String> record : records2) {
        System.out.printf("consumer2: offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
    }
}

consumer1.close();
consumer2.close();

上述代码中,我们创建了一个Producer实例来将消息发送到Kafka中的一个主题(my-topic)。然后,我们创建了两个Consumer实例来订阅这个主题,并消费其中的消息。由于消息可以被多个消费者订阅,因此同一条消息可以被多个消费者同时接收。

总结

在Kafka中,消息队列的模式分为点对点模式和发布/订阅模式。点对点模式适合于一个消息只能被一个消费者消费的场景,而发布/订阅模式适合于一个消息可以被多个消费者消费的场景。通过上述示例代码,我们可以更好地理解和使用Kafka的消息队列模式。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Kafka中消息队列的两种模式讲解 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • Mybatis学习笔记之动态SQL揭秘

    Mybatis 是一种流行的持久化框架,其核心是SQL映射文件。动态SQL是Mybatis的重要功能之一,可以帮助开发人员解决复杂的SQL语句拼接问题,从而提高开发速度和可维护性。本文将为您详细讲解Mybatis动态SQL的使用方法和技巧。 什么是动态SQL Mybatis的SQL语句是通过XML文件进行配置的,因此可以灵活地进行动态SQL语句的拼接。动态S…

    Java 2023年6月1日
    00
  • 直接内存的作用是什么?

    直接内存是一种在Java中使用NIO(New Input/Output)时可以使用的内存区域。与Java堆内存不同,直接内存不受Java堆大小的限制,可以使用操作系统的内存,因此可以提高I/O操作的效率。在Java中,可以使用ByteBuffer类来操作直接内存。 以下是直接内存的完整使用攻略: 分配直接内存 在Java中,可以使用ByteBuffer类的a…

    Java 2023年5月12日
    00
  • 从原理聊JVM(一):染色标记和垃圾回收算法

    作者:京东科技 康志兴 1 JVM运行时内存划分 1.1 运行时数据区域 • 方法区 属于共享内存区域,存储已被虚拟机加载的类信息、常量、静态变量、即时编译器编译后的代码等数据。运行时常量池,属于方法区的一部分,用于存放编译期生成的各种字面量和符号引用。 JDK1.8之前,Hotspot虚拟机对方法区的实现叫做永久代,1.8之后改为元空间。二者区别主要在于永…

    Java 2023年4月22日
    00
  • Java Scala偏函数与偏应用函数超详细讲解

    Java Scala偏函数与偏应用函数 前言 本文将详细讲解Java Scala中的偏函数与偏应用函数,供大家参考与学习。 偏函数 Partial Function 偏函数(Partial Function)是指仅对一部分输入定义的函数。偏函数的意义在于,某些情况下,我们并不关心所有的输入内容,只是针对其中的某些数据进行处理。 举个例子,我们需要对整数数组进…

    Java 2023年5月26日
    00
  • 使用MybatisPlus自定义模版中能获取到的信息

    MybatisPlus(简称MP)是一个基于Mybatis的增强工具库,可以大大简化Mybatis开发,提高开发效率。MP支持使用自定义模版来生成代码,通过自定义模版可以快速生成符合公司业务规范的代码,而且MP在模版中提供了很多变量,方便我们在模版中使用。 下面详细讲解在MP自定义模板中能够获取到的信息及使用方法: 1. 可以获取表的元数据信息 在自定义模版…

    Java 2023年6月15日
    00
  • spring事务隔离级别、传播机制以及简单配置方式

    Spring事务管理 Spring提供了强大的事务管理服务,可以方便的实现事务控制,避免了在代码中写大量的底层JDBC事务代码。本篇文章将详细说明Spring事务的隔离级别、传播机制以及简单配置方式。 事务隔离级别 事务隔离级别是数据库保证数据一致性的重要手段,在并发访问数据库时可以防止不同线程对同一个数据产生相互影响的问题。Spring框架支持设置五个事务…

    Java 2023年5月20日
    00
  • Java使用JDBC向MySQL数据库批次插入10W条数据(测试效率)

    Java使用JDBC向MySQL数据库批次插入10W条数据(测试效率)攻略 本文主要介绍如何使用Java和JDBC向MySQL数据库批次插入10万条数据,并且测试其效率。 环境要求 MySQL数据库 Java开发环境 JDBC驱动 实现步骤 1. 安装MySQL数据库和JDBC驱动 如果你已经安装了MySQL数据库,那么可以跳过这一步。 安装JDBC驱动有很…

    Java 2023年5月20日
    00
  • Java创建删除文件和目录的方法(推荐)

    下面是详细的攻略: Java创建删除文件和目录的方法(推荐) 1. 创建文件 在Java中,我们可以使用File类来创建文件。下面是创建文件的步骤: 首先,我们需要创建一个File对象,指向要创建的文件。可以使用文件路径或文件名来创建File对象。 然后,使用createNewFile()方法创建文件。 代码示例: import java.io.*; pub…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部