下面我将详细讲解在Python中利用队列 asyncio.Queue
进行通讯的攻略。
什么是 asyncio.Queue
asyncio.Queue
是 Python 3.5 版本开始引入的异步队列,用于在协程之间进行通讯。根据先进先出(FIFO)原则,队列中的每个元素都具有唯一的索引位置,并且可以根据索引位置进行访问。
使用 asyncio.Queue
可以方便地实现协程之间的通讯和数据共享,避免了协程之间的竞争和阻塞等问题。
asyncio.Queue 的基础用法
下面是 asyncio.Queue
的基础用法:
import asyncio
async def producer(queue):
print('生产者开始生产消息')
for i in range(5):
await asyncio.sleep(0.1) # 模拟生产速度
item = f'消息{i}'
await queue.put(item) # 往队列中放入新消息
print(f'生产者生产了消息:{item}')
async def consumer(queue):
print('消费者开始消费消息')
while True:
item = await queue.get() # 从队列中取出消息
print(f'消费者消费了消息:{item}')
async def main():
queue = asyncio.Queue() # 创建队列
task1 = asyncio.create_task(producer(queue))
task2 = asyncio.create_task(consumer(queue))
await task1
await task2
asyncio.run(main()) # 运行主函数
这里定义了两个协程 producer
和 consumer
,分别表示生产者和消费者。在 main
函数中创建一个队列 queue
,并将其传递给生产者和消费者协程。producer
协程不断地生产消息并将其放入队列中,consumer
协程不断地从队列中取出消息并消费它。在最后的 asyncio.run()
函数中执行主函数 main
。
当程序运行时,可以看到生产者不断地生产消息,消费者不断地消费消息,而且整个通讯过程是异步的。
asyncio.Queue 的高级用法
除了基础用法之外,asyncio.Queue
还支持以下高级用法:
消息缓存
往队列中放入消息时,如果队列已经满了,就会阻塞生产者直到有消费者取出消息。可以通过设置队列的 maxsize
属性来控制队列的最大长度。
import asyncio
async def producer(queue):
for i in range(5):
item = f'消息{i}'
await queue.put(item) # 往队列中放入新消息
print(f'生产者生产了消息:{item}')
async def consumer(queue):
while True:
item = await queue.get() # 从队列中取出消息
print(f'消费者消费了消息:{item}')
async def main():
queue = asyncio.Queue(maxsize=2) # 设置最大长度为 2
task1 = asyncio.create_task(producer(queue))
task2 = asyncio.create_task(consumer(queue))
await asyncio.gather(task1, task2)
asyncio.run(main()) # 运行主函数
在这个例子中,将队列的 maxsize
属性设置为 2,当第三条消息 消息2
添加到队列中时,就会阻塞生产者直到有消费者取出该消息。
超时设置
往队列中放入消息或从队列中取出消息时,可以设置超时时间,从而防止程序无限地阻塞等待。
import asyncio
async def producer(queue):
for i in range(5):
item = f'消息{i}'
try:
await queue.put(item, timeout=1) # 超时设置为 1 秒
print(f'生产者生产了消息:{item}')
except asyncio.TimeoutError:
print(f'生产者放置消息超时:{item}')
async def consumer(queue):
while True:
try:
item = await queue.get(timeout=1) # 超时设置为 1 秒
print(f'消费者消费了消息:{item}')
except asyncio.TimeoutError:
print(f'消费者取出消息超时')
async def main():
queue = asyncio.Queue(maxsize=2) # 设置最大长度为 2
task1 = asyncio.create_task(producer(queue))
task2 = asyncio.create_task(consumer(queue))
await asyncio.gather(task1, task2)
asyncio.run(main()) # 运行主函数
在这个例子中,将超时时间设置为 1 秒,当队列已满或已空时,放置消息或取出消息操作就会在 1 秒后超时。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:python中利用队列asyncio.Queue进行通讯详解 - Python技术站