Python进程间通信Queue消息队列用法分析
本文主要讲解Python中进程间通信的一种方式——消息队列(Queue)的用法。通过配置Queue,不同的Python进程之间可以进行信息的传递和共享,达到进程间通信的目的。
什么是Queue
Queue是Python内置的一个类,它顾名思义是队列,具有FIFO(先进先出)的特性。主要包含以下方法:
put(item[, block[, timeout]])
: 将item放入队列中,如果队列满了,block参数为True则等待,为False则立刻抛出Full异常;timeout参数为等待时间。get([block[, timeout]])
: 从队列中获取一个item,队列为空时,block参数为True则等待,为False则立刻抛出Empty异常;timeout参数为等待时间。qsize()
: 返回队列中当前的item数目。empty()
: 如果队列为空返回True,否则返回False。full()
: 如果队列已满返回True,否则返回False。task_done()
: 从队列中取出一个item,应该在处理完这个item后调用此方法,表示已处理完成。join()
: 阻塞直到队列中所有item都被取出。
Queue的用法
使用Queue实现进程间通信包含以下步骤:
- 创建队列
- 创建进程,并将队列作为参数传递给进程
- 在进程中读写队列
- 退出进程
创建队列
首先创建一个队列,用于进程间共享信息。在Python中创建Queue可以使用multiprocessing.Queue方法。下面的示例代码创建了一个大小为3的队列:
import multiprocessing
queue = multiprocessing.Queue(3)
创建进程
接下来创建两个进程,一个进程用于写入数据到队列,一个进程用于读取队列中的数据。在创建进程时,需要将队列对象作为参数传递给进程。下面的示例代码展示了如何创建进程:
import multiprocessing
import time
def writer(q):
for i in range(5):
if q.full():
print('队列已满')
break
q.put(i)
print(f'添加{i}到队列中')
time.sleep(1)
def reader(q):
while True:
if q.empty():
print('队列已空')
break
data = q.get()
print(f'读取到{data}')
time.sleep(1)
queue = multiprocessing.Queue(3)
p1 = multiprocessing.Process(target=writer, args=(queue,))
p2 = multiprocessing.Process(target=reader, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
上述代码中,我们创建了两个进程,writer和reader。writer进程向队列写入数字0-4,reader进程从队列中读取数据。在writer进程中,我们使用了put方法向队列中添加数据,如果队列已满则等待。在reader进程中,我们使用了get方法从队列中读取数据,如果队列为空则等待。两个进程都通过time.sleep(1)方法模拟处理的过程。
读写队列
上述示例代码中,在进程中我们使用了put和get方法向队列写入和读取数据。值得注意的是,在使用完队列中的一个item后,我们需要调用task_done方法,来表示已经处理完成。此外,当队列为空时,我们需要通过empty方法来判断,当队列已满时,我们需要通过full方法来判断。下面的示例代码展示了如何读写队列:
import multiprocessing
import time
def writer(q):
for i in range(5):
if q.full():
print('队列已满')
break
q.put(i)
print(f'添加{i}到队列中')
time.sleep(1)
def reader(q):
while True:
if q.empty():
print('队列已空')
break
data = q.get()
print(f'读取到{data}')
q.task_done()
time.sleep(1)
queue = multiprocessing.Queue(3)
p1 = multiprocessing.Process(target=writer, args=(queue,))
p2 = multiprocessing.Process(target=reader, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
queue.join()
退出进程
在Python中清理进程的方法是调用terminate方法,这个方法将立即终止进程,但是会留下一些资源没有清理干净。而且,进程的终止往往是非常突然的,如果在退出进程前需要处理一些工作,可能会出现问题。因此,一般不建议使用terminate方法在Python中终止进程。而是建议应该在进程中使用一个exit标记来控制进程是否退出。具体的做法是:
- 在进程代码中添加一个exit标记,初始为False,当需要退出进程时将exit标记设置为True。
- 可以在进程中使用while循环,判断是否需要退出进程,如果需要退出,则break循环,退出进程。
- 在主进程中使用进程对象的join方法,等待子进程退出,然后清理资源。
示例
下面我们通过两个示例说明消息队列的用法。
示例1
在这个示例中,我们创建3个进程,分别用于读取键盘输入、字符过滤,和打印符合条件的字符。其中,字符过滤进程可以被中断,通过在键盘输入exit
来退出进程。
import multiprocessing
def input_worker(queue):
while True:
data = input('请输入要过滤的字符:')
if data == 'exit':
queue.put(data)
break
queue.put(data)
def filter_worker(input_queue, output_queue):
while True:
data = input_queue.get()
if data == 'exit':
output_queue.put(data)
break
if len(data) >= 5:
output_queue.put(data)
def print_worker(queue):
while True:
data = queue.get()
if data == 'exit':
break
print(f'过滤后字符为 {data}')
input_queue = multiprocessing.Queue()
filter_queue = multiprocessing.Queue()
print_queue = multiprocessing.Queue()
input_process = multiprocessing.Process(target=input_worker, args=(input_queue,))
filter_process = multiprocessing.Process(target=filter_worker, args=(input_queue, filter_queue,))
print_process = multiprocessing.Process(target=print_worker, args=(filter_queue,))
input_process.start()
filter_process.start()
print_process.start()
input_process.join()
filter_process.join()
print_process.join()
print('所有进程已退出')
在这个示例中,我们创建了3个进程:input_worker、filter_worker、print_worker。input_worker进程负责从键盘读取输入,filter_worker进程负责对输入内容进行过滤,print_worker进程负责打印符合条件的字符。在示例中我们使用了三个Queue,其中input_queue是input_worker进程和filter_worker进程之间的通道,filter_queue是filter_worker进程和print_worker进程之间的通道,print_queue用于输出结果。
示例2
在这个示例中,我们创建10个进程,用于计算斐波那契数列中每个数字的阶乘。
import multiprocessing
from math import factorial
def calc_factorial(q, num):
result = factorial(num)
q.put(result)
if __name__ == '__main__':
fibonacci_sequence = [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
process_list = []
queue_list = []
for i in range(10):
q = multiprocessing.Queue()
queue_list.append(q)
for i in range(10):
p = multiprocessing.Process(target=calc_factorial, args=(queue_list[i], fibonacci_sequence[i],))
process_list.append(p)
p.start()
for p in process_list:
p.join()
for q in queue_list:
print(q.get())
在这个示例中,我们创建了10个进程,分别计算斐波那契数列中每个数字的阶乘。为了简化问题,我们直接使用了斐波那契数列,而不是像实际应用那样通过计算斐波那契数列来得到这些数字。
首先,我们创建了10个Queue,用于进程间传递数据。然后,我们创建了10个进程,每个进程负责计算对应位置斐波那契数列的数字的阶乘,并把结果放入对应的Queue中。最后,我们通过join方法等待所有进程执行完成,然后依次从队列中读取结果并打印。通过这种方式,我们可以同时计算多个数字的阶乘,提高了计算效率。
总结
本文主要讲解了Python中进程间通信的一种方式——消息队列(Queue)的用法。Queue提供了一种轻量级的进程间通信方式,不需要使用共享内存等高级技术,而且使用Queue非常方便。在日常开发中,我们可以使用Queue来进行并发编程,提高代码的执行效率。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:Python进程间通信Queue消息队列用法分析 - Python技术站