timeit 模块来实现简单 benchmark

import random
import timeit
 
size = 10**5
iterations = 1000
 
data = list(range(size))
to_lookup = [random.randint(0, size)  for _ in range(iterations)]
 
def run_linear(data, to_lookup):
    for index in to_lookup:
        data.index(index)
 
def run_bisect(data, to_lookup):
    for index in to_lookup:
        bisect_left(data, index)
 
baseline = timeit.timeit(
    stmt='run_linear(data, to_lookup)',
    globals=globals(),
    number=10)
print(f'Linear search takes {baseline:.6f}s')
 
comparison = timeit.timeit(
    stmt='run_bisect(data, to_lookup)',
    globals=globals(),
    number=10)
print(f'Bisect search takes {comparison:.6f}s')
 
slowdown = 1 + ((baseline - comparison) / comparison)
print(f'{slowdown:.1f}x time')

分析程序性能

Python 内置了两种 profiler 模块,可以找到程序里面占总执行时间比例最高的一部分。一种是由 profile 模块提供的纯 Python 版本,还有一种是由 cProfile 模块提供的 C 扩展版本。这个版本比纯 Python 版要好,因为它在执行测评的过程中,对受测程序的影响比较小,测评结果更加准确。相反,纯 Python 版本的开销比较大,会令测评结果有所偏差。

需要注意的是,分析 Python 程序的性能时,别把访问外部系统的那些代码,与核心代码混在一起测。例如,访问网络或磁盘资源的代码逻辑。

# 对 10**4 的数据进行插入排序
from random import randint
max_size = 10**4
data = [randint(0, max_size) for _ in range(max_size)]
test = lambda: insertion_sort(data)
 
# 使用 cProfile 模块进行性能测评
from cProfile import Profile
profiler = Profile()
profiler.runcall(test)

然后使用 Stats 模块来进行统计结果。其中,测评结果有多组计算指标。ncalls:调用次数。tottime:执行该函数本身 & 不含调用其他函数的时间。tottime percall:平均单次调用执行的时间(不含其他调用)。cumtime:执行该函数 & 调用其他函数的时间。cumtime percall:平均单次调用执行的时间(含其他调用)。

from pstats import Stats
stats = Stats(profiler)
stats.strip_dirs()  # 简化函数名称的路径显示。它移除函数的文件路径前缀,只留下函数名本身
stats.sort_stats('cumulative')  # 根据累积时间进行排序
stats.print_stats()  # 将结果输出到控制台
 
#          20003 function calls in 1.227 seconds
#
#    Ordered by: cumulative time
#
#    ncalls  tottime  percall  cumtime  percall filename:lineno(function)
#         1    0.000    0.000    1.227    1.227 <stdin>:1(<lambda>)
#         1    0.002    0.002    1.227    1.227 <stdin>:1(insertion_sort)
#     10000    1.217    0.000    1.225    0.000 <stdin>:1(insert_value)
#      9989    0.008    0.000    0.008    0.000 {method 'insert' of 'list' objects}
#        11    0.000    0.000    0.000    0.000 {method 'append' of 'list' objects}
#         1    0.000    0.000    0.000    0.000 {method 'disable' of '_lsprof.Profiler' objects}
 
stats.print_callers()  # 打印调用关系的分析结果,查看函数被谁调用
# 有时候可能一个工具函数可能被多个函数调用,通过反向查看可以了解调用情况
 
#    Ordered by: cumulative time
#
# Function                                          was called by...
#                                                       ncalls  tottime  cumtime
# <stdin>:1(<lambda>)                               <-
# <stdin>:1(insert_value)                           <-   10000    1.217    1.225  <stdin>:1(insertion_sort)
# {method 'insert' of 'list' objects}               <-    9989    0.008    0.008  <stdin>:1(insert_value)
# {method 'append' of 'list' objects}               <-      11    0.000    0.000  <stdin>:1(insert_value)
# {method 'disable' of '_lsprof.Profiler' objects}  <-

使用 deque 替代 list 实现生产者-消费者队列

from collections import deque
from threading import Thread, Lock
import time
import random
 
queue = deque()
lock = Lock()
 
# 生产者函数
def producer(name, queue, lock):
    while True:
        item = random.randint(1, 100)
        with lock:
            queue.append(item)
            print(f'{name} produced {item}')
        time.sleep(random.random())
 
# 消费者函数
def consumer(name, queue, lock):
    while True:
        with lock:
            if len(queue) != 0:
                item = queue.popleft()
                print(f'{name} consumed {item}')
        time.sleep(random.random())
 
producer_thread = Thread(target=producer, args=('Producer', queue, lock))
consumer_thread = Thread(target=consumer, args=('Consumer', queue, lock))
 
producer_thread.start()
consumer_thread.start()
time.sleep(2)
 
producer_thread.join()
consumer_thread.join()

使用 bisect 操作有序序列

使用 heapq 制作优先级队列

使用 memoryview 与 bytearray 来实零拷贝的 bytes 操作

假如现在要构建媒体服务器,通过网络把电视或电影以数据流的形式发送给用户,这样他们就不用先把整个视频文件全都下载下来才开始观看。这种系统必须实现一项重要的功能,就是允许用户直接跳到后面或前面的某个时间点,并从那里开始观看,这样的话,他们就可以跳过或者回看视频之中的某些部分了。在客户端程序里面,必须针对该功能编写 request_chunk 函数,让它根据用户所选择的时间点在整部视频中的字节位置(byte_offset),向服务器发送请求以获取相应的数据块。

def timecode_to_index(video_id, timecode):
    return 1234
    # Returns the byte offset in the video data
 
def request_chunk(video_id, byte_offset, size):
    pass
    # Returns size bytes of video_id's data from the offset

接下来就要考虑,服务器端应该怎么处理客户端通过 request_chunk 发来的请求,并返回相应的那 20MB 视频数据?为了突出重点,这里假设客户端与服务器之间的请求与响应机制,已经实现好了(必要环节参见第 61 条),这样就可以聚焦最关键的一步,也就是怎么把客户所需要的那块视频数据,从缓存在内存之中的千兆字节提取出来,并通过 socket 发回客户端。下面描述大致的流程:

video_id = ...
timecode = '01:09:14:28'
byte_offset = timecode_to_index(video_id, timecode)
size = 20 * 1024 * 1024
video_data = request_chunk(video_id, byte_offset, size)

这段代码的延迟与吞吐量,取决于两个因素:一个是从 video_data 里面切出 20 MB 视频数据需要多久;另一个是把这些数据通过 socket 发给客户端需要多久。现在假设 socket 的速度无限快,这样就可以专心考虑前一个因素了。我们通过内置的 timeit 模块做这样一个 micro-benchmark,以测评刚才那段代码的效率,看看它采用的这种方式(参见第 11 条),要花多长时间才能从 bytes 实例里面切割出制作数据块所必需的信息。

import timeit
 
def run_test():
    chunk = video_data[byte_offset:byte_offset + size]
    # Call socket.send(chunk), but ignoring for benchmark
 
result = timeit.timeit(
    stmt='run_test()',
    globals=globals(),
    number=100) / 100
 
print(f'{result:0.9f} seconds')

可以看到,从 bytes 里面提取 20 MB 数据并传给客户端,大概需要 5 毫秒。也就是说,服务器的总吞吐量是每 5 毫秒 20 MB,相当于每秒 4 GB,这就是它从内存中提取视频数据所能达到的最大速度。如果多个客户端同时请求获取各自的新数据块,那么服务器这边的 CPU 在每秒内最多只能服务 200 个客户(因为每响应一次请求,都需要耗费 5 毫秒,或者 0.005 秒的时间),这个数量与内置的 asyncio 模块所能支持的数量相比实在太小,asyncio 应该可以同时处理成千上万条连接才对。问题的关键在于,对 bytes 实例做切片需要拷贝底层数据,这会浪费 CPU 的时间。

这段代码可以通过 Python 内置的 memoryview 类型来改进,这个类型让程序能够利用 CPython 的缓冲协议(buffer protocol)高效地操纵字节数据。这套协议属于底层的 C API,允许 Python 运行时系统与 C 扩展访问底层的数据缓冲,而 bytes 等实例正是由这种数据缓冲对象所支持的。memoryview 最大的优点,是能够在不复制底层数据的前提下,制作出另一个 memoryview。下面这段代码,就先把 bytes 实例封装在 memoryview 里面,然后再切割,这样不用拷贝底层数据。

data = b'shave and a haircut, two bits'
view = memoryview(data)
chunk = view[12:19]
print(chunk)
print('Size:           ', chunk.nbytes)
print('Data in view:   ', chunk.tobytes())
print('Underlying data:', chunk.obj)
# <memory at 0x00000252E30F6AC0>
# Size:            7
# Data in view:    b'haircut'
# Underlying data: b'shave and a haircut, two bits'

由于它执行的是零拷贝操作(zero-copy operation)[1],因此对于需要高速处理大量内存数据的代码来说用处很大(比如 NumPy 这种处理数值的 C 扩展,或者像本例这样的 I/O 密集型代码)。下面,我们用 memoryview 封装早前的 video_data,然后在封装出来的 video_view 上面做切片,而不是直接切割 video_data。用同一个 micro-benchmark 来测评这种做法的效率:

video_view = memoryview(video_data)
 
def run_test():
    chunk = video_view[byte_offset:byte_offset + size]
    # Call socket.send(chunk), but ignoring for benchmark
 
result = timeit.timeit(
    stmt='run_test()',
    globals=globals(),
    number=100) / 100
 
print(f'{result:0.9f} seconds')

这次只需要 250 纳秒。所以,服务器理论上的最大吞吐量是每 250 纳秒 20 MB,相当于每秒 80 TB。服务器的 CPU 每秒钟可以处理的请求数是四百万。好处还不止这一个:由于处理时间很短,这意味着这个程序几乎不受制于 CPU 的计算能力,它的速度现在只取决于 socket 与客户端之间的底层连接速度。

假设现在需要实现双向流动,也就是允许某些客户端做视频直播并把视频数据发送给服务器,以便传送到其他用户那里。为了实现这项功能,我们可以把用户最近发来的那段视频放在缓存里面,以便其他用户读取。下面这段代码演示了怎样把客户端发来的 1 MB 新数据放置在缓存的适当位置。

class FakeSocket:
 
    def recv(self, size):
        return video_view[byte_offset:byte_offset+size]
 
    def recv_into(self, buffer):
        source_data = video_view[byte_offset:byte_offset+size]
        buffer[:] = source_data
 
socket = ...        # socket connection to the client
video_cache = ...   # Cache of incoming video stream
byte_offset = ...   # Incoming buffer position
size = 1024 * 1024  # Incoming chunk size
socket = FakeSocket()
video_cache = video_data[:]
byte_offset = 1234
 
chunk = socket.recv(size)
video_view = memoryview(video_cache)
before = video_view[:byte_offset]
after = video_view[byte_offset + size:]
new_cache = b''.join([before, chunk, after])

socket.recv 方法返回的是 bytes 实例,我们现在要把这个 bytes 所包含的字节数据(也就是 chunk)安插到缓存之中。为此,先把 video_cache 缓存封装成 video_view,这样很容易就能切割出 byte_offset 之前的那一段(before)以及新数据之后的那一段(after),最后用 bytes.join 方法把这三段拼接起来。为了观察这种写法的性能,我们再做一次 micro-benchmark,这次用一个空的 socket 来测试,这样可以免去 I/O 操作,让测试结果完全反映出内存操作的性能。

def run_test():
    chunk = socket.recv(size)
    before = video_view[:byte_offset]
    after = video_view[byte_offset + size:]
    new_cache = b''.join([before, chunk, after])
 
result = timeit.timeit(
    stmt='run_test()',
    globals=globals(),
    number=100) / 100
 
print(f'{result:0.9f} seconds')

这次需要 33 毫秒(0.033 秒)才能接收 1 MB 新数据并把它更新到视频缓存里面。这意味着最大吞吐量大约是每秒钟 31 MB,处理一次更新请求要花 0.033 秒,所以每秒钟大约只能给 31 个客户端同时提供这种视频流服务。这个方案没办法大规模运用。如果能把 bytes 改成 Python 内置的 bytearray,那么与 memoryview 相搭配,效果会比较好。bytes 有个限制,就是只能读取不能修改,我们不能单独更新其中某个位置上的字节。

如果能把 bytes 改成 Python 内置的 bytearray,那么与 memoryview 相搭配,效果会比较好。bytes 有个限制,就是只能读取不能修改,我们不能单独更新其中某个位置上的字节。而 bytearray 则相当于可修改的 bytes,它允许我们修改任意位置上面的内容。bytearray 采用整数表示其中的内容,而不像 bytes 那样,采用 b 开头的字面值。

my_array = bytearray(b'hello')
my_array[0] = 0x79
print(my_array)

而 bytearray 则相当于可修改的 bytes,它允许我们修改任意位置上面的内容。bytearray 采用整数表示其中的内容,而不像 bytes 那样,采用 b 开头的字面值。

my_array = bytearray(b'row, row, row your boat')
my_view = memoryview(my_array)
write_view = my_view[3:13]
write_view[:] = b'-10 bytes-'
print(my_array)

Python 里面很多库之中的方法,例如 socket.recv_into 与 RawIOBase.readinto,都使用缓冲协议来迅速接收或读取数据。这种方法的好处是不用分配内存,也不用给原数据制作复本,它们会把收到的内容直接写入现有的缓冲区。我们可以把通过 memoryview 所制作的切片传给 socket.recv_into,这样就能直接用该方法接收到的数据来替换底层的 bytearray 里面与切片相对应的那一部分内容。

video_array = bytearray(video_cache)
write_view = memoryview(video_array)
chunk = write_view[byte_offset:byte_offset + size]
socket.recv_into(chunk)

然后,我们再做一次 micro-benchmark,看看这种写法的效率,跟原来那种采用 socket.recv 所实现的写法相比,会不会快一些。

def run_test():
    chunk = write_view[byte_offset:byte_offset + size]
    socket.recv_into(chunk)
 
result = timeit.timeit(
    stmt='run_test()',
    globals=globals(),
    number=100) / 100
 
print(f'{result:0.9f} seconds')

这次可以看到,传输 1 MB 视频数据只需要 33 微秒(也就是 0.000033 秒)。这意味着服务器的最大吞吐量可以达到每秒钟 31 GB。服务器每秒钟能够处理的客户请求大约是 31000 次。这才是能够大规模运用的方案。