【问题标题】:get result from concurrent.futures which runs a kafka consumer in a python ThreadPoolExecuter从在 python ThreadPoolExecuter 中运行 kafka 消费者的 concurrent.futures 获取结果
【发布时间】:2023-04-05 13:46:01
【问题描述】:

我正在根据主题可能具有的分区数量动态创建 kafka 消费者。 (使用KafkaConsumer形式的Kafka-Python)

创建消费者后,我使用 ThreadPoolExecuter 启动线程,开始监听这些消费者分区上的特定主题

注意:整个代码都属于一个烧瓶 API 端点。目标是监听 REST 调用,向生产者发送消息,然后监听消费者的响应,然后将响应返回给 REST 调用

# function that listens to consumer messages in each thread
def _get_me_response(consumer_id, consumer):
    for message in consumer:
        message = message.value
        break
    consumer.commit()
    return consumer_id, message

# ThreadPoolExecuter to start threads
  with ThreadPoolExecutor(max_workers=len(consumers)) as executor:
      futures = []
      for consumer_id, consumer in consumers.items():
          futures.append(executor.submit(_get_me_response,
                                         consumer=consumer,
                                         consumer_id=consumer_id
                                         )
                         )

现在,一旦我提交了我的线程,我就可以成功地收到每个线程上的消息。

我的问题在于收集期货的结果并使用该结果响应 REST 端点

大多数在线示例展示了如何通过“打印”语句获取结果形式的期货。现在,这是我从期货中获取响应的代码:

 # code for gathering result from future in whichever order they arrive
 for future in as_completed(futures):
     resp_cid, response = future.result()
     print(json.dumps(response))
     if response['match_status'] == 1:
         break

 return response

我想要做的是,如果我收到关于未来任何一个的响应,我想转到 return 语句而不是等待其他期货完成(他们永远不会,因为他们有一个消费者的 for 循环永无止境)。打印语句在控制台中打印成功,但我无法退出循环


我尝试过使用我自己的全局变量

  1. 默认设置为 False
  2. 用于在消费者函数内部运行 while 循环
  3. 当我收到来自 future 的响应时设置为 True

但这似乎没有按预期工作。

全局变量使用代码:

RETURN_CONSUMER_FLAG = False                          <---- Defining global variable

# function that listens to consumer messages in each thread with global var check
def _get_me_response(consumer_id, consumer):
    while not RETURN_CONSUMER_FLAG:                   <---- Looping on global variable
        for message in consumer:
            message = message.value
            consumer.commit()
            return consumer_id, message
...
...

# code for gathering result from future in whichever order they arrive
for future in as_completed(futures):
    resp_cid, response = future.result()
    print(json.dumps(response))
    if response['match_status'] == 1:
        global RETURN_CONSUMER_FLAG                   <---- Setting global variable to TRUE
        RETURN_CONSUMER_FLAG = True

return response

我检查了 as_completed 的代码,似乎这可能是因为它是一个产量生成器而不是一个返回函数,所以它一直等待所有期货完成工作,然后才能退出循环。

知道如何解决这个问题吗?

【问题讨论】:

  • return response 没有正确缩进,你能确定这不是错误的原因吗?
  • 只是为了确保:您想等待第一个未来完成,然后提前退出函数而不等待其他未来完成?
  • @LouisLac 缩进问题不在实际代码中。它只是在这里复制有错误。我纠正了它。在你的第二个问题上,是的 - 我想等到任何一个期货完成(不一定是第一个)并在任何其他完成之前提前退出
  • 感谢您的详细介绍,如果您只想等待第一个未来完成,您可以使用concurrent.futures.wait(futures, timeout=None, return_when=FIRST_COMPLETED) 而不是as_completed(...),您可以试试这个吗?
  • @LouisLac 嘿,我确实尝试了你的建议,但它并没有达到我需要的方式。一些要求也发生了变化,这不再是我代码的简单路径,所以我决定使用消费者超时。我感谢您的帮助!谢谢!

标签:
python
python-3.x
multithreading
apache-kafka
concurrent.futures