下面我来为您详细介绍Python中使用Kafka多线程消费者和手动提交消息的方法。
准备工作
在开始编写代码前,需要确保已经安装了Python和Kafka Python包。可以使用以下命令进行安装:
pip install kafka-python
实现过程
首先,我们需要创建一个Kafka topic,并往里面发送一些消息,以便后续消费。在本例中,我们创建了名为“test”的topic,发送了10条消息。
多线程消费者
以下是使用多线程消费者消费Kafka消息的示例代码:
from kafka import KafkaConsumer
import threading
class ConsumerThread(threading.Thread):
def __init__(self, bootstrap_servers, group_id, topic):
threading.Thread.__init__(self)
self.kafka_consumer = KafkaConsumer(
topic,
group_id=group_id,
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest', # 从最早的offset开始消费
consumer_timeout_ms=1000, # 超时时间为1秒
enable_auto_commit=False # 关闭自动提交
)
def run(self):
try:
for message in self.kafka_consumer:
print("current thread is {}, message value is {}".format(threading.current_thread().name, message.value))
self.kafka_consumer.commit() # 手动提交offset
except Exception as e:
print("error:", e)
finally:
self.kafka_consumer.close()
bootstrap_servers = ['localhost:9092']
group_id = 'test-group'
topic = 'test'
thread_num = 5
threads = []
for i in range(thread_num):
threads.append(ConsumerThread(bootstrap_servers, group_id, topic))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
代码解释:
- 创建一个Kafka消费者类
KafkaConsumer
,传入参数auto_offset_reset='earliest'
,表示从最早的offset开始消费;enable_auto_commit=False
,表示关闭自动提交offset。 - 在
run()
方法中,使用for循环来遍历消息并打印出每条消息的内容。最后在循环外手动提交offset。 - 创建多个线程对象,并调用
start()
方法开启线程。最后再依次调用join()
方法等待线程执行完毕。
手动提交实例
以下是手动提交Kafka offset实例的示例代码:
from kafka import KafkaConsumer
bootstrap_servers = ['localhost:9092']
group_id = 'test-group'
topic = 'test'
consumer = KafkaConsumer(
topic,
group_id=group_id,
bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest', # 从最早的offset开始消费
consumer_timeout_ms=1000, # 超时时间为1秒
enable_auto_commit=False # 关闭自动提交
)
try:
for message in consumer:
print(message.value)
consumer.commit() # 手动提交offset
except Exception as e:
print("error:", e)
finally:
consumer.close()
代码解释:
- 创建一个Kafka消费者类
KafkaConsumer
,传入参数auto_offset_reset='earliest'
,表示从最早的offset开始消费;enable_auto_commit=False
,表示关闭自动提交offset。 - 使用for循环来遍历消息并打印出每条消息的内容。最后在循环外手动提交offset。
- 在异常处理及最后关闭消费者连接。
这样就可以使用Python消费Kafka中的消息,且可以控制offset的提交,实现了更为精细化的消费控制。
以上是Python中使用Kafka多线程消费者和手动提交消息的方法的完整实例教程,希望能对您有所帮助。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:python kafka 多线程消费者&手动提交实例 - Python技术站