Kafka分布式消息系统基本架构及功能详解
Kafka简介
Kafka是一个高性能、可扩展、分布式的消息处理平台。它最初是由Linkedin公司开发的,现在已经成为Apache顶级项目之一。Kafka主要用于消息的发布和订阅过程中的分布式处理,可以处理每秒数百万条消息,非常适合使用在大数据处理、实时流处理、日志收集、用户活动跟踪等场景。
Kafka基本架构
Kafka由多个Broker组成,每个Broker可以是单独的一个节点,也可以是一个集群。每个分区(partition)会被存储在一个Broker上。每个分区都由多个副本(replica)组成,其中一个副本被认为是“领导者”(leader),其他副本是“追随者”(follower),领导者负责读写分区数据,追随者只负责数据的复制。每个分区都有一个唯一的ID号,称为分区标识(partition identifier,简称PID)。
Kafka各主要组件的说明如下:
- Producer:生产者,用于向Kafka中发布消息的应用程序,可以把消息发送到一个特定的topic(主题)。
- Consumer:消费者,用于从Kafka中订阅消息的应用程序。消费者订阅一个或多个topic,并处理由Producer发送的消息。
- Broker:Kafka集群中的一个或多个节点,每个Broker负责存储一个或多个topic的消息。
- Topic:消息的分类标识,每条消息都属于一个topic。
- Partition:消息的分片粒度。一个topic可以分为多个partition,每个partition是一个有序的、不可变的消息序列。每个partition在单个Broker上保存,允许topic中的消息进行水平扩展,并且允许多个消费者并行消费一个topic。
- Offset:消息在partition中的位置,通常作为消费者位移(consumer offset)保存在Zookeeper或者Kafka Broker上,代表消费者读取的消息的位置。
Kafka中的消息发送、存储和消费的过程如下:
1. Producer向Kafka Broker发布消息到指定的topic。
2. Broker接收到消息后,先写入磁盘的日志文件(log segment)中,再通过副本同步机制,把消息复制到其他Broker上的副本中(如果有的话),确保数据的高可靠性和可用性。
3. Consumer订阅一个或多个topic的消息,通过轮询方式从Broker上拉取消息来进行消费。
Kafka的主要功能
Kafka提供了以下几个主要功能:
可靠性
Kafka具有高可用性和可靠性,消息具备持久性特性。消息在发送到Kafka Broker后,首先会被写入一个持久化的磁盘日志文件(log segment)中,之后才会被副本同步到其他Broker上。这样可以确保在出现硬件故障时,消息不会丢失。
水平扩展性
Kafka通过使用partition和多副本机制,实现了对水平扩展的支持。一个topic可以分为多个partition,每个partition在单个Broker上保存,允许topic中的消息进行水平扩展。同时,Kafka通过副本同步机制,实现了对多个Broker节点的分布式支持,从而允许在多个节点上部署Kafka集群,实现消息的高可用性和高吞吐量。
可拓展性
Kafka可以处理大量的消息,每秒可处理数百万条消息。同时,Kafka提供了多种client language(Java、Scala、Python、C/C++等)的支持,可以方便地集成到不同的编程语言环境中。
高吞吐量
Kafka具有很高的消息吞吐量,每个分区可以支持每秒数十万条消息。
示例1
假设我们有一个电商网站,需要通过Kafka向黑名单系统(Blacklist System)发布用户黑名单数据。首先,我们需要创建一个topic,用于存储用户黑名单数据。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user_blacklist
以上命令创建了一个名为user_blacklist的topic,replication-factor 1表示只保留一个副本,partitions 1表示仅有1个分区。
接下来,我们可以向user_blacklist这个topic中发布用户黑名单数据。假设我们有一张名为user_blacklist.csv的用户黑名单数据文件,每行一个用户Id,我们需要通过producer将该文件中的用户Id都发送到user_blacklist这个topic中。使用如下命令向user_blacklist发消息。
bin/kafka-console-producer.sh --topic user_blacklist --bootstrap-server localhost:9092 < ./user_blacklist.csv
以上命令批量向user_blacklist发送用户黑名单数据。
假设Blacklist System需要消费这些用户黑名单数据,可以通过以下命令监听user_blacklist topic并实时消费数据。
bin/kafka-console-consumer.sh --topic user_blacklist --bootstrap-server localhost:9092 --from-beginning
以上命令从user_blacklist这个topic中消费用户黑名单数据。--from-beginning意味着从最开始的消息开始消费。
示例2
假设我们的Kafka集群有3个Broker,一个topic被划分为6个分区(partition)。我们需要通过producer向指定的topic中发送消息,并在consumer中消费这些消息。
首先,我们需要创建一个名为example_topic的topic,划分为6个partition。
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --topic example_topic
以上命令创建了一个名为example_topic的topic,replication-factor为3,意味着该topic被划分为3份副本存储在Kafka集群中。partitions为6,意味着该topic被划分为6个分区。
接下来,我们可以向example_topic这个topic中发布消息。以下命令随机向example_topic的6个分区发送消息。
bin/kafka-console-producer.sh --topic example_topic --broker-list localhost:9092 --property "parse.key=true" --property "key.separator=:"
以上命令启动了一个producer,发送的消息格式为KEY:VALUE。--property "parse.key=true"将启用key解析,--property "key.separator=:"指定key-value分隔符为":"。
接下来,我们可以通过以下命令来消费example_topic的消息。
bin/kafka-console-consumer.sh --topic example_topic --bootstrap-server localhost:9092 --from-beginning --property "print.key=true" --property "key.separator=:"
以上命令启动了一个consumer,用于消费example_topic的消息。--property "print.key=true"将输出key-value对中的key,--property "key.separator=:"指定key-value分隔符为":"。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:kafka分布式消息系统基本架构及功能详解 - Python技术站