kafka分布式消息系统基本架构及功能详解

Kafka分布式消息系统基本架构及功能详解

Kafka简介

Kafka是一个高性能、可扩展、分布式的消息处理平台。它最初是由Linkedin公司开发的,现在已经成为Apache顶级项目之一。Kafka主要用于消息的发布和订阅过程中的分布式处理,可以处理每秒数百万条消息,非常适合使用在大数据处理、实时流处理、日志收集、用户活动跟踪等场景。

Kafka基本架构

Kafka由多个Broker组成,每个Broker可以是单独的一个节点,也可以是一个集群。每个分区(partition)会被存储在一个Broker上。每个分区都由多个副本(replica)组成,其中一个副本被认为是“领导者”(leader),其他副本是“追随者”(follower),领导者负责读写分区数据,追随者只负责数据的复制。每个分区都有一个唯一的ID号,称为分区标识(partition identifier,简称PID)。

Kafka各主要组件的说明如下:
- Producer:生产者,用于向Kafka中发布消息的应用程序,可以把消息发送到一个特定的topic(主题)。
- Consumer:消费者,用于从Kafka中订阅消息的应用程序。消费者订阅一个或多个topic,并处理由Producer发送的消息。
- Broker:Kafka集群中的一个或多个节点,每个Broker负责存储一个或多个topic的消息。
- Topic:消息的分类标识,每条消息都属于一个topic。
- Partition:消息的分片粒度。一个topic可以分为多个partition,每个partition是一个有序的、不可变的消息序列。每个partition在单个Broker上保存,允许topic中的消息进行水平扩展,并且允许多个消费者并行消费一个topic。
- Offset:消息在partition中的位置,通常作为消费者位移(consumer offset)保存在Zookeeper或者Kafka Broker上,代表消费者读取的消息的位置。

Kafka中的消息发送、存储和消费的过程如下:
1. Producer向Kafka Broker发布消息到指定的topic。
2. Broker接收到消息后,先写入磁盘的日志文件(log segment)中,再通过副本同步机制,把消息复制到其他Broker上的副本中(如果有的话),确保数据的高可靠性和可用性。
3. Consumer订阅一个或多个topic的消息,通过轮询方式从Broker上拉取消息来进行消费。

Kafka的主要功能

Kafka提供了以下几个主要功能:

可靠性

Kafka具有高可用性和可靠性,消息具备持久性特性。消息在发送到Kafka Broker后,首先会被写入一个持久化的磁盘日志文件(log segment)中,之后才会被副本同步到其他Broker上。这样可以确保在出现硬件故障时,消息不会丢失。

水平扩展性

Kafka通过使用partition和多副本机制,实现了对水平扩展的支持。一个topic可以分为多个partition,每个partition在单个Broker上保存,允许topic中的消息进行水平扩展。同时,Kafka通过副本同步机制,实现了对多个Broker节点的分布式支持,从而允许在多个节点上部署Kafka集群,实现消息的高可用性和高吞吐量。

可拓展性

Kafka可以处理大量的消息,每秒可处理数百万条消息。同时,Kafka提供了多种client language(Java、Scala、Python、C/C++等)的支持,可以方便地集成到不同的编程语言环境中。

高吞吐量

Kafka具有很高的消息吞吐量,每个分区可以支持每秒数十万条消息。

示例1

假设我们有一个电商网站,需要通过Kafka向黑名单系统(Blacklist System)发布用户黑名单数据。首先,我们需要创建一个topic,用于存储用户黑名单数据。

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user_blacklist

以上命令创建了一个名为user_blacklist的topic,replication-factor 1表示只保留一个副本,partitions 1表示仅有1个分区。

接下来,我们可以向user_blacklist这个topic中发布用户黑名单数据。假设我们有一张名为user_blacklist.csv的用户黑名单数据文件,每行一个用户Id,我们需要通过producer将该文件中的用户Id都发送到user_blacklist这个topic中。使用如下命令向user_blacklist发消息。

bin/kafka-console-producer.sh --topic user_blacklist --bootstrap-server localhost:9092 < ./user_blacklist.csv

以上命令批量向user_blacklist发送用户黑名单数据。

假设Blacklist System需要消费这些用户黑名单数据,可以通过以下命令监听user_blacklist topic并实时消费数据。

bin/kafka-console-consumer.sh --topic user_blacklist --bootstrap-server localhost:9092 --from-beginning

以上命令从user_blacklist这个topic中消费用户黑名单数据。--from-beginning意味着从最开始的消息开始消费。

示例2

假设我们的Kafka集群有3个Broker,一个topic被划分为6个分区(partition)。我们需要通过producer向指定的topic中发送消息,并在consumer中消费这些消息。

首先,我们需要创建一个名为example_topic的topic,划分为6个partition。

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --topic example_topic

以上命令创建了一个名为example_topic的topic,replication-factor为3,意味着该topic被划分为3份副本存储在Kafka集群中。partitions为6,意味着该topic被划分为6个分区。

接下来,我们可以向example_topic这个topic中发布消息。以下命令随机向example_topic的6个分区发送消息。

bin/kafka-console-producer.sh --topic example_topic --broker-list localhost:9092 --property "parse.key=true" --property "key.separator=:"

以上命令启动了一个producer,发送的消息格式为KEY:VALUE。--property "parse.key=true"将启用key解析,--property "key.separator=:"指定key-value分隔符为":"。

接下来,我们可以通过以下命令来消费example_topic的消息。

bin/kafka-console-consumer.sh --topic example_topic --bootstrap-server localhost:9092 --from-beginning --property "print.key=true" --property "key.separator=:"

以上命令启动了一个consumer,用于消费example_topic的消息。--property "print.key=true"将输出key-value对中的key,--property "key.separator=:"指定key-value分隔符为":"。

本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:kafka分布式消息系统基本架构及功能详解 - Python技术站

(0)
上一篇 2023年5月20日
下一篇 2023年5月20日

相关文章

  • SpringBoot多数据源的两种实现方式实例

    下面我就为你详细讲解一下“SpringBoot多数据源的两种实现方式实例”的完整攻略。 SpringBoot多数据源的两种实现方式实例 为什么需要多数据源 在实际开发中,我们可能会遇到这样的情况:业务系统需要同时连接多个数据库进行数据操作。此时单数据源的方式已无法满足需求,必须使用多数据源来进行解决。 方案一:使用@Primary注解 1.添加多数据源配置项…

    Java 2023年5月20日
    00
  • Java 编程之IO流资料详细整理

    Java 编程之IO流资料详细整理 什么是 IO 流 输入输出流(IO流)指的是一种提供了对数据流进行读写的机制,是 Java 中用于处理流数据的一种常用方式。在 Java 中,IO 流分为字节流和字符流两种方式,分别处理二进制数据和文本数据。 IO 流的分类 字节流 字节流是 IO 流中最基本的一种,主要用于处理二进制数据。Java 中的字节流有两个基本的…

    Java 2023年5月23日
    00
  • JavaSpringBoot报错“UnsatisfiedDependencyException”的原因和处理方法

    原因 “UnsatisfiedDependencyException” 错误通常是以下原因引起的: 依赖项未找到:如果您的代码中存在依赖项未找到的问题,则可能会出现此错误。在这种情况下,您需要检查您的代码并确保它们正确。 多个 Bean 匹配:如果您的代码中存在多个 Bean 匹配的问题,则可能会出现此错误。在这种情况下,您需要检查您的代码并确保它们正确。 …

    Java 2023年5月4日
    00
  • Java实现创建Zip压缩包并写入文件

    下面是详细讲解Java实现创建Zip压缩包并写入文件的完整攻略: 1. ZipOutputStream Java中可以使用ZipOutputStream类来实现创建Zip压缩包,并写入文件。ZipOutputStream是OutputStream类的子类,因此我们可以将需要进行压缩的文件写入ZipOutputStream,再通过ZipOutputStream…

    Java 2023年5月19日
    00
  • 详解Spring Data JPA中Repository的接口查询方法

    Sure!下面是关于“详解Spring Data JPA中Repository的接口查询方法”的完整攻略: 1、什么是Spring Data JPA Spring Data JPA是Spring上建立的一套基于JPA规范的框架,主要用于简化JPA数据访问层的开发,封装了大量复杂的数据访问操作,同时也保证了极高的数据安全性和性能表现。 2、什么是Reposit…

    Java 2023年5月20日
    00
  • java提取json中某个数组的所有值方法

    下面是Java提取JSON中某个数组的所有值的攻略: 将JSON字符串转换为Java对象 首先,我们需要将JSON字符串转换为Java对象,在Java中可以使用GSON、Jackson等JSON库来完成这个过程。以GSON为例,使用它的fromJson()方法可以将JSON字符串转换为Java对象,示例代码如下: Gson gson = new Gson()…

    Java 2023年5月26日
    00
  • 深入浅出理解Java泛型的使用

    深入浅出理解Java泛型的使用 什么是Java泛型? Java泛型是Java SE 5(J2SE 5.0)版本引入的一项新特性,它可以用于在编译时检测和强制类型检查程序的类型安全性,并提供了在编译时检查类型的优势。 泛型可以被看作是Java的抽象类型,它可以在运行时接受不同类型的参数,提高了代码的复用性和可读性。泛型主要包含以下内容: 类型参数(Type P…

    Java 2023年5月26日
    00
  • Java基础之教你怎么用代码一键生成POJO

    下面是Java基础之教你怎么用代码一键生成POJO的完整攻略。 简介 POJO指的是“普通Java对象”(Plain Old Java Object),它是一种基础的Java类,通常用于存储数据。在实际开发中,我们需要大量地编写POJO,这个过程比较繁琐。因此,我们可以使用一些工具,来快速地生成POJO的代码。本文将介绍一种使用IDEA插件一键生成POJO的…

    Java 2023年5月19日
    00
合作推广
合作推广
分享本页
返回顶部