Java分布式学习之Kafka消息队列

Java分布式学习之Kafka消息队列

什么是Kafka消息队列

Kafka是一种高可用、高性能、分布式的消息队列系统,广泛应用于大数据领域。它可以处理海量数据,并提供实时的数据流处理。Kafka具有可拓展性好、可靠性高、消息传输速度快等优点,是大数据处理中不可或缺的组件。

Kafka的基本概念

Kafka中的重要概念包括:Producer、Consumer、Topic、Broker、Partition等。

  • Producer:消息的生产者,即产生消息的客户端。
  • Consumer:消息的消费者,即接收并处理消息的客户端。
  • Topic:消息的主题,相当于一个消息的类别或者频道。
  • Broker:Kafka运行的服务器节点。
  • Partition:每个Topic可以分为多个Partition,多个Partition组成一个Topic的完整消息集合。一个Partition只能有一个Producer进行写入,但是每个Partition可以有多个Consumer进行读取。

Kafka的核心API

Kafka提供了两种核心API:Producer API和Consumer API。

Producer API

Producer API提供了两种发送消息的方式:

  • 同步发送:消息会一直等待Broker的响应,直到Broker响应成功为止。
  • 异步发送:发送消息后不等待Broker的响应,直接返回。通过回调函数、future等方式获取Broker的响应信息。

下面是Java代码示例:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++)
    producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();

Consumer API

Consumer API提供了两种消息消费的方式:

  • 高级消费:开发者可以根据需要手动控制消费。
  • 简单消费:Kafka提供的最简单的消息消费方式,自动维护消费offset,消费者只需要提供消息的处理逻辑即可。

下面是Java代码示例:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

Kafka的安装与配置

在Ubuntu系统中,可以通过apt-get方式快速安装Kafka,具体步骤如下:

  1. 安装Java:
sudo apt-get install default-jre
  1. 下载并解压缩Kafka:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
  1. 启动Kafka:
./bin/kafka-server-start.sh config/server.properties

示例一

在本示例中,我们将演示如何使用Kafka进行简单的消息发送与接收。具体步骤如下:

  1. 启动Kafka

在控制台运行以下命令:

./bin/kafka-server-start.sh config/server.properties
  1. 创建Topic

在控制台运行以下命令:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-topic
  1. 发送消息

编写并运行如下Java代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
producer.send(new ProducerRecord<String, String>("test-topic", "key1", "value1"));
producer.send(new ProducerRecord<String, String>("test-topic", "key2", "value2"));
producer.close();
  1. 接收消息

编写并运行如下Java代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
  1. 结果分析

执行发送消息的代码后,我们可以在控制台看到如下的输出:

offset = 0, key = key1, value = value1
offset = 1, key = key2, value = value2

可以看到我们成功发送了两条消息。执行接收消息的代码后,我们可以在控制台看到如下的输出:

offset = 0, key = key1, value = value1
offset = 1, key = key2, value = value2

说明我们成功接收了两条消息。

示例二

在本示例中,我们将演示如何使用Kafka进行高级消费。具体步骤如下:

  1. 启动Kafka

在控制台运行以下命令:

./bin/kafka-server-start.sh config/server.properties
  1. 创建Topic

在控制台运行以下命令:

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-topic
  1. 发送消息

编写并运行如下Java代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 1000; i++)
    producer.send(new ProducerRecord<String, String>("test-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
  1. 接收消息

编写并运行如下Java代码:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));
    }
}
  1. 结果分析

执行发送消息的代码后,我们可以在控制台看到如下的输出:

...
offset = 997, key = 997, value = 997
offset = 998, key = 998, value = 998
offset = 999, key = 999, value = 999

可以看到我们成功发送了一千条消息。执行接收消息的代码后,我们可以在控制台看到如下的输出:

...
offset = 3, key = 3, value = 3
offset = 4, key = 4, value = 4
offset = 5, key = 5, value = 5
...

说明我们成功接收并处理了一千条消息,并手动控制了消费offset。

总结

本文简单介绍了Kafka的基本概念、核心API以及安装和配置步骤,并提供了两个示例展示了Kafka的简单消息发送与接收以及高级消费功能。有了这些基础知识,读者可以深入学习Kafka并大胆应用于实际项目中。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java分布式学习之Kafka消息队列 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • scratch怎么做太阳地球月球转动演示? 地球月球太阳三维动画的做法

    做太阳、地球、月球运动的动画可以使用Scratch软件来实现。下面是这个动画的做法: 创建地球 首先,我们需要创建地球的精灵(Sprite)。点击 Scratch 软件界面左下角的“角色”图标,选择“新角色”。在弹出的对话框中,可以选择一个预定义形状作为地球的外观。点击“确定”后,可以进入地球的编辑界面,在这里可以为地球添加要显示的图像或修改其它属性。 给地…

    Java 2023年5月26日
    00
  • Java程序优化的作用是什么?

    Java程序优化的作用 Java程序优化是指在保持程序功能不变的前提下,通过优化代码结构、算法、资源利用等方面的手段提升程序的性能和效率。Java程序优化的作用体现在以下几个方面: 提升用户体验:优化程序性能可以减少用户等待时间,提高程序响应速度,从而提升用户体验。 节省资源开销:优化程序可以减少资源消耗,减少运行成本,从而提高整个系统的利用率。 提升系统稳…

    Java 2023年5月11日
    00
  • Java基础教程之整数运算

    Java基础教程之整数运算攻略 Java是一种强类型语言,其中包含了整数类型及其运算操作。本文将详细讲解Java基础教程中的整数运算,包括基本概念、运算规则和示例说明。 基本概念 Java中的整数类型主要有四种:byte、short、int和long,对应的存储空间分别为1、2、4和8个字节。整数运算包括加、减、乘、除和取模等操作。 运算规则 Java中的整…

    Java 2023年5月26日
    00
  • 微信小程序微信登录的实现方法详解(JAVA后台)

    下面是详细的攻略: 背景介绍 微信小程序微信登录是指用户可以通过微信账号快速登录小程序,无需再次注册账号。实现微信登录的关键在于后台服务器实现微信的登录认证功能。本文将详细讲解如何在Java后台实现微信登录的功能。 实现方法 实现微信登录功能的具体步骤如下: 1.前端页面添加微信登录按钮 <button type="primary"…

    Java 2023年5月23日
    00
  • 详解springMVC两种方式实现多文件上传及效率比较

    详解 Spring MVC 两种方式实现多文件上传及效率比较 本文将详细讲解 Spring MVC 两种不同的实现多文件上传的方式,并对其效率进行比较。示例代码为基于 Maven 的 Spring MVC 项目。 前置条件 JDK 1.8 或以上版本 Maven Servlet 3.0 或以上版本 Spring MVC 4.3 或以上版本 两种上传方式 Sp…

    Java 2023年6月16日
    00
  • Android实现上传文件到服务器实例详解

    Android实现上传文件到服务器实例详解 前言 文件上传是移动端和服务端常见的互动方式之一。在Android开发中,实现上传文件到服务器通常使用HTTP请求实现,请求方式可以为POST或者PUT。 本文将详细介绍Android实现上传文件到服务器的方法。 HTTP请求格式 在进行文件上传之前,我们需要先了解HTTP请求的格式。在当前的移动开发和Web开发中…

    Java 2023年5月20日
    00
  • Java util concurrent及基本线程原理简介

    Java util concurrent及基本线程原理简介 线程基本概念 线程是操作系统进行任务调度和执行的基本单位,一个进程可以拥有多个线程。 线程是轻量级的,相对于进程来说占用较少的资源。 线程也是并发编程的基石,不同的线程可以同时执行不同的任务,提高了应用程序的并发性。 线程的状态 新建状态 线程是尚未启动的状态,实例化了一个Thread对象,还未调用…

    Java 2023年5月18日
    00
  • 一不小心就让Java开发踩坑的fail-fast是个什么鬼?(推荐)

    一不小心就让Java开发踩坑的fail-fast是个什么鬼? 在Java中,有一种叫做fail-fast的机制,它主要是用于快速发现程序中的错误,并迅速抛出异常。 什么是fail-fast机制? fail-fast机制指的是集合中在进行结构性操作(增删改)时,如果集合的状态发生了变化,那么就立即抛出异常以终止当前操作,这样可以防止对集合的并发修改。 在Jav…

    Java 2023年5月25日
    00
合作推广
合作推广
分享本页
返回顶部