使用 Python 的 concurrent.futures 模块

Python3.2 带来了  concurrent.futures  模块,这个模块具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。

此模块由以下部分组成:

  • concurrent.futures.Executor: 这是一个虚拟基类,提供了异步执行的方法。
  • submit(function, argument): 调度函数(可调用的对象)的执行,将  argument  作为参数传入。
  • map(function, argument): 将  argument  作为参数执行函数,以  异步  的方式。
  • shutdown(Wait=True): 发出让执行者释放所有资源的信号。
  • concurrent.futures.Future: 其中包括函数的异步执行。Future 对象是 submit 任务(即带有参数的 functions)到 executor 的实例。

Executor 是抽象类,可以通过子类访问,即线程或进程的  ExecutorPools 。因为,线程或进程的实例是依赖于资源的任务,所以最好以“池”的形式将他们组织在一起,作为可以重用的 launcher 或 executor。

current.Futures  模块提供了两种  Executor  的子类,各自独立操作一个线程池和一个进程池。这两个子类分别是:

  • concurrent.futures.ThreadPoolExecutor(max_workers)
  • concurrent.futures.ProcessPoolExecutor(max_workers)
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
 
def worker(x):
    # 这里是你的任务...
    return x * x
 
# 创建一个线程池
with ThreadPoolExecutor(max_workers=4) as executor:
    future = executor.submit(worker, 2)  # 提交任务
    print(future.result())  # 获取任务结果
 
# 创建一个进程池
with ProcessPoolExecutor(max_workers=4) as executor:
    future = executor.submit(worker, 2)  # 提交任务
    print(future.result())  # 获取任务结果

ThreadPool 与 ThreadPoolExecutor

Python 提供了两种基于线程的工作池,分别是 multiprocessing.pool.ThreadPool 类和 concurrent.futures.ThreadPoolExecutor 类。

multiprocessing.pool.ThreadPoolmultiprocessing.Pool API 的副本,它使用线程而不是进程,这可能导致一些奇怪的情况,因为线程和进程非常不同,包括返回一个只有它自己能理解的 AsyncResult 类型。

concurrent.futures.ThreadPoolExecutorconcurrent.futures.Executor 的子类,这是一个较新的、简化的 API,专门用于线程的创建和管理。它提供了一种更简洁、更易于使用的方式,使得你能够更方便地在多个线程之间切换和调度任务。

总的来说,concurrent.futures.ThreadPoolExecutor 在使用上更为简洁和直观,且是 Python 官方推荐的多线程处理方式。如果你需要在 Python 中实现多线程编程,concurrent.futures.ThreadPoolExecutor 可能是更好的选择。

参考链接:

使用 Asyncio 管理事件循环

Python 的 Asyncio 模块提供了管理事件、协程、任务和线程的方法,以及编写并发代码的原语。此模块的主要组件和概念包括:

  • 事件循环: 在 Asyncio 模块中,每一个进程都有一个事件循环。
  • 协程: 这是子程序的泛化概念。协程可以在执行期间暂停,这样就可以等待外部的处理(例如 IO)完成之后,从之前暂停的地方恢复执行。
  • Futures: 定义了  Future  对象,和  concurrent.futures  模块一样,表示尚未完成的计算。
  • Tasks: 这是 Asyncio 的子类,用于封装和管理并行模式下的协程。

异步编程的上下文中,事件无比重要。因为事件的本质就是异步。

Asyncio 提供了一下方法来管理事件循环:

  • loop = get_event_loop(): 得到当前上下文的事件循环。
  • loop.call_later(time_delay, callback, argument): 延后  time_delay  秒再执行  callback  方法。
  • loop.call_soon(callback, argument): 尽可能快调用  callbackcall_soon()  函数结束,主线程回到事件循环之后就会马上调用  callback 。
  • loop.time(): 以 float 类型返回当前时间循环的内部时间。
  • asyncio.set_event_loop(): 为当前上下文设置事件循环。
  • asyncio.new_event_loop(): 根据此策略创建一个新的时间循环并返回。
  • loop.run_forever(): 在调用  stop()  之前将一直运行。
import asyncio
import datetime
import time
 
def function_1(end_time, loop):
    print ("function_1 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_2, end_time, loop)
    else:
        loop.stop()
 
def function_2(end_time, loop):
    print ("function_2 called ")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_3, end_time, loop)
    else:
        loop.stop()
 
def function_3(end_time, loop):
    print ("function_3 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_1, end_time, loop)
    else:
        loop.stop()
 
def function_4(end_time, loop):
    print ("function_5 called")
    if (loop.time() + 1.0) < end_time:
        loop.call_later(1, function_4, end_time, loop)
    else:
        loop.stop()
 
loop = asyncio.get_event_loop()
 
end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)
# loop.call_soon(function_4, end_loop, loop)
loop.run_forever()
loop.close()

在 Python 3.7 开始,对事件进行了简化

import asyncio
 
async def main():
    print('Hello')
    await asyncio.sleep(1)
    print('World')
 
# Python 3.7+
asyncio.run(main())

在这个例子中,main 函数是一个协程,我们使用 asyncio.run(main()) 来运行它。asyncio.run() 会创建一个事件循环,运行 main 协程,然后在 main 协程完成后关闭事件循环。

一般来说,应用程序开发人员应该使用这样的高级 asyncio 函数,很少需要直接引用事件循环对象或调用其方法。

例如,上面的代码可以更换为:

import asyncio
 
async def function_1(end_time):
    print("function_1 called")
    if asyncio.get_running_loop().time() + 1.0 < end_time:
        await asyncio.sleep(1)
        await function_2(end_time)
    else:
        return
 
async def function_2(end_time):
    print("function_2 called")
    if asyncio.get_running_loop().time() + 1.0 < end_time:
        await asyncio.sleep(1)
        await function_3(end_time)
    else:
        return
 
async def function_3(end_time):
    print("function_3 called")
    if asyncio.get_running_loop().time() + 1.0 < end_time:
        await asyncio.sleep(1)
        await function_1(end_time)
    else:
        return
 
async def main():
    end_time = asyncio.get_running_loop().time() + 9.0
    await function_1(end_time)
 
# Python 3.7+
asyncio.run(main())

使用 Asyncio 管理协程

在上文提到的例子中,我们看到当一个程序变得很大而且复杂时,将其划分为子程序,每一部分实现特定的任务是个不错的方案。子程序不能单独执行,只能在主程序的请求下执行,主程序负责协调使用各个子程序。协程就是子程序的泛化。和子程序一样的事,协程只负责计算任务的一步;和子程序不一样的是,协程没有主程序来进行调度。这是因为协程通过管道连接在一起,没有监视函数负责顺序调用它们。在协程中,执行点可以被挂起,可以被从之前挂起的点恢复执行。

使用 Asyncio 定义协程非常简单,只需要一个装饰器即可,并在需要转交执行权的地方使用 yield 关键字。

import asyncio
 
@asyncio.coroutine
def coroutine_function(function_arguments):
    # DO_SOMETHING
    result = yield from func(input_value)

在 Python 3.7 后有了新的语法,即,使用 async def 替换了 @asyncio.coroutine,并使用 await 关键字替换了 yield from

import asyncio
 
async def coroutine_function(function_arguments):
    # DO_SOMETHING
    result = await func(input_value)

使用 Asyncio 控制任务

在 Python 中,asyncio 模块提供了很多工具来帮助你控制异步任务。在同一事件循环中,运行某一个任务的同时可以并发地运行多个任务。当协程被包在任务中,它会自动将任务和事件循环连接起来,当事件循环启动的时候,任务自动运行。这样就提供了一个可以自动驱动协程的机制。以下是一些主要的方法:

asyncio.create_task(coro, *, name=None): 这个函数会安排协程的执行,并返回一个 Task 对象。Task 对象代表了一个在事件循环中执行的协程。

async def say_hello():
    await asyncio.sleep(1)
    print('Hello')
 
task = asyncio.create_task(say_hello())

Task.cancel(): 这个方法可以请求取消任务的执行。任务会在下一个取消点被取消,取消点通常是 await 表达式或者 yield from 表达式。

task = asyncio.create_task(say_hello())
task.cancel()

Task.result(): 这个方法可以获取任务的结果。如果任务已经完成,这个方法会返回任务的结果,如果任务被取消,这个方法会抛出一个 CancelledError 异常。

task = asyncio.create_task(say_hello())
result = task.result()

等待一组任务完成:

import asyncio
 
async def my_coroutine(seconds):
    print('This coroutine will sleep for', seconds, 'seconds')
    await asyncio.sleep(seconds)
 
async def main():
    task1 = asyncio.create_task(my_coroutine(3))
    task2 = asyncio.create_task(my_coroutine(2))
    await asyncio.gather(task1, task2)  # 等待一组任务完成
 
asyncio.run(main())

使用 Asyncio 和 Futures

在 Python 中,asyncio.Future 类代表了异步操作的最终结果。Future 是一个可等待的对象,协程可以等待 Future 对象,直到它们有一个结果或一个异常设置,或者直到它们被取消。

这是一个简单的示例,展示了如何使用 asyncio.Future

import asyncio
 
async def main(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')
 
loop = asyncio.get_event_loop()
future = loop.create_future()
loop.run_until_complete(main(future))
print(future.result())

在这个例子中,我们首先创建了一个 Future 对象,然后在 main 协程中等待一秒钟,然后设置 Future 的结果。最后,我们打印出 Future 的结果,它应该是”Future is done!”。

另一种方式:

import asyncio
 
async def set_future_result(future):
    await asyncio.sleep(1)
    future.set_result('Future is done!')
 
async def main():
    future = asyncio.Future()
    await set_future_result(future)
    print(future.result())
 
asyncio.run(main())

Future 的基本方法有:

  • cancel(): 取消 future 的执行,调度回调函数
  • result(): 返回 future 代表的结果
  • exception(): 返回 future 中的 Exception
  • add_done_callback(fn): 添加一个回调函数,当 future 执行的时候会调用这个回调函数
  • remove_done_callback(fn): 从“call whten done”列表中移除所有 callback 的实例
  • set_result(result): 将 future 标为执行完成,并且设置 result 的值
  • set_exception(exception): 将 future 标为执行完成,并设置 Exception

参考链接:

Backlinks:

LIST FROM [[]]