以下是关于Kafka生产者和消费者JavaAPI的示例代码的完整攻略。
Kafka
Kafka是一个分布式流处理平台,主要由以下组件构成:
- 生产者(Producer)
- 消费者(Consumer)
- 主题(Topic)
- 分区(Partition)
- 偏移量(Offset)
- Broker
- ZooKeeper
Kafka的生产者和消费者JavaAPI提供了开发人员构建基于Kafka的应用程序的能力。下面将介绍如何使用JavaAPI编写Kafka生产者和消费者示例代码。
Kafka生产者JavaAPI示例代码
Kafka的生产者JavaAPI主要包括以下主要类:
- Producer:生产者
- ProducerRecord:生产者纪录
以下是一个演示如何使用JavaAPI编写一个Kafka生产者示例代码的简单示例:
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class SimpleProducer {
public static void main(String[] args) throws Exception {
String topicName = "my-topic";
String key = "key1";
String value = "value1";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topicName, key, value);
producer.send(record);
producer.close();
}
}
以上示例做了以下操作:
- 设置了一个名为“my-topic”的主题。
- 设置了一个名为“key1”的键和一个名为“value1”的值。
- 创建了一个Producer对象,它采用前面配置的属性创建。
- 创建了一个ProducerRecord对象,并将键和值设置为前面定义的键和值。
- Producer发送了纪录到Kafka服务器上。
Kafka消费者JavaAPI示例代码
Kafka的消费者JavaAPI主要包括以下主要类:
- Consumer:消费者
- ConsumerRecord:消费者纪录
- ConsumerRecords:消费者纪录集合
- KafkaConsumer:Kafka消费者
以下是一个演示如何使用JavaAPI编写一个Kafka消费者示例代码的简单示例:
import org.apache.kafka.clients.consumer.*;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) throws Exception {
String topicName = "my-topic";
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "test");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Record Key:" + record.key());
System.out.println("Record value:" + record.value());
System.out.println("Record partition:" + record.partition());
System.out.println("Record offset:" + record.offset());
}
}
}
}
以上示例做了以下操作:
- 设置了一个名为“my-topic”的主题。
- 创建了一个Kafka消费者对象。
- 订阅了Kafka服务器上的“my-topic”主题。
- 从Kafka服务器上拉取纪录,并将其输出到控制台上。
以上示例代码提供了一个简单的基础,一旦你掌握了这个基础,你就可以玩转更复杂的场景和用例。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:kafka生产者和消费者的javaAPI的示例代码 - Python技术站