Python 包之 multiprocessing 多进程
multiprocessing
是 Python 标准库中提供的模块,可以方便地使用多进程进行并发编程。它提供了与 Python 标准库 threading
模块相同的接口,但是使用多进程编程可以充分利用多核 CPU 的优势,用于加速 CPU 密集型任务。
multiprocessing 模块的主要组件
-
Process
:进程对象,用于创建新进程。 -
Queue
:进程间通信(IPC)的队列对象,用于在多个进程之间安全地共享数据。 -
Pool
:进程池对象,用于管理池中的多个 Worker 进程,执行一组对数据集的并行操作。 -
Lock
、RLock
、Semaphore
:进程锁,用于控制多个进程对共享资源的访问。
创建新进程
multiprocessing.Process
可以用于创建新的进程,常用的方式有如下两种:
方式一:函数形式
import multiprocessing
def work(name):
print(f"Working on {name}")
if __name__ == '__main__':
p = multiprocessing.Process(target=work, args=('Alice',))
p.start()
p.join()
在上面的代码中,我们通过 multiprocessing.Process()
函数创建了一个新进程,并将 work()
函数作为任务传递给了该进程。start()
方法用于启动该进程,并且 join()
方法用于等待该进程完成。在运行该代码时,可以看到类似于如下输出:
Working on Alice
方式二:面向对象式
import multiprocessing
class Worker(multiprocessing.Process):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
print(f"Working on {self.name}")
if __name__ == '__main__':
p = Worker('Bob')
p.start()
p.join()
在上面的代码中,我们通过继承 multiprocessing.Process
类创建了一个新的进程对象,并覆盖了 run()
方法,该方法内部执行我们的工作。在运行该代码时,可以看到类似于如下输出:
Working on Bob
进程间通信
多个进程之间需要共享数据或结果时,可以使用 multiprocessing.Queue
类来实现进程间通信(IPC)。以下是一个示例,演示了两个进程之间如何实现数据的共享。
import multiprocessing
def producer(queue):
for i in range(5):
print(f'Producing {i}')
queue.put(i)
def consumer(queue):
while True:
data = queue.get()
if data is None:
break
print(f'Consuming {data}')
if __name__ == '__main__':
queue = multiprocessing.Queue()
p1 = multiprocessing.Process(target=producer, args=(queue,))
p2 = multiprocessing.Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
queue.put(None)
p2.join()
在上面的代码中,我们创建了一个 multiprocessing.Queue
对象,并将其传递给了两个进程函数进行共享。producer
进程函数用于生成数据并将其放入队列中,consumer
进程函数负责不断地取出队列中的数据并进行消费。
在执行该代码时,可以看到类似于如下输出:
Producing 0
Consuming 0
Producing 1
Consuming 1
Producing 2
Consuming 2
Producing 3
Consuming 3
Producing 4
Consuming 4
进程池
multiprocessing.Pool
类可以用于创建一个进程池,执行一组对数据集的并行操作。以下是一个示例,演示了如何使用 Pool
实现对列表中数字的并行求平方。
import multiprocessing
def square(x):
return x*x
if __name__ == '__main__':
pool = multiprocessing.Pool()
result = pool.map(square, [1, 2, 3, 4, 5])
print(result)
在上面的代码中,我们创建了一个进程池,并使用 map()
方法实现对列表中数字的并行求平方。在运行该代码时,可以看到如下输出:
[1, 4, 9, 16, 25]
另外一个示例,演示了如何使用 Pool
实现对多个文件的并行读取和处理。
import multiprocessing
import os
def count_lines(filename):
with open(filename, 'r') as f:
lines = f.readlines()
return len(lines)
if __name__ == '__main__':
pool = multiprocessing.Pool()
folder_path = './files'
filenames = [os.path.join(folder_path, f) for f in os.listdir(folder_path)]
results = pool.map(count_lines, filenames)
total_lines = sum(results)
print(f'Total number of lines in {len(filenames)} files: {total_lines}')
在上面的代码中,我们创建了一个进程池,并使用 map()
方法实现对多个文件的并行读取和处理。其中,count_lines()
函数用于读取文件并返回其行数。在运行该代码时,需要先准备好指定路径下的多个文件,然后可以看到如下输出:
Total number of lines in 3 files: 15
进程锁
在多个进程共享同一份数据时,可能会出现多个进程同时读写该数据的情况,如果没有加锁保护,可能会导致数据的错误和不可预期的结果。可以使用 multiprocessing.Lock
、multiprocessing.RLock
和 multiprocessing.Semaphore
等类来实现进程锁的功能。
以下是一个示例,演示了如何使用 Lock
类来保证多个进程安全地访问同一份数据。
import multiprocessing
def deposit(balance, lock):
for i in range(10000):
lock.acquire()
balance.value += 1
lock.release()
def withdraw(balance, lock):
for i in range(10000):
lock.acquire()
balance.value -= 1
lock.release()
if __name__ == '__main__':
balance = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()
d = multiprocessing.Process(target=deposit, args=(balance, lock))
w = multiprocessing.Process(target=withdraw, args=(balance, lock))
d.start()
w.start()
d.join()
w.join()
print(balance.value)
在上面的代码中,我们创建了一个共享变量 balance
,并分别创建了存款和取款两个进程,它们会在 10000 次循环中反复对 balance
进行加减操作。为了避免多个进程同时访问同一份数据,我们使用 multiprocessing.Lock
类来对访问 balance
的进程进行加锁保护。在运行该代码时,可以看到如下输出:
0
可以看到,经过了 20000 次加减操作,最终 balance
的值保持不变,证明了我们的加锁保护起了作用。
总结
在本篇文章中,我们介绍了 Python 标准库 multiprocessing
模块的主要组件,包括创建新进程、进程间通信、进程池、进程锁等。同时,我们也给出了多个示例,演示了如何使用 multiprocessing
进行并发编程的实际应用。
本站文章如无特殊说明,均为本站原创,如若转载,请注明出处:python 包之 multiprocessing 多进程 - Python技术站