一文秒懂 kafka HA(高可用)
什么是 Kafka HA?
在 Kafka 中,为了确保数据的可靠性和高可用性,你需要使用多个 Kafka Broker 构建 Kafka 集群。当 Kafka 集群中的某个 Broker 失效时,整个集群依然能够正常运行,数据不会发生丢失或损坏。这就是 Kafka 的高可用性(HA)特性。
如何配置 Kafka HA?
1. 配置 ZooKeeper
ZooKeeper 是 Kafka 集群的协调者,在 Kafka 集群中必不可少。ZooKeeper 用于统一管理和协调 Kafka 集群中所有的 Broker,并提供了状态和配置信息的存储。Kafka Broker 与 ZooKeeper 之间通过配置文件进行连接,在配置文件中配置 ZooKeeper 的 IP 和端口。
2. 安装 Kafka
安装 Kafka 集群有多种方式,这里选用 docker-compose 进行示例。docker-compose.yml 文件如下:
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
restart: always
ports:
- 2181:2181
kafka1:
build:
context: ./kafka1
depends_on:
- zookeeper
restart: always
ports:
- 9092
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka1:9092,OUTSIDE://localhost:9092
KAFKA_LISTENERS: INSIDE://0.0.0.0:9092,OUTSIDE://0.0.0.0:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: 1
KAFKA_DEFAULT_REPLICATION_FACTOR: 2
kafka2:
build:
context: ./kafka2
depends_on:
- zookeeper
restart: always
ports:
- 9093
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka2:9093,OUTSIDE://localhost:9093
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:29093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_BROKER_ID: 2
KAFKA_DEFAULT_REPLICATION_FACTOR: 2
在上面的 docker-compose.yml 文件中,我们启动了一个 ZooKeeper 服务,以及两个 Kafka Broker 服务。其中,每个 Kafka Broker 都在内部和外部分别使用 PLAINTEXT 协议监听 0.0.0.0 地址上的不同端口。KAFKA_ADVERTISED_LISTENERS 是为了让 Kafka Broker 之间的通信能正常地建立,KAFKA_DEFAULT_REPLICATION_FACTOR 是为了设置数据备份数量。这里我们设为 2,即每个分区的数据会被保存在两个节点上。
3. 验证 Kafka HA
我们在 Kafka 集群中写入一些数据,然后停止其中一个 Kafka Broker。停掉的 Kafka Broker 上的连接没有同时停掉的话,你会发现 Kafka 集群仍然能够正常处理数据。
我们使用 Node.js 的应用向 Kafka 集群中写入数据,代码如下:
var kafka = require('kafka-node');
var Producer = kafka.Producer;
var KeyedMessage = kafka.KeyedMessage;
var client = new kafka.Client("localhost:2181");
var producer = new Producer(client);
var topic = 'example';
var message = 'hello world';
var keyedMessage = new KeyedMessage('key', message);
producer.on('ready', function () {
producer.send([{topic: topic, messages: keyedMessage}], function (err, result) {
if (err) {
console.error(err);
} else {
console.log(result);
}
process.exit(0);
});
});
producer.on('error', function (err) {
console.error(err);
});
我们写入两条数据到主题example中。在终端中输入下面的命令:
$ docker-compose up -d
$ node producer.js
然后,停掉其中一台 Kafka Broker:
$ docker-compose stop kafka2
再次运行上述 Node.js 的应用向 Kafka 集群写入数据:
$ node producer.js
你会发现数据正常被写入到 Kafka 集群中。
总结
Kafka HA 配置的核心是设置 Broker 的备份数量,确保数据在单个 Broker 失效时不会丢失,同时保证 Kafka 集群仍然能够正常运行。通常使用 ZooKeeper 进行 Broker 的协调和管理,在多个 Broker 中配置相同的备份数量可以达到 HA 目的。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:一文秒懂 kafka HA(高可用) - Python技术站