timeit 模块来实现简单 benchmark
import random
import timeit
size = 10**5
iterations = 1000
data = list(range(size))
to_lookup = [random.randint(0, size) for _ in range(iterations)]
def run_linear(data, to_lookup):
for index in to_lookup:
data.index(index)
def run_bisect(data, to_lookup):
for index in to_lookup:
bisect_left(data, index)
baseline = timeit.timeit(
stmt='run_linear(data, to_lookup)',
globals=globals(),
number=10)
print(f'Linear search takes {baseline:.6f}s')
comparison = timeit.timeit(
stmt='run_bisect(data, to_lookup)',
globals=globals(),
number=10)
print(f'Bisect search takes {comparison:.6f}s')
slowdown = 1 + ((baseline - comparison) / comparison)
print(f'{slowdown:.1f}x time')分析程序性能
Python 内置了两种 profiler 模块,可以找到程序里面占总执行时间比例最高的一部分。一种是由 profile 模块提供的纯 Python 版本,还有一种是由 cProfile 模块提供的 C 扩展版本。这个版本比纯 Python 版要好,因为它在执行测评的过程中,对受测程序的影响比较小,测评结果更加准确。相反,纯 Python 版本的开销比较大,会令测评结果有所偏差。
需要注意的是,分析 Python 程序的性能时,别把访问外部系统的那些代码,与核心代码混在一起测。例如,访问网络或磁盘资源的代码逻辑。
# 对 10**4 的数据进行插入排序
from random import randint
max_size = 10**4
data = [randint(0, max_size) for _ in range(max_size)]
test = lambda: insertion_sort(data)
# 使用 cProfile 模块进行性能测评
from cProfile import Profile
profiler = Profile()
profiler.runcall(test)然后使用 Stats 模块来进行统计结果。其中,测评结果有多组计算指标。ncalls:调用次数。tottime:执行该函数本身 & 不含调用其他函数的时间。tottime percall:平均单次调用执行的时间(不含其他调用)。cumtime:执行该函数 & 调用其他函数的时间。cumtime percall:平均单次调用执行的时间(含其他调用)。
from pstats import Stats
stats = Stats(profiler)
stats.strip_dirs() # 简化函数名称的路径显示。它移除函数的文件路径前缀,只留下函数名本身
stats.sort_stats('cumulative') # 根据累积时间进行排序
stats.print_stats() # 将结果输出到控制台
# 20003 function calls in 1.227 seconds
#
# Ordered by: cumulative time
#
# ncalls tottime percall cumtime percall filename:lineno(function)
# 1 0.000 0.000 1.227 1.227 <stdin>:1(<lambda>)
# 1 0.002 0.002 1.227 1.227 <stdin>:1(insertion_sort)
# 10000 1.217 0.000 1.225 0.000 <stdin>:1(insert_value)
# 9989 0.008 0.000 0.008 0.000 {method 'insert' of 'list' objects}
# 11 0.000 0.000 0.000 0.000 {method 'append' of 'list' objects}
# 1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
stats.print_callers() # 打印调用关系的分析结果,查看函数被谁调用
# 有时候可能一个工具函数可能被多个函数调用,通过反向查看可以了解调用情况
# Ordered by: cumulative time
#
# Function was called by...
# ncalls tottime cumtime
# <stdin>:1(<lambda>) <-
# <stdin>:1(insert_value) <- 10000 1.217 1.225 <stdin>:1(insertion_sort)
# {method 'insert' of 'list' objects} <- 9989 0.008 0.008 <stdin>:1(insert_value)
# {method 'append' of 'list' objects} <- 11 0.000 0.000 <stdin>:1(insert_value)
# {method 'disable' of '_lsprof.Profiler' objects} <-使用 deque 替代 list 实现生产者-消费者队列
from collections import deque
from threading import Thread, Lock
import time
import random
queue = deque()
lock = Lock()
# 生产者函数
def producer(name, queue, lock):
while True:
item = random.randint(1, 100)
with lock:
queue.append(item)
print(f'{name} produced {item}')
time.sleep(random.random())
# 消费者函数
def consumer(name, queue, lock):
while True:
with lock:
if len(queue) != 0:
item = queue.popleft()
print(f'{name} consumed {item}')
time.sleep(random.random())
producer_thread = Thread(target=producer, args=('Producer', queue, lock))
consumer_thread = Thread(target=consumer, args=('Consumer', queue, lock))
producer_thread.start()
consumer_thread.start()
time.sleep(2)
producer_thread.join()
consumer_thread.join()使用 bisect 操作有序序列
使用 heapq 制作优先级队列
使用 memoryview 与 bytearray 来实零拷贝的 bytes 操作
假如现在要构建媒体服务器,通过网络把电视或电影以数据流的形式发送给用户,这样他们就不用先把整个视频文件全都下载下来才开始观看。这种系统必须实现一项重要的功能,就是允许用户直接跳到后面或前面的某个时间点,并从那里开始观看,这样的话,他们就可以跳过或者回看视频之中的某些部分了。在客户端程序里面,必须针对该功能编写 request_chunk 函数,让它根据用户所选择的时间点在整部视频中的字节位置(byte_offset),向服务器发送请求以获取相应的数据块。
def timecode_to_index(video_id, timecode):
return 1234
# Returns the byte offset in the video data
def request_chunk(video_id, byte_offset, size):
pass
# Returns size bytes of video_id's data from the offset接下来就要考虑,服务器端应该怎么处理客户端通过 request_chunk 发来的请求,并返回相应的那 20MB 视频数据?为了突出重点,这里假设客户端与服务器之间的请求与响应机制,已经实现好了(必要环节参见第 61 条),这样就可以聚焦最关键的一步,也就是怎么把客户所需要的那块视频数据,从缓存在内存之中的千兆字节提取出来,并通过 socket 发回客户端。下面描述大致的流程:
video_id = ...
timecode = '01:09:14:28'
byte_offset = timecode_to_index(video_id, timecode)
size = 20 * 1024 * 1024
video_data = request_chunk(video_id, byte_offset, size)这段代码的延迟与吞吐量,取决于两个因素:一个是从 video_data 里面切出 20 MB 视频数据需要多久;另一个是把这些数据通过 socket 发给客户端需要多久。现在假设 socket 的速度无限快,这样就可以专心考虑前一个因素了。我们通过内置的 timeit 模块做这样一个 micro-benchmark,以测评刚才那段代码的效率,看看它采用的这种方式(参见第 11 条),要花多长时间才能从 bytes 实例里面切割出制作数据块所必需的信息。
import timeit
def run_test():
chunk = video_data[byte_offset:byte_offset + size]
# Call socket.send(chunk), but ignoring for benchmark
result = timeit.timeit(
stmt='run_test()',
globals=globals(),
number=100) / 100
print(f'{result:0.9f} seconds')可以看到,从 bytes 里面提取 20 MB 数据并传给客户端,大概需要 5 毫秒。也就是说,服务器的总吞吐量是每 5 毫秒 20 MB,相当于每秒 4 GB,这就是它从内存中提取视频数据所能达到的最大速度。如果多个客户端同时请求获取各自的新数据块,那么服务器这边的 CPU 在每秒内最多只能服务 200 个客户(因为每响应一次请求,都需要耗费 5 毫秒,或者 0.005 秒的时间),这个数量与内置的 asyncio 模块所能支持的数量相比实在太小,asyncio 应该可以同时处理成千上万条连接才对。问题的关键在于,对 bytes 实例做切片需要拷贝底层数据,这会浪费 CPU 的时间。
这段代码可以通过 Python 内置的 memoryview 类型来改进,这个类型让程序能够利用 CPython 的缓冲协议(buffer protocol)高效地操纵字节数据。这套协议属于底层的 C API,允许 Python 运行时系统与 C 扩展访问底层的数据缓冲,而 bytes 等实例正是由这种数据缓冲对象所支持的。memoryview 最大的优点,是能够在不复制底层数据的前提下,制作出另一个 memoryview。下面这段代码,就先把 bytes 实例封装在 memoryview 里面,然后再切割,这样不用拷贝底层数据。
data = b'shave and a haircut, two bits'
view = memoryview(data)
chunk = view[12:19]
print(chunk)
print('Size: ', chunk.nbytes)
print('Data in view: ', chunk.tobytes())
print('Underlying data:', chunk.obj)
# <memory at 0x00000252E30F6AC0>
# Size: 7
# Data in view: b'haircut'
# Underlying data: b'shave and a haircut, two bits'由于它执行的是零拷贝操作(zero-copy operation)[1],因此对于需要高速处理大量内存数据的代码来说用处很大(比如 NumPy 这种处理数值的 C 扩展,或者像本例这样的 I/O 密集型代码)。下面,我们用 memoryview 封装早前的 video_data,然后在封装出来的 video_view 上面做切片,而不是直接切割 video_data。用同一个 micro-benchmark 来测评这种做法的效率:
video_view = memoryview(video_data)
def run_test():
chunk = video_view[byte_offset:byte_offset + size]
# Call socket.send(chunk), but ignoring for benchmark
result = timeit.timeit(
stmt='run_test()',
globals=globals(),
number=100) / 100
print(f'{result:0.9f} seconds')这次只需要 250 纳秒。所以,服务器理论上的最大吞吐量是每 250 纳秒 20 MB,相当于每秒 80 TB。服务器的 CPU 每秒钟可以处理的请求数是四百万。好处还不止这一个:由于处理时间很短,这意味着这个程序几乎不受制于 CPU 的计算能力,它的速度现在只取决于 socket 与客户端之间的底层连接速度。
假设现在需要实现双向流动,也就是允许某些客户端做视频直播并把视频数据发送给服务器,以便传送到其他用户那里。为了实现这项功能,我们可以把用户最近发来的那段视频放在缓存里面,以便其他用户读取。下面这段代码演示了怎样把客户端发来的 1 MB 新数据放置在缓存的适当位置。
class FakeSocket:
def recv(self, size):
return video_view[byte_offset:byte_offset+size]
def recv_into(self, buffer):
source_data = video_view[byte_offset:byte_offset+size]
buffer[:] = source_data
socket = ... # socket connection to the client
video_cache = ... # Cache of incoming video stream
byte_offset = ... # Incoming buffer position
size = 1024 * 1024 # Incoming chunk size
socket = FakeSocket()
video_cache = video_data[:]
byte_offset = 1234
chunk = socket.recv(size)
video_view = memoryview(video_cache)
before = video_view[:byte_offset]
after = video_view[byte_offset + size:]
new_cache = b''.join([before, chunk, after])socket.recv 方法返回的是 bytes 实例,我们现在要把这个 bytes 所包含的字节数据(也就是 chunk)安插到缓存之中。为此,先把 video_cache 缓存封装成 video_view,这样很容易就能切割出 byte_offset 之前的那一段(before)以及新数据之后的那一段(after),最后用 bytes.join 方法把这三段拼接起来。为了观察这种写法的性能,我们再做一次 micro-benchmark,这次用一个空的 socket 来测试,这样可以免去 I/O 操作,让测试结果完全反映出内存操作的性能。
def run_test():
chunk = socket.recv(size)
before = video_view[:byte_offset]
after = video_view[byte_offset + size:]
new_cache = b''.join([before, chunk, after])
result = timeit.timeit(
stmt='run_test()',
globals=globals(),
number=100) / 100
print(f'{result:0.9f} seconds')这次需要 33 毫秒(0.033 秒)才能接收 1 MB 新数据并把它更新到视频缓存里面。这意味着最大吞吐量大约是每秒钟 31 MB,处理一次更新请求要花 0.033 秒,所以每秒钟大约只能给 31 个客户端同时提供这种视频流服务。这个方案没办法大规模运用。如果能把 bytes 改成 Python 内置的 bytearray,那么与 memoryview 相搭配,效果会比较好。bytes 有个限制,就是只能读取不能修改,我们不能单独更新其中某个位置上的字节。
如果能把 bytes 改成 Python 内置的 bytearray,那么与 memoryview 相搭配,效果会比较好。bytes 有个限制,就是只能读取不能修改,我们不能单独更新其中某个位置上的字节。而 bytearray 则相当于可修改的 bytes,它允许我们修改任意位置上面的内容。bytearray 采用整数表示其中的内容,而不像 bytes 那样,采用 b 开头的字面值。
my_array = bytearray(b'hello')
my_array[0] = 0x79
print(my_array)而 bytearray 则相当于可修改的 bytes,它允许我们修改任意位置上面的内容。bytearray 采用整数表示其中的内容,而不像 bytes 那样,采用 b 开头的字面值。
my_array = bytearray(b'row, row, row your boat')
my_view = memoryview(my_array)
write_view = my_view[3:13]
write_view[:] = b'-10 bytes-'
print(my_array)Python 里面很多库之中的方法,例如 socket.recv_into 与 RawIOBase.readinto,都使用缓冲协议来迅速接收或读取数据。这种方法的好处是不用分配内存,也不用给原数据制作复本,它们会把收到的内容直接写入现有的缓冲区。我们可以把通过 memoryview 所制作的切片传给 socket.recv_into,这样就能直接用该方法接收到的数据来替换底层的 bytearray 里面与切片相对应的那一部分内容。
video_array = bytearray(video_cache)
write_view = memoryview(video_array)
chunk = write_view[byte_offset:byte_offset + size]
socket.recv_into(chunk)然后,我们再做一次 micro-benchmark,看看这种写法的效率,跟原来那种采用 socket.recv 所实现的写法相比,会不会快一些。
def run_test():
chunk = write_view[byte_offset:byte_offset + size]
socket.recv_into(chunk)
result = timeit.timeit(
stmt='run_test()',
globals=globals(),
number=100) / 100
print(f'{result:0.9f} seconds')这次可以看到,传输 1 MB 视频数据只需要 33 微秒(也就是 0.000033 秒)。这意味着服务器的最大吞吐量可以达到每秒钟 31 GB。服务器每秒钟能够处理的客户请求大约是 31000 次。这才是能够大规模运用的方案。