创建一个进程

import multiprocessing
def worker():
    print("Process is running")
p = multiprocessing.Process(target=worker)
p.start()
p.join()

创建一个后台运行的进程

import multiprocessing
import time
 
def foo():
    name = multiprocessing.current_process().name
    print("Starting %s " % name)
    time.sleep(3)
    print("Exiting %s " % name)
 
if __name__ == '__main__':
    background_process = multiprocessing.Process(name='background_process', target=foo)
    background_process.daemon = True
    NO_background_process = multiprocessing.Process(name='NO_background_process', target=foo)
    NO_background_process.daemon = False
    background_process.start()
    NO_background_process.start()

注意,后台进程不允许创建子进程。否则,当后台进程跟随父进程退出的时候,子进程会变成孤儿进程。

另外,它们并不是 Unix 的守护进程或服务(daemons or services),所以当非后台进程退出,它们会被终结。

进程属性及生命周期

if __name__ == '__main__':
	p = multiprocessing.Process(target=foo)
	print('Process before execution:', p, p.is_alive())
	p.start()
	print('Process running:', p, p.is_alive())
	p.terminate()
	print('Process terminated:', p, p.is_alive())
	p.join()
	print('Process joined:', p, p.is_alive())
	print('Process exit code:', p.exitcode)

用  is_alive()  方法监控它的生命周期,调用  terminate()  方法结束进程,调用 exitcode 属性来获得退出状态码。

exitcode  可能的值如下:

  • == 0: 正常退出
  • 0: 进程有错误,并以此状态码退出

  • < 0: 进程被  -1 *  的信号杀死并以此作为 ExitCode 退出
  1. 进程 ID:每个进程都有一个唯一的 ID,我们可以通过  os.getpid()  来获取当前进程的 ID。
  2. 父进程 ID:我们可以通过  os.getppid()  来获取当前进程的父进程 ID。
  3. 进程状态:进程的状态包括运行中、就绪、阻塞等。我们可以通过一些系统命令如  ps  来查看进程的状态。
  4. 进程的创建时间:我们可以通过  psutil.Process().create_time()  来获取进程的创建时间。
  5. 进程使用的 CPU 时间:我们可以通过  os.times()  来获取进程使用的 CPU 时间。
  6. 进程使用的内存:我们可以通过  psutil.Process().memory_info()  来获取进程使用的内存。
  7. 进程的当前工作目录:我们可以通过  os.getcwd()  来获取进程的当前工作目录。
  8. 进程的环境变量:我们可以通过  os.environ  来获取进程的环境变量。

自定义 Python 进程

实现一个自定义的进程子类,需要以下三步:

  • 定义  Process  的子类
  • 覆盖  __init__(self [,args])  方法来添加额外的参数
  • 覆盖  run(self, [.args])  方法来实现  Process  启动的时候执行的任务

创建  Porcess  子类之后,你可以创建它的实例并通过  start()  方法启动它,启动之后会运行  run()  方法。

import multiprocessing
class MyProcess(multiprocessing.Process):
	def run(self):
		print ('called run method in process: %s' % self.name)
		return
 
if __name__ == '__main__':
	jobs = []
	for i in range(5):
		p = MyProcess()
		jobs.append(p)
		p.start()
		p.join()

进程之间的通信

Multiprocessing 库有两个 Communication Channel 可以交换对象:队列(queue)和管道(pipe)。

使用队列进行通信

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
 
    def run(self):
        for i in range(10):
            item = random.randint(0, 256)
            self.queue.put(item)
            ...
 
class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
 
    def run(self):
        while True:
            if self.queue.empty():
            	...
 
if __name__ == '__main__':
    queue = multiprocessing.Queue()
    process_producer = Producer(queue)
    process_consumer = Consumer(queue)
    process_producer.start()
    process_consumer.start()
    process_producer.join()
    process_consumer.join()

队列还有一个  JoinableQueue  子类,它有以下两个额外的方法:

  • task_done(): 此方法意味着之前入队的一个任务已经完成,比如, get()  方法从队列取回 item 之后调用。所以此方法只能被队列的消费者调用。
  • join(): 此方法将进程阻塞,直到队列中的 item 全部被取出并执行。

使用管道进行通信

def create_items(pipe):
    output_pipe, _ = pipe
    for item in range(10):
        output_pipe.send(item)
    output_pipe.close()
 
def multiply_items(pipe_1, pipe_2):
    close, input_pipe = pipe_1
    close.close()
    output_pipe, _ = pipe_2
    try:
        while True:
            item = input_pipe.recv()
            output_pipe.send(item * item)
    except EOFError:
        output_pipe.close()
 
if __name__== '__main__':
    pipe_1 = multiprocessing.Pipe(True)
    process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
    process_pipe_1.start()
    pipe_2 = multiprocessing.Pipe(True)
    process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2,))
    process_pipe_2.start()
    pipe_1[0].close()
    pipe_2[0].close()
    try:
        while True:
            print(pipe_2[1].recv())
    except EOFError:
        print("End")

进程之间的同步

进程的同步原语如下:

  • Lock: 这个对象可以有两种装填:锁住的(locked)和没锁住的(unlocked)。一个 Lock 对象有两个方法, acquire()  和  release() ,来控制共享数据的读写权限。
  • Event: 实现了进程间的简单通讯,一个进程发事件的信号,另一个进程等待事件的信号。 Event  对象有两个方法, set()  和  clear() ,来管理自己内部的变量。
  • Condition: 此对象用来同步部分工作流程,在并行的进程中,有两个基本的方法: wait()  用来等待进程, notify_all()  用来通知所有等待此条件的进程。
  • Semaphore: 用来共享资源,例如,支持固定数量的共享连接。
  • Rlock: 递归锁对象。其用途和方法同  Threading  模块一样。
  • Barrier: 将程序分成几个阶段,适用于有些进程必须在某些特定进程之后执行。处于障碍(Barrier)之后的代码不能同处于障碍之前的代码并行。

具体使用略,8. 进程如何同步 — python-parallel-programming-cookbook-cn 1.0 文档

如何在进程之间管理状态

Python 的多进程模块提供了在所有的用户间管理共享信息的管理者(Manager)。一个管理者对象控制着持有 Python 对象的服务进程,并允许其它进程操作共享对象。

管理者有以下特性:

  • 它控制着管理共享对象的服务进程
  • 它确保当某一进程修改了共享对象之后,所有的进程拿到额共享对象都得到了更新

9. 如何在进程之间管理状态 — python-parallel-programming-cookbook-cn 1.0 文档

如何使用进程池

多进程库提供了  Pool  类来实现简单的多进程任务。 Pool  类有以下方法:

  • apply(): 直到得到结果之前一直阻塞。
  • apply_async(): 这是  apply()  方法的一个变体,返回的是一个 result 对象。这是一个异步的操作,在所有的子类执行之前不会锁住主进程。
  • map(): 这是内置的  map()  函数的并行版本。在得到结果之前一直阻塞,此方法将可迭代的数据的每一个元素作为进程池的一个任务来执行。
  • map_async(): 这是  map()  方法的一个变体,返回一个 result 对象。如果指定了回调函数,回调函数应该是 callable 的,并且只接受一个参数。当 result 准备好时会自动调用回调函数(除非调用失败)。回调函数应该立即完成,否则,持有 result 的进程将被阻塞。
import multiprocessing
 
def function_square(data):
    result = data*data
    return result
 
if __name__ == '__main__':
    inputs = list(range(100))
    pool = multiprocessing.Pool(processes=4)
    pool_outputs = pool.map(function_square, inputs)
    pool.close()
    pool.join()
    print ('Pool    :', pool_outputs)

使用 Python 的 subprocess 模块

在 Python 中,subprocess 模块是用于产生子进程,并能够连接到子进程的输入/输出/错误管道,获取返回值等的一个库。

subprocess 模块和 multiprocessing 模块都是 Python 中用于创建和管理进程的库,但它们有一些主要的区别:

  • subprocess 模块允许你运行和控制其他程序。
  • multiprocessing 模块则是让你可以在多个进程中执行 Python 代码,来实现并行处理。

这个模块提供了一些函数,如 run()Popen()

  • subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None, capture_output=False, shell=False, cwd=None, timeout=None, check=False, ...): 这个函数是用来运行命令的。它接收一个包含命令和参数的列表或者字符串作为参数,然后返回一个 CompletedProcess 对象。如果 check=True,并且这个命令返回非零的退出代码,那么它会抛出一个 CalledProcessError 异常。
  • subprocess.Popen(args, ...): 这个函数可以用来启动一个新的进程,并且可以与这个进程交互。它返回一个 Popen 对象,这个对象有一些方法,如 wait(), communicate(), poll() 等,可以用来控制和交互这个进程。

如果使用 subprocess.Popen 启动一个进程,然后立即返回,不等待这个进程结束,那么这个进程就相当于在后台运行。例如,下面的代码会在后台启动一个新的 Python 解释器进程:

import subprocess
 
subprocess.Popen(["python", "-c", "print('Hello, World!')"])

而如果使用 subprocess.run() 或者 Popenwait()communicate() 方法,那么 Python 程序会等待子进程结束,这时子进程就不是在后台运行了。例如,下面的代码会等待新的 Python 解释器进程结束:

import subprocess
 
subprocess.run(["python", "-c", "print('Hello, World!')"])

更多详细的信息和高级用法,可以参考 Python 官方文档:subprocess — Subprocess management — Python 3.12.4 documentation

使用 Python 的 mpi4py 模块

11. 使用 Python 的 mpi4py 模块 — python-parallel-programming-cookbook-cn 1.0 文档

12. 点对点通讯 — python-parallel-programming-cookbook-cn 1.0 文档

13. 避免死锁问题 — python-parallel-programming-cookbook-cn 1.0 文档

14. 集体通讯:使用 broadcast 通讯 — python-parallel-programming-cookbook-cn 1.0 文档

15. 集体通讯:使用 scatter 通讯 — python-parallel-programming-cookbook-cn 1.0 文档

16. 集体通讯:使用 gather 通讯 — python-parallel-programming-cookbook-cn 1.0 文档

17. 使用 Alltoall 通讯 — python-parallel-programming-cookbook-cn 1.0 文档

18. 简化操作 — python-parallel-programming-cookbook-cn 1.0 文档

19. 如何优化通讯 — python-parallel-programming-cookbook-cn 1.0 文档

Backlinks:

LIST FROM [[]]