使用 subprocess 管理子进程
Python 里面有许多方式都可以运行子进程(例如 os.popen 函数以及 os.exec* 系列的函数),其中最好的办法是通过内置的 subprocess 模块来管理。
import subprocess
result = subprocess.run(
['echo', 'Hello from the child!'],
capture_output=True,
encoding='utf-8')
result.check_returncode()
print(result.stdout)子进程可以独立于父进程而运行,这里的父进程指 Python 解释器所在的那条进程。假如刚才那条子进程不是通过 run 函数启动,而是由 Popen 类启动的,那么我们就可以在它启动之后,让 Python 程序去做别的任务。
例如下面间断性的轮询子进程的状态,判断是否运行完成了。
proc = subprocess.Popen(['sleep', '1'])
while proc.poll() is None:
print('Working...')
import time
time.sleep(0.3)
print('Exit status', proc.poll())还可以将子进程从父进程中剥离出来,让程序并行的运行多条子进程。例如下面的代码运行 10 个子进程,最终只用时 1.05 秒。
import time
start = time.time()
sleep_procs = []
for _ in range(10):
# Use this line instead to make this example work on Windows
# proc = subprocess.Popen(['sleep', '1'], shell=True)
proc = subprocess.Popen(['sleep', '1'])
sleep_procs.append(proc)
for proc in sleep_procs:
proc.communicate()
end = time.time()
delta = end - start
print(f'Finished in {delta:.3} seconds')还可以在 Python 程序里面把数据通过管道发送给子进程所运行的外部命令,然后将那条命令的输出结果获取到 Python 程序之中。例如,要用 oepnssl 这样的命令行工具来加密数据。首先以适当的命令行参数构建一批子进程,并配置好相应的 I/O 管道。
import os
def run_encrypt(data):
env = os.environ.copy()
env['password'] = 'zf7ShyBhZOraQDdE/FiZpm/m/8f9X+M1'
proc = subprocess.Popen(
['openssl', 'enc', '-des3', '-pass', 'env:password'],
env=env,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE)
proc.stdin.write(data)
proc.stdin.flush() # Ensure that the child gets input
return proc
procs = []
for _ in range(3):
data = os.urandom(10)
proc = run_encrypt(data)
procs.append(proc)
for proc in procs:
out, _ = proc.communicate() # 返回 stdout, stderr
print(out[-10:])这些平行运行的子进程还可以分别与另一套平行的子进程对接,形成许多条平行的管道(pipe)。这种管道与 UNIX 管道类似,能够把一条子进程的输出端同另一条子进程的输入端连接起来。下面,我们写这样一个函数,让它开启一条子进程来运行 openssl 命令,这条命令会根据输入端所发来的数据在输出端生成 Whirlpool 哈希。
def run_hash(input_stdin):
return subprocess.Popen(
['openssl', 'dgst', '-whirlpool', '-binary'],
stdin=input_stdin,
stdout=subprocess.PIPE)
encrypt_procs = []
hash_procs = []
for _ in range(3):
data = os.urandom(100)
encrypt_proc = run_encrypt(data)
encrypt_procs.append(encrypt_proc)
hash_proc = run_hash(encrypt_proc.stdout)
hash_procs.append(hash_proc)
# Ensure that the child consumes the input stream and
# the communicate() method doesn't inadvertently steal
# input from the child. Also lets SIGPIPE propagate to
# the upstream process if the downstream process dies.
encrypt_proc.stdout.close()
encrypt_proc.stdout = None只要上、下游的子进程都启动起来,两者之间的 I/O 管道就会自动打通。我们所要做的仅仅是等待这两批子进程完工并把最终结果打印出来。如果子进程有可能一直不结束,或者由于某种原因卡在输入端或输出端,那么可以在调用 communicate 方法时指定 timeout 参数。
for proc in encrypt_procs:
proc.communicate()
assert proc.returncode == 0
for proc in hash_procs:
out, _ = proc.communicate()
print(out[-10:])
assert proc.returncode == 0
proc = subprocess.Popen(['sleep', '10'])
try:
proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
proc.terminate()
proc.wait()
print('Exit status', proc.poll())用多线程执行 IO 密集型,而非 CPU 密集型
Python 语言的标准实现叫作 CPython,它分两步来运行 Python 程序。首先解析源代码文本,并将其编译成字节码(bytecode)。字节码是一种底层代码,可以把程序表示成 8 位的指令(从 Python 3.6 开始,这种底层代码实际上已经变成 16 位了,所以应该叫作 wordcode 才对,但基本原理依然相同)。然后,CPython 采用基于栈的解释器来运行字节码。这种字节码解释器在执行 Python 程序的过程中,必须确保相关的状态不受干扰,所以 CPython 会用 Python 全局解释器锁 GIL 来保证。
GIL 实际上就是一种互斥锁(mutual-exclusion lock,mutex),用来防止 CPython 的状态在抢占式的多线程环境(preemptive multithreading)之中受到干扰,因为在这种环境下,一条线程有可能突然打断另一条线程抢占程序的控制权。如果这种抢占行为来得不是时候,那么解释器的状态(例如为垃圾回收工作而设立的引用计数等)就会遭到破坏。所以,CPython 要通过 GIL 阻止这样的动作,以确保它自身以及它的那些 C 扩展模块能够正确地执行每一条字节码指令。
但是,GIL 会产生一个很不好的影响。在 C++与 Java 这样的语言里面,如果程序之中有多个线程能够分头执行任务,那么就可以把 CPU 的各个核心充分地利用起来。尽管 Python 也支持多线程,但这些线程受 GIL 约束,所以每次或许只能有一条线程向前推进,而无法实现多头并进。所以,想通过多线程做并行计算或是给程序提速的开发者,恐怕要失望了。
Python 多线程适合处理 IO 密集型任务。例如,用 select 函数让操作系统阻塞 0.1 秒来模拟 IO 阻塞。通过多线程并发执行 5 个 IO 任务,实际上对于主进程来说,只阻塞了 0.1s 多点时间。
尽管 5 条线程依然受 GIL 制约,但它们所发起的系统调用是可以各自向前执行的。GIL 只不过是让 Python 内部的代码无法平行推进而已,至于系统调用,则不会受到影响,因为 Python 线程在即将执行系统调用时,会释放 GIL,待完成调用之后,才会重新获取它。
import select
import socket
def slow_systemcall():
select.select([socket.socket()], [], [], 0.1)
start = time.time()
threads = []
for _ in range(5):
thread = Thread(target=slow_systemcall)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
end = time.time()
delta = end - start除了线程,还有很多办法也能处理阻塞式的 I/O(例如采用内置的 asyncio 模块等)。那些办法都很好,但你可能得花时间去重构代码适应它们所要求的执行模式(参见第 60 条与第 62 条)。与那些办法相比,用多线程处理阻塞式 I/O 是最简单的,而且只需要稍微调整代码就行。
利用 Lock 防止多个线程争用同一份数据
最简单的栗子,多个线程同时对某个值进行自增操作,最终统计的结果比预计的值小了很多。
from threading import Barrier
BARRIER = Barrier(5)
from threading import Thread
class Counter:
def __init__(self):
self.count = 0
def increment(self, offset):
self.count += offset
def worker(sensor_index, how_many, counter):
BARRIER.wait() # BARRIER 保证线程同时运行
for _ in range(how_many):
counter.increment(1)
how_many = 10**5
counter = Counter()
threads = []
for i in range(5):
thread = Thread(target=worker, args=(i, how_many, counter))
threads.append(thread)
thread.start()
for thread in threads:
thread.join()
expected = how_many * 5
found = counter.count
# 最终 expected = 500000,但是实际上可能只有一半修改的办法就是使用 Lock 配置互斥锁。
from threading import Lock
class LockingCounter:
def __init__(self):
self.lock = Lock()
self.count = 0
def increment(self, offset):
with self.lock:
self.count += offset用 Queue 来协调各线程之间的工作进度
当队列为空的,get 方法就会阻塞。当队列满了后,put 方法就会阻塞。
给 Queue 限定大小,从而避免下游处理过慢导致的管道拥堵问题。
from queue import Queue
my_queue = Queue(1) # Buffer size of 1
def consumer():
time.sleep(0.1)
my_queue.get() # Runs second
print('Consumer got 1')
my_queue.get() # Runs fourth
print('Consumer got 2')
print('Consumer done')
thread = Thread(target=consumer)
thread.start()
my_queue.put(object()) # Runs first
print('Producer put 1')
my_queue.put(object()) # Runs third
print('Producer put 2')
print('Producer done')
thread.join()join() 方法用于阻塞调用者直到队列中的所有项目都被处理完毕。要使 join() 正确工作,每个取出队列的项目的消费者线程必须在完成对项目的处理之后,调用 task_done() 方法。
task_done() 方法用于表明之前入队的一个项目已经被处理完成。每个入队的项目对应的 task_done() 被调用一次,这样当所有项目都被处理,join() 方法可以解除阻塞。
in_queue = Queue()
def consumer():
print('Consumer waiting')
work = in_queue.get() # Done second
print('Consumer working')
# Doing work
print('Consumer done')
in_queue.task_done() # Done third
thread = Thread(target=consumer)
thread.start()
print('Producer putting')
in_queue.put(object()) # Done first
print('Producer waiting')
in_queue.join() # Done fourth
print('Producer done')
thread.join()
当你完成了对队列的操作之后,可以调用 close() 来告诉队列不会再有更多的对象被放入队列。这是一个告知队列结束通信的信号。但是,它并不会阻止已经放入队列中的项目被消费者进程提取出来。
但也可以重写 close() 方法来配置满足需求的告知结束的信号。例如下面的代码中,将 SENTINEL 对象作为结束的标识。当从 ClosableQueue 迭代出 SENTINEL 对象后就结束通信的信号。
class ClosableQueue(Queue):
SENTINEL = object()
def close(self):
self.put(self.SENTINEL)
def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
return # Cause the thread to exit
yield item
finally:
self.task_done()
class StoppableWorker(Thread):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
for item in self.in_queue:
result = self.func(item)
self.out_queue.put(result)
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
threads = [
StoppableWorker(download, download_queue, resize_queue),
StoppableWorker(resize, resize_queue, upload_queue),
StoppableWorker(upload, upload_queue, done_queue),
]
for thread in threads:
thread.start()
for _ in range(1000):
download_queue.put(object())
download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'items finished')
for thread in threads:
thread.join()这个方案还可以扩展,也就是用多个线程同时处理某一个环节,以提高 I/O 并行度,从而大幅提升程序效率。
def start_threads(count, *args):
threads = [StoppableWorker(*args) for _ in range(count)]
for thread in threads:
thread.start()
return threads
def stop_threads(closable_queue, threads):
for _ in threads:
closable_queue.close()
closable_queue.join()
for thread in threads:
thread.join()
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
download_threads = start_threads(3, download, download_queue, resize_queue)
resize_threads = start_threads(4, resize, resize_queue, upload_queue)
upload_threads = start_threads(5, upload, upload_queue, done_queue)
for _ in range(1000):
download_queue.put(object())
stop_threads(download_queue, download_threads)
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)
print(done_queue.qsize(), 'items finished')什么应该做并发
假如当前我们需要实现 Conway’s Game of Life 游戏,这是一个经典的有限状态自动机。它的规则很简单:在任意长宽的二维网格中,每个单元格都必须处于 ALIVE 或 EMPTY 状态,前者表示这个单元格里有生命存在,后者表示这里没有生物。时钟每走一格,游戏就要前进一步。这个时候,我们需要考虑每个单元格的周围总共有多少个处于存活状态的单元格,并根据这个数量来决定本单元格的新状态:如果当前有生命体存在(ALIVE),那么该生命体有可能继续存活,也有可能死亡;如果单元格当前是空白的(EMPTY),那么下一步有可能继续保持空白,也有可能诞生新的生命体。
为了描述这个过程,定义了简单的容器类 Grid 来存储单元格的状态。该类提供了 get 和 set 方法来获取任何一个坐标点的值。
ALIVE = '*'
EMPTY = '-'
class Grid:
def __init__(self, height, width):
self.height = height
self.width = width
self.rows = []
for _ in range(self.height):
self.rows.append([EMPTY] * self.width)
def get(self, y, x):
return self.rows[y % self.height][x % self.width]
def set(self, y, x, state):
self.rows[y % self.height][x % self.width] = state
def __str__(self):
output = ''
for row in self.rows:
for cell in row:
output += cell
output += '\n'
return output
grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)
print(grid)
# ---*-----
# ----*----
# --***----
# ---------
# ---------假设单元格的状态变化函数 step_cell 如下。具体的逻辑是,查看该坐标四周 8 个点中处于 ALIVE 状态数。若状态数为 3,则自身处于 ALIVE 状态,否则处于 EMPTY 状态。
def count_neighbors(y, x, get):
n_ = get(y - 1, x + 0) # North
ne = get(y - 1, x + 1) # Northeast
e_ = get(y + 0, x + 1) # East
se = get(y + 1, x + 1) # Southeast
s_ = get(y + 1, x + 0) # South
sw = get(y + 1, x - 1) # Southwest
w_ = get(y + 0, x - 1) # West
nw = get(y - 1, x - 1) # Northwest
neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
count = 0
for state in neighbor_states:
if state == ALIVE:
count += 1
return count
def game_logic(state, neighbors):
if neighbors == 3:
return ALIVE
return EMPTY
def step_cell(y, x, get, set):
state = get(y, x)
neighbors = count_neighbors(y, x, get)
next_state = game_logic(state, neighbors)
set(y, x, next_state)当每个坐标的状态变化逻辑计算的较快时,上面这种串行计算各坐标新状态的方法还是可以接受的。
但是如果 step_cell 函数在执行中需要进行 I/O 操作,例如多人在线游戏需要通过 socket 通信获得各个玩家、场景的信息。那么此时串行的更新状态就会非常耗时。假如每个坐标状态的更新需要 I/O 耗时 100ms,那么 45 个节点的状态更新就需要 4.5 秒。若坐标更多,那么就需要更多的时间。这显然不适合多人在线游戏了。
如果想要降低时延,那么就需要并行的去执行状态更新逻辑,使得每批次状态更新都在 100ms。
这里在介绍两个概念。扇出,为每个工作节点分配一个执行路径。扇入,等待每个工作节点都执行完毕,再进行下一个环节。
方法一-每次扇出时创建多线程执行
首先,这些线程之间需要锁定功能对其进行协调,以确保它们所操纵的数据结构不会遭到破坏。
from threading import Lock
ALIVE = '*'
Empty = '-'
class Grid:
# ...
class LockingGrid(Grid):
def __init__(self, height, width):
super().__init__(height, width)
self.lock = Lock()
def __str__(self):
with self.lock:
return super().__str__()
def get(self, y, x):
with self.lock:
return super().get(y, x)
def set(self, y, x, state):
with self.lock:
return super().set(y, x, state)接下来,用多线程的方式来并发的对各个坐标的状态进行计算,并通过 join 函数来等待所有坐标的状态更新完成。
def simulate_threaded(grid):
next_grid = LockingGrid(grid.height, grid.width)
threads = []
for y in range(grid.height):
for x in range(grid.width):
args = (y, x, grid.get, next_grid.set)
thread = Thread(target=step_cell, args=args)
thread.start() # Fan out
threads.append(thread)
for thread in threads:
thread.join() # Fan in
return next_grid但是这样存在几个缺点:
- 通过锁机制来协调 Thread,避免破坏程序中的数据,这使得代码更加复杂。随着时间推移,可能更加难以扩展与维护。
- 线程占用的内存比较多,每条线程大约占 8MB。如果一个游戏有 1 万个单元格,那么大多数电脑就不支持了。
- 线程频繁的创建和销毁的代价大,也会耗费时间。
- 若 Thread 执行过程中抛出了异常,那么会把回溯信息(traceback)写到标准错误端(也就是 sys.stderr 里面),而不会写到标准输出端,从而难以发觉与调试。
方法二-使用 Queue 来做并发
from queue import Queue
class ClosableQueue(Queue):
...
in_queue = ClosableQueue()
out_queue = ClosableQueue()
from threading import Thread
class StoppableWorker(Thread):
...
def game_logic_thread(item):
y, x, state, neighbors = item
try:
next_state = game_logic(state, neighbors)
except Exception as e:
next_state = e
return (y, x, next_state)
# Start the threads upfront
threads = []
for _ in range(5):
thread = StoppableWorker(
game_logic_thread, in_queue, out_queue)
thread.start()
threads.append(thread)
def simulate_pipeline(grid, in_queue, out_queue):
for y in range(grid.height):
for x in range(grid.width):
state = grid.get(y, x)
neighbors = count_neighbors(y, x, grid.get)
in_queue.put((y, x, state, neighbors)) # Fan out
in_queue.join()
out_queue.close()
next_grid = Grid(grid.height, grid.width)
for item in out_queue: # Fan in
y, x, next_state = item
if isinstance(next_state, Exception):
raise SimulationError(y, x) from next_state
next_grid.set(y, x, next_state)
return next_grid新的 simulate_pipeline 函数要用到网格(Grid)类的 get 方法与 set 方法,但它这次是在主线程中调用这两个方法的,所以不用像上一条那样,专门编写一种支持多线程的网格,并通过 Lock 机制给这两个方法加锁以防网格之中的数据遭到破坏。我们这次可以直接使用普通版的 Grid 类。
上面这种机制仍然存在一定的问题:
- simulate_pipeline 函数比 simulate_threaded 更加难懂
- 需要配置 ClosableQueue、StoppableWorker,使得程序更加复杂。
- 线程并发度需要提前配置好,不能根据实际情况波动性配置。
- 为了调试,需要在工作线程中手动捕获异常,并将其放在队列中,抛给主线程。
- 最重要的是,如果需求变化,最开始只有 game_logic 函数需要 I/O 操作,现在 count_neighbors 函数也需要 I/O 操作。那么使用 Queue 搭建的管道又需要增加一个环节,将 count_neighbors 函数放到工作线程中。同时,有多条线程要使用同一张网格(Grid),还需要对 Grid 进行加锁。
方法三-使用线程池来做并发
from concurrent.futures import ThreadPoolExecutor
def simulate_pool(pool, grid):
next_grid = LockingGrid(grid.height, grid.width)
futures = []
for y in range(grid.height):
for x in range(grid.width):
args = (y, x, grid.get, next_grid.set)
future = pool.submit(step_cell, *args) # Fan out
futures.append(future)
for future in futures:
future.result() # Fan in
return next_grid
with ThreadPoolExecutor(max_workers=10) as pool:
for i in range(5):
grid = simulate_pool(pool, grid)使用线程池来进行并发,优点有:
- 不用频繁的创建和销毁线程
- 通过 submit 提交任务后,会获得 Future 实例。调用该实例的 result 方法时,会将异常抛给调用者。
使用线程池还是存在缺点,就是 I/O 并行能力不强。若 max-workers 设置为 100,也无法高效的应对一万个单元格且每个单元格都需要 I/O 操作的情况。
方法四-用协程实现高并发 I/O
协程能够制造出一种效果,让我们觉得 Python 程序好像真的可以同时执行大量任务。这种效果需要使用 async 与 await 关键字来实现,它的基本原理与生成器类似,也就是不立刻给出所有的结果,而是等需要用到的时候再一项一项地获取。
启动协程是有代价的,就是必须做一次函数调用。协程激活之后,只占用不到 1KB 内存,所以只要内存足够,协程稍微多一些也没关系。与线程类似,协程所要执行的任务也是用一个函数来表示的,在执行这个函数的过程中,协程可以从执行环境里面获取输入值,并把输出结果放到这个执行环境之中。协程与线程的区别在于,它不会把这个函数从头到尾执行完,而是每遇到一个 await 表达式,就暂停一次,下次继续执行的时候,它会先等待 await 所针对的那项 awaitable 操作有了结果(那项操作是用 async 函数表示的),然后再推进到下一个 await 表达式那里(这跟生成器函数的运作方式有点像,那种函数也是一遇到 yield 就暂停)。
协程并不会像线程那样占用大量内存,启动和切换的开销也比较小,而且不需要用复杂的代码来实现加锁或同步。这种强大的机制是通过事件循环(event loop)打造的,只要把相关的函数写对,这种循环就可以穿插着执行许多个这样的函数,并且执行得相当快,从而高效地完成并发式的 I/O 任务。
在函数定义 def 前面加上 async,就表示该函数是一个协程,这样就可以在函数里使用 await 做 I/O 了。
import asyncio
async def game_logic(state, neighbors):
# Do some input/output in here:
data = await my_socket.read(50)
async def step_cell(y, x, get, set):
state = get(y, x)
neighbors = count_neighbors(y, x, get)
next_state = await game_logic(state, neighbors)
set(y, x, next_state)
async def simulate(grid):
next_grid = Grid(grid.height, grid.width)
tasks = []
for y in range(grid.height):
for x in range(grid.width):
task = step_cell(
y, x, grid.get, next_grid.set) # Fan out
tasks.append(task)
await asyncio.gather(*tasks) # Fan in
return next_gridasync 版本的 simulate 函数,有以下几个地方需要解释:
第一,它在调用 step_cell 的时候,系统并不会立刻执行这个函数,而是会返回一个协程实例,稍后会把这个实例写在 await 表达式里面。这里的 step_cell,好比那种用 yield 写的生成器函数一样,调用时并不立刻执行它,而是返回一个生成器实例。这样就可以实现任务 fan-out(分派)模式了。
第二,内置的 asyncio 模块提供了 gather 函数,可以用来实现 fan-in(归集)模式。把 gather 写在 await 表达式里面,可以让系统用事件循环去并发地执行那些 step_cell 协程,并在全部执行完之后,再往下推进 simulate 协程。
第三,由于这些代码都是在同一条线程里执行的,因此不需要给 Grid(网格)实例加锁,至于怎样让这些 I/O 操作表现出平行的效果,则是由 asyncio 所提供的事件循环来负责的。
最后,要调整原范例之中用来推动游戏流程的那段代码。我们只需要修改一行代码,也就是把 simulate(grid)这个协程交给 asyncio.run 去运行,从而利用事件循环机制去执行推进单元格状态所需的那些 I/O 操作。
for i in range(5):
grid = asyncio.run(simulate(grid)) # Run the event loop使用协程,如果报错了,也能在控制台看到错误信息。
如果需求变了,例如 count_neighbors 函数现在也需要做 I/O 了,那么只需要在已有的代码中把这种函数声明成 async,并在调用它的那些地方添上 await 即可。
async def count_neighbors(y, x, get):
n_ = get(y - 1, x + 0) # North
ne = get(y - 1, x + 1) # Northeast
e_ = get(y + 0, x + 1) # East
se = get(y + 1, x + 1) # Southeast
s_ = get(y + 1, x + 0) # South
sw = get(y + 1, x - 1) # Southwest
w_ = get(y + 0, x - 1) # West
nw = get(y - 1, x - 1) # Northwest
neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
count = 0
for state in neighbor_states:
if state == ALIVE:
count += 1
return count
async def step_cell(y, x, get, set):
state = get(y, x)
neighbors = await count_neighbors(y, x, get)
next_state = await game_logic(state, neighbors)
set(y, x, next_state)