当我们使用 Apache Kafka 作为消息中心时,需要了解它的架构原理,以便更好地在应用程序中使用它。
Kafka 架构
Kafka 是一个分布式发布订阅消息系统。它的主要组件包括:
- Broker - 处理传入和传出的消息并维护消息的存储
- Topic - 是发布和订阅消息的名称
- Partition - 一个主题可能被分成多个分区。每个分区都是一个有序的,不可变的消息序列,它们被存储在单独的 Broker 上。
- Producer - 生成消息并将它们发送到主题的一个或多个 Partition 中
- Consumer - 订阅一个或多个主题,并从分区中读取消息
Kafka 使用 Zookeeper 维护集群中 Broker,Partition 和副本的元数据,以及 Consumer Group 的信息。
Kafka 读写流程
Kafka 支持高吞吐量,低连接数的读写流程。
写入流程
- Producer 将消息发送到 Broker 或多个 Broker 中。
- Broker 将消息存储在 Partition 中,可保证分区内消息的顺序性。
- 生产者确认消息已经被追加到 Partition,此时消息已被成功添加到 Broker 中。
读取流程
- Consumer Group 中的每个 Consumer 读取一个 Partition 中的一系列连续消息,并把消息保存在本地 Cache 中。
- Consumer 向 Broker 发送“确认”消息,告诉 Broker 消息已经被处理。
- Consumer 轮询 Kafka Topic,以便获取新的消息。
示例说明
生产者示例
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("test-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
以上代码将 100 条字符串消息发送到 test-topic 主题的分区中。
消费者示例
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "test-group");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
以上代码创建了一个 Kafka Consumer,它订阅了 test-topic 主题,并读取 topic 数据的最早消息并输出到控制台。
通过上面的示例,我们可以清楚地了解 Apache Kafka 的架构原理和使用方法,帮助我们更好地使用 Kafka 做消息处理和数据处理。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:深入解析kafka 架构原理 - Python技术站