Java分布式学习之Kafka消息队列

Java分布式学习之Kafka消息队列

什么是Kafka消息队列

Kafka是一种高可用、高性能、分布式的消息队列系统,广泛应用于大数据领域。它可以处理海量数据,并提供实时的数据流处理。Kafka具有可拓展性好、可靠性高、消息传输速度快等优点,是大数据处理中不可或缺的组件。

Kafka的基本概念

Kafka中的重要概念包括:Producer、Consumer、Topic、Broker、Partition等。

  • Producer:消息的生产者,即产生消息的客户端。
  • Consumer:消息的消费者,即接收并处理消息的客户端。
  • Topic:消息的主题,相当于一个消息的类别或者频道。
  • Broker:Kafka运行的服务器节点。
  • Partition:每个Topic可以分为多个Partition,多个Partition组成一个Topic的完整消息集合。一个Partition只能有一个Producer进行写入,但是每个Partition可以有多个Consumer进行读取。

Kafka的核心API

Kafka提供了两种核心API:Producer API和Consumer API。

Producer API

Producer API提供了两种发送消息的方式:

  • 同步发送:消息会一直等待Broker的响应,直到Broker响应成功为止。
  • 异步发送:发送消息后不等待Broker的响应,直接返回。通过回调函数、future等方式获取Broker的响应信息。

下面是Java代码示例:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();

Consumer API

Consumer API提供了两种消息消费的方式:

  • 高级消费:开发者可以根据需要手动控制消费。
  • 简单消费:Kafka提供的最简单的消息消费方式,自动维护消费offset,消费者只需要提供消息的处理逻辑即可。

下面是Java代码示例:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

Kafka的安装与配置

在Ubuntu系统中,可以通过apt-get方式快速安装Kafka,具体步骤如下:

  1. 安装Java:
sudo apt-get install default-jre
  1. 下载并解压缩Kafka:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
  1. 启动Kafka:
./bin/kafka-server-start.sh config/server.properties

示例一

在本示例中,我们将演示如何使用Kafka进行简单的消息发送与接收。具体步骤如下:

  1. 启动Kafka

在控制台运行以下命令:

./bin/kafka-server-start.sh config/server.properties
  1. 创建Topic

在控制台运行以下命令:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-topic
  1. 发送消息

编写并运行如下Java代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
producer.send(new ProducerRecord<String, String>("test-topic", "key1", "value1"));
producer.send(new ProducerRecord<String, String>("test-topic", "key2", "value2"));
producer.close();
  1. 接收消息

编写并运行如下Java代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
  1. 结果分析

执行发送消息的代码后,我们可以在控制台看到如下的输出:

offset = 0, key = key1, value = value1
offset = 1, key = key2, value = value2

可以看到我们成功发送了两条消息。执行接收消息的代码后,我们可以在控制台看到如下的输出:

offset = 0, key = key1, value = value1
offset = 1, key = key2, value = value2

说明我们成功接收了两条消息。

示例二

在本示例中,我们将演示如何使用Kafka进行高级消费。具体步骤如下:

  1. 启动Kafka

在控制台运行以下命令:

./bin/kafka-server-start.sh config/server.properties
  1. 创建Topic

在控制台运行以下命令:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-topic
  1. 发送消息

编写并运行如下Java代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 1000; i++)
    producer.send(new ProducerRecord<String, String>("test-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
  1. 接收消息

编写并运行如下Java代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));
    }
}
  1. 结果分析

执行发送消息的代码后,我们可以在控制台看到如下的输出:

...
offset = 997, key = 997, value = 997
offset = 998, key = 998, value = 998
offset = 999, key = 999, value = 999

可以看到我们成功发送了一千条消息。执行接收消息的代码后,我们可以在控制台看到如下的输出:

...
offset = 3, key = 3, value = 3
offset = 4, key = 4, value = 4
offset = 5, key = 5, value = 5
...

说明我们成功接收并处理了一千条消息,并手动控制了消费offset。

总结

本文简单介绍了Kafka的基本概念、核心API以及安装和配置步骤,并提供了两个示例展示了Kafka的简单消息发送与接收以及高级消费功能。有了这些基础知识,读者可以深入学习Kafka并大胆应用于实际项目中。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java分布式学习之Kafka消息队列 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • 源码解读Spring-Integration执行过程

    源码解读Spring-Integration执行过程的完整攻略: 简介 Spring-Integration 是 Spring 基于事件驱动、消息推送的一种框架。它是 Spring 企业级开发的一个扩展模块,用于实现不同系统之间的数据交换。Spring-Integration 同时也是 Spring Boot 的子模块之一。它可以使用各种类型的消息传输协议,…

    Java 2023年5月20日
    00
  • 使用Criteria进行分组求和、排序、模糊查询的实例

    下面我将为你详细讲解使用Criteria进行分组求和、排序、模糊查询的完整攻略。 一、Criteria的概述 Hibernate 中的 Criteria 查询是为了解决 HQL 表达式中所没有解决的灵活的高级查询,也可以免去写 SQL 的烦恼,使用标准的方式,所有的查询条件都封装成一个对象。 Criteria 对象可以通过 Restrictions 的静态方…

    Java 2023年5月20日
    00
  • Java实现递归查询树结构的示例代码

    Java实现递归查询树结构的示例代码的攻略包括以下几个步骤: 定义树结构Node类 首先需要定义一个Node类来存储树节点的相关信息,例如节点id、父节点id、节点名称等。Node类的定义如下: public class Node { private String id; // 节点id private String parentId; // 父节点id p…

    Java 2023年5月23日
    00
  • mybatis plus实体类中字段映射mysql中的json格式方式

    下面是关于如何使用MybatisPlus实体类中字段映射MySQL中JSON格式的完整攻略。 1. 引入依赖 在pom.xml中加入以下依赖: <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter&l…

    Java 2023年5月26日
    00
  • SpringMVC参数传递之基本数据类型和复杂对象说明

    SpringMVC参数传递之基本数据类型和复杂对象说明 在SpringMVC中,参数传递是非常重要的,它可以帮助我们将数据从页面传递到控制器中进行处理。本文将详细介绍SpringMVC中参数传递的两种方式:基本数据类型和复杂对象,并提供两个示例说明。 基本数据类型参数传递 在SpringMVC中,我们可以使用基本数据类型来传递参数。以下是一个简单的示例,它使…

    Java 2023年5月17日
    00
  • Spring Boot 使用 SSE 方式向前端推送数据详解

    在Spring Boot应用程序中,我们可以使用SSE(Server-Sent Events)方式向前端推送数据。SSE是一种基于HTTP协议的轻量级推送技术,它允许服务器向客户端推送数据,而无需客户端发起请求。在本文中,我们将详细讲解如何使用Spring Boot和SSE来实现向前端推送数据。 增加依赖 首先,我们需要在pom.xml文件中增加webflu…

    Java 2023年5月18日
    00
  • 使用JPA传递参数的方法

    使用JPA传递参数的方法有多种,可以通过注解、命名参数以及查询参数的方式来实现。下面我将详细讲解这三种方式。 1. 使用注解传递参数 使用注解传递参数的方式需要在SQL语句中使用占位符,同时在代码中使用@Param注解来将参数与占位符对应起来。 例如,我们需要查询某个用户的信息,并且需要使用到用户的id和姓名两个参数。SQL语句可以这样写: SELECT *…

    Java 2023年5月20日
    00
  • 微信小程序实现多选框全选与反全选及购物车中删除选中的商品功能

    下面我将为你详细讲解“微信小程序实现多选框全选与反全选及购物车中删除选中的商品功能”的完整攻略。 实现多选框全选与反全选 HTML结构 首先,在购物车页面的HTML结构中,给每一个商品前面加上一个多选框。例如: <view class="cart-item"> <checkbox class="checkbox…

    Java 2023年5月23日
    00
合作推广
合作推广
分享本页
返回顶部