创建一个进程
import multiprocessing
def worker():
print("Process is running")
p = multiprocessing.Process(target=worker)
p.start()
p.join()创建一个后台运行的进程
import multiprocessing
import time
def foo():
name = multiprocessing.current_process().name
print("Starting %s " % name)
time.sleep(3)
print("Exiting %s " % name)
if __name__ == '__main__':
background_process = multiprocessing.Process(name='background_process', target=foo)
background_process.daemon = True
NO_background_process = multiprocessing.Process(name='NO_background_process', target=foo)
NO_background_process.daemon = False
background_process.start()
NO_background_process.start()注意,后台进程不允许创建子进程。否则,当后台进程跟随父进程退出的时候,子进程会变成孤儿进程。
另外,它们并不是 Unix 的守护进程或服务(daemons or services),所以当非后台进程退出,它们会被终结。
进程属性及生命周期
if __name__ == '__main__':
p = multiprocessing.Process(target=foo)
print('Process before execution:', p, p.is_alive())
p.start()
print('Process running:', p, p.is_alive())
p.terminate()
print('Process terminated:', p, p.is_alive())
p.join()
print('Process joined:', p, p.is_alive())
print('Process exit code:', p.exitcode)用 is_alive() 方法监控它的生命周期,调用 terminate() 方法结束进程,调用 exitcode 属性来获得退出状态码。
exitcode 可能的值如下:
- == 0: 正常退出
-
0: 进程有错误,并以此状态码退出
- < 0: 进程被
-1 *的信号杀死并以此作为 ExitCode 退出
- 进程 ID:每个进程都有一个唯一的 ID,我们可以通过
os.getpid()来获取当前进程的 ID。 - 父进程 ID:我们可以通过
os.getppid()来获取当前进程的父进程 ID。 - 进程状态:进程的状态包括运行中、就绪、阻塞等。我们可以通过一些系统命令如
ps来查看进程的状态。 - 进程的创建时间:我们可以通过
psutil.Process().create_time()来获取进程的创建时间。 - 进程使用的 CPU 时间:我们可以通过
os.times()来获取进程使用的 CPU 时间。 - 进程使用的内存:我们可以通过
psutil.Process().memory_info()来获取进程使用的内存。 - 进程的当前工作目录:我们可以通过
os.getcwd()来获取进程的当前工作目录。 - 进程的环境变量:我们可以通过
os.environ来获取进程的环境变量。
自定义 Python 进程
实现一个自定义的进程子类,需要以下三步:
- 定义
Process的子类 - 覆盖
__init__(self [,args])方法来添加额外的参数 - 覆盖
run(self, [.args])方法来实现Process启动的时候执行的任务
创建 Porcess 子类之后,你可以创建它的实例并通过 start() 方法启动它,启动之后会运行 run() 方法。
import multiprocessing
class MyProcess(multiprocessing.Process):
def run(self):
print ('called run method in process: %s' % self.name)
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = MyProcess()
jobs.append(p)
p.start()
p.join()进程之间的通信
Multiprocessing 库有两个 Communication Channel 可以交换对象:队列(queue)和管道(pipe)。
使用队列进行通信
class Producer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
for i in range(10):
item = random.randint(0, 256)
self.queue.put(item)
...
class Consumer(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def run(self):
while True:
if self.queue.empty():
...
if __name__ == '__main__':
queue = multiprocessing.Queue()
process_producer = Producer(queue)
process_consumer = Consumer(queue)
process_producer.start()
process_consumer.start()
process_producer.join()
process_consumer.join()队列还有一个 JoinableQueue 子类,它有以下两个额外的方法:
task_done(): 此方法意味着之前入队的一个任务已经完成,比如,get()方法从队列取回 item 之后调用。所以此方法只能被队列的消费者调用。join(): 此方法将进程阻塞,直到队列中的 item 全部被取出并执行。
使用管道进行通信
def create_items(pipe):
output_pipe, _ = pipe
for item in range(10):
output_pipe.send(item)
output_pipe.close()
def multiply_items(pipe_1, pipe_2):
close, input_pipe = pipe_1
close.close()
output_pipe, _ = pipe_2
try:
while True:
item = input_pipe.recv()
output_pipe.send(item * item)
except EOFError:
output_pipe.close()
if __name__== '__main__':
pipe_1 = multiprocessing.Pipe(True)
process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
process_pipe_1.start()
pipe_2 = multiprocessing.Pipe(True)
process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
process_pipe_2.start()
pipe_1[0].close()
pipe_2[0].close()
try:
while True:
print(pipe_2[1].recv())
except EOFError:
print("End")进程之间的同步
进程的同步原语如下:
- Lock: 这个对象可以有两种装填:锁住的(locked)和没锁住的(unlocked)。一个 Lock 对象有两个方法,
acquire()和release(),来控制共享数据的读写权限。 - Event: 实现了进程间的简单通讯,一个进程发事件的信号,另一个进程等待事件的信号。
Event对象有两个方法,set()和clear(),来管理自己内部的变量。 - Condition: 此对象用来同步部分工作流程,在并行的进程中,有两个基本的方法:
wait()用来等待进程,notify_all()用来通知所有等待此条件的进程。 - Semaphore: 用来共享资源,例如,支持固定数量的共享连接。
- Rlock: 递归锁对象。其用途和方法同
Threading模块一样。 - Barrier: 将程序分成几个阶段,适用于有些进程必须在某些特定进程之后执行。处于障碍(Barrier)之后的代码不能同处于障碍之前的代码并行。
具体使用略,8. 进程如何同步 — python-parallel-programming-cookbook-cn 1.0 文档
如何在进程之间管理状态
Python 的多进程模块提供了在所有的用户间管理共享信息的管理者(Manager)。一个管理者对象控制着持有 Python 对象的服务进程,并允许其它进程操作共享对象。
管理者有以下特性:
- 它控制着管理共享对象的服务进程
- 它确保当某一进程修改了共享对象之后,所有的进程拿到额共享对象都得到了更新
9. 如何在进程之间管理状态 — python-parallel-programming-cookbook-cn 1.0 文档
如何使用进程池
多进程库提供了 Pool 类来实现简单的多进程任务。 Pool 类有以下方法:
apply(): 直到得到结果之前一直阻塞。apply_async(): 这是apply()方法的一个变体,返回的是一个 result 对象。这是一个异步的操作,在所有的子类执行之前不会锁住主进程。map(): 这是内置的map()函数的并行版本。在得到结果之前一直阻塞,此方法将可迭代的数据的每一个元素作为进程池的一个任务来执行。map_async(): 这是map()方法的一个变体,返回一个 result 对象。如果指定了回调函数,回调函数应该是 callable 的,并且只接受一个参数。当 result 准备好时会自动调用回调函数(除非调用失败)。回调函数应该立即完成,否则,持有 result 的进程将被阻塞。
import multiprocessing
def function_square(data):
result = data*data
return result
if __name__ == '__main__':
inputs = list(range(100))
pool = multiprocessing.Pool(processes=4)
pool_outputs = pool.map(function_square, inputs)
pool.close()
pool.join()
print ('Pool :', pool_outputs)使用 Python 的 subprocess 模块
在 Python 中,subprocess 模块是用于产生子进程,并能够连接到子进程的输入/输出/错误管道,获取返回值等的一个库。
subprocess 模块和 multiprocessing 模块都是 Python 中用于创建和管理进程的库,但它们有一些主要的区别:
subprocess模块允许你运行和控制其他程序。multiprocessing模块则是让你可以在多个进程中执行 Python 代码,来实现并行处理。
这个模块提供了一些函数,如 run() 和 Popen():
subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, ...): 这个函数是用来运行命令的。它接收一个包含命令和参数的列表或者字符串作为参数,然后返回一个CompletedProcess对象。如果check=True,并且这个命令返回非零的退出代码,那么它会抛出一个CalledProcessError异常。subprocess.Popen(args, ...): 这个函数可以用来启动一个新的进程,并且可以与这个进程交互。它返回一个Popen对象,这个对象有一些方法,如wait(),communicate(),poll()等,可以用来控制和交互这个进程。
如果使用 subprocess.Popen 启动一个进程,然后立即返回,不等待这个进程结束,那么这个进程就相当于在后台运行。例如,下面的代码会在后台启动一个新的 Python 解释器进程:
import subprocess
subprocess.Popen(["python", "-c", "print('Hello, World!')"])而如果使用 subprocess.run() 或者 Popen 的 wait() 或 communicate() 方法,那么 Python 程序会等待子进程结束,这时子进程就不是在后台运行了。例如,下面的代码会等待新的 Python 解释器进程结束:
import subprocess
subprocess.run(["python", "-c", "print('Hello, World!')"])更多详细的信息和高级用法,可以参考 Python 官方文档:subprocess — Subprocess management — Python 3.12.4 documentation
使用 Python 的 mpi4py 模块
11. 使用 Python 的 mpi4py 模块 — python-parallel-programming-cookbook-cn 1.0 文档
12. 点对点通讯 — python-parallel-programming-cookbook-cn 1.0 文档
13. 避免死锁问题 — python-parallel-programming-cookbook-cn 1.0 文档
14. 集体通讯:使用 broadcast 通讯 — python-parallel-programming-cookbook-cn 1.0 文档
15. 集体通讯:使用 scatter 通讯 — python-parallel-programming-cookbook-cn 1.0 文档
16. 集体通讯:使用 gather 通讯 — python-parallel-programming-cookbook-cn 1.0 文档
17. 使用 Alltoall 通讯 — python-parallel-programming-cookbook-cn 1.0 文档
18. 简化操作 — python-parallel-programming-cookbook-cn 1.0 文档
19. 如何优化通讯 — python-parallel-programming-cookbook-cn 1.0 文档
Backlinks:
LIST FROM [[]]