带你玩转Kafka之初步使用
什么是Kafka?
Kafka是一个由Apache开发的分布式流处理平台。它由多个Broker服务器节点组成,可用于消息发布和订阅,以及处理海量数据流。
安装Kafka
Kafka可在Linux、Windows和Mac等操作系统上运行。可从官方网站https://kafka.apache.org/downloads下载二进制包。
- 解压二进制包并进入解压后的目录
tar zxvf kafka_2.13-2.7.0.tgz
cd kafka_2.13-2.7.0
启动Kafka
- 启动Zookeeper
Kafka依赖Zookeeper进行协调。启动Zookeeper服务器
./bin/zookeeper-server-start.sh config/zookeeper.properties
- 启动Kafka
./bin/kafka-server-start.sh config/server.properties
创建Topic
Topic可视为Kafka中的消息队列,用于存储与特定主题相关联的消息。
- 创建Topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
这将在本地Zookeeper服务器中创建一个名为test的Topic。
发送消息
- 发送消息
使用Kafka生产者发送消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
发送完一条消息后按回车键发送下一条消息。ctrl+c 结束发送。
接收消息
- 接收消息
使用Kafka消费者接收消息
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
上述命令将从Topic test读取消息并将其打印到控制台上。
- 执行示例
以下示例演示了如何使用Python运行Kafka生产者和消费者,一个Python脚本为生产者,另一个Python脚本为消费者。需要使用kafka-python库,可使用pip install kafka-python
安装
生产者示例
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda x: json.dumps(x).encode('utf-8'))
for i in range(10):
data = {'number': i}
producer.send('my-topic', value=data)
代码将向名为my-topic
的Topic发送10个数字
消费者示例
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
print(message.value)
代码将从名为my-topic
的Topic接收消息并打印到控制台上。
通过上述示例,您可以初步了解如何使用Kafka发布和订阅消息。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:带你玩转Kafka之初步使用 - Python技术站