【发布时间】:2023-04-05 13:46:01
【问题描述】:
我正在根据主题可能具有的分区数量动态创建 kafka 消费者。 (使用KafkaConsumer形式的Kafka-Python)
创建消费者后,我使用 ThreadPoolExecuter 启动线程,开始监听这些消费者分区上的特定主题
注意:整个代码都属于一个烧瓶 API 端点。目标是监听 REST 调用,向生产者发送消息,然后监听消费者的响应,然后将响应返回给 REST 调用
# function that listens to consumer messages in each thread
def _get_me_response(consumer_id, consumer):
for message in consumer:
message = message.value
break
consumer.commit()
return consumer_id, message
# ThreadPoolExecuter to start threads
with ThreadPoolExecutor(max_workers=len(consumers)) as executor:
futures = []
for consumer_id, consumer in consumers.items():
futures.append(executor.submit(_get_me_response,
consumer=consumer,
consumer_id=consumer_id
)
)
现在,一旦我提交了我的线程,我就可以成功地收到每个线程上的消息。
我的问题在于收集期货的结果并使用该结果响应 REST 端点
大多数在线示例展示了如何通过“打印”语句获取结果形式的期货。现在,这是我从期货中获取响应的代码:
# code for gathering result from future in whichever order they arrive
for future in as_completed(futures):
resp_cid, response = future.result()
print(json.dumps(response))
if response['match_status'] == 1:
break
return response
我想要做的是,如果我收到关于未来任何一个的响应,我想转到 return 语句而不是等待其他期货完成(他们永远不会,因为他们有一个消费者的 for 循环永无止境)。打印语句在控制台中打印成功,但我无法退出循环
我尝试过使用我自己的全局变量
- 默认设置为 False
- 用于在消费者函数内部运行 while 循环
- 当我收到来自 future 的响应时设置为 True
但这似乎没有按预期工作。
全局变量使用代码:
RETURN_CONSUMER_FLAG = False <---- Defining global variable
# function that listens to consumer messages in each thread with global var check
def _get_me_response(consumer_id, consumer):
while not RETURN_CONSUMER_FLAG: <---- Looping on global variable
for message in consumer:
message = message.value
consumer.commit()
return consumer_id, message
...
...
# code for gathering result from future in whichever order they arrive
for future in as_completed(futures):
resp_cid, response = future.result()
print(json.dumps(response))
if response['match_status'] == 1:
global RETURN_CONSUMER_FLAG <---- Setting global variable to TRUE
RETURN_CONSUMER_FLAG = True
return response
我检查了 as_completed 的代码,似乎这可能是因为它是一个产量生成器而不是一个返回函数,所以它一直等待所有期货完成工作,然后才能退出循环。
知道如何解决这个问题吗?
【问题讨论】:
-
return response
没有正确缩进,你能确定这不是错误的原因吗? -
只是为了确保:您想等待第一个未来完成,然后提前退出函数而不等待其他未来完成?
-
@LouisLac 缩进问题不在实际代码中。它只是在这里复制有错误。我纠正了它。在你的第二个问题上,是的 - 我想等到任何一个期货完成(不一定是第一个)并在任何其他完成之前提前退出
-
感谢您的详细介绍,如果您只想等待第一个未来完成,您可以使用
concurrent.futures.wait(futures, timeout=None, return_when=FIRST_COMPLETED)
而不是as_completed(...)
,您可以试试这个吗? -
@LouisLac 嘿,我确实尝试了你的建议,但它并没有达到我需要的方式。一些要求也发生了变化,这不再是我代码的简单路径,所以我决定使用消费者超时。我感谢您的帮助!谢谢!
标签:
python
python-3.x
multithreading
apache-kafka
concurrent.futures
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:从在 python ThreadPoolExecuter 中运行 kafka 消费者的 concurrent.futures 获取结果 - Python技术站