Java分布式学习之Kafka消息队列
什么是Kafka消息队列
Kafka是一种高可用、高性能、分布式的消息队列系统,广泛应用于大数据领域。它可以处理海量数据,并提供实时的数据流处理。Kafka具有可拓展性好、可靠性高、消息传输速度快等优点,是大数据处理中不可或缺的组件。
Kafka的基本概念
Kafka中的重要概念包括:Producer、Consumer、Topic、Broker、Partition等。
- Producer:消息的生产者,即产生消息的客户端。
- Consumer:消息的消费者,即接收并处理消息的客户端。
- Topic:消息的主题,相当于一个消息的类别或者频道。
- Broker:Kafka运行的服务器节点。
- Partition:每个Topic可以分为多个Partition,多个Partition组成一个Topic的完整消息集合。一个Partition只能有一个Producer进行写入,但是每个Partition可以有多个Consumer进行读取。
Kafka的核心API
Kafka提供了两种核心API:Producer API和Consumer API。
Producer API
Producer API提供了两种发送消息的方式:
- 同步发送:消息会一直等待Broker的响应,直到Broker响应成功为止。
- 异步发送:发送消息后不等待Broker的响应,直接返回。通过回调函数、future等方式获取Broker的响应信息。
下面是Java代码示例:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
Consumer API
Consumer API提供了两种消息消费的方式:
- 高级消费:开发者可以根据需要手动控制消费。
- 简单消费:Kafka提供的最简单的消息消费方式,自动维护消费offset,消费者只需要提供消息的处理逻辑即可。
下面是Java代码示例:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Kafka的安装与配置
在Ubuntu系统中,可以通过apt-get方式快速安装Kafka,具体步骤如下:
- 安装Java:
sudo apt-get install default-jre
- 下载并解压缩Kafka:
wget https://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xzf kafka_2.13-2.8.0.tgz
cd kafka_2.13-2.8.0
- 启动Kafka:
./bin/kafka-server-start.sh config/server.properties
示例一
在本示例中,我们将演示如何使用Kafka进行简单的消息发送与接收。具体步骤如下:
- 启动Kafka
在控制台运行以下命令:
./bin/kafka-server-start.sh config/server.properties
- 创建Topic
在控制台运行以下命令:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-topic
- 发送消息
编写并运行如下Java代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
producer.send(new ProducerRecord<String, String>("test-topic", "key1", "value1"));
producer.send(new ProducerRecord<String, String>("test-topic", "key2", "value2"));
producer.close();
- 接收消息
编写并运行如下Java代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
- 结果分析
执行发送消息的代码后,我们可以在控制台看到如下的输出:
offset = 0, key = key1, value = value1
offset = 1, key = key2, value = value2
可以看到我们成功发送了两条消息。执行接收消息的代码后,我们可以在控制台看到如下的输出:
offset = 0, key = key1, value = value1
offset = 1, key = key2, value = value2
说明我们成功接收了两条消息。
示例二
在本示例中,我们将演示如何使用Kafka进行高级消费。具体步骤如下:
- 启动Kafka
在控制台运行以下命令:
./bin/kafka-server-start.sh config/server.properties
- 创建Topic
在控制台运行以下命令:
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test-topic
- 发送消息
编写并运行如下Java代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
for (int i = 0; i < 1000; i++)
producer.send(new ProducerRecord<String, String>("test-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
- 接收消息
编写并运行如下Java代码:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "earliest");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
consumer.commitSync(Collections.singletonMap(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)));
}
}
- 结果分析
执行发送消息的代码后,我们可以在控制台看到如下的输出:
...
offset = 997, key = 997, value = 997
offset = 998, key = 998, value = 998
offset = 999, key = 999, value = 999
可以看到我们成功发送了一千条消息。执行接收消息的代码后,我们可以在控制台看到如下的输出:
...
offset = 3, key = 3, value = 3
offset = 4, key = 4, value = 4
offset = 5, key = 5, value = 5
...
说明我们成功接收并处理了一千条消息,并手动控制了消费offset。
总结
本文简单介绍了Kafka的基本概念、核心API以及安装和配置步骤,并提供了两个示例展示了Kafka的简单消息发送与接收以及高级消费功能。有了这些基础知识,读者可以深入学习Kafka并大胆应用于实际项目中。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Java分布式学习之Kafka消息队列 - Python技术站