新闻中心

Python并发编程:asyncio与threading协同实现同步任务并行化

2025-10-30
浏览次数:
返回列表

Python并发编程:asyncio与threading协同实现同步任务并行化

本文探讨python中如何将同步阻塞函数与异步协程任务并行执行。通过分析`asyncio`事件循环的特性,我们揭示了直接调用同步函数会阻塞事件循环的问题。核心解决方案是利用`asyncio.run_in_executor`将同步任务提交到独立的线程池中执行,从而实现与异步任务的并发运行,有效提升应用程序的响应性和吞吐量,尤其适用于处理i/o密集型或cpu密集型同步操作。

理解asyncio中的并发与阻塞

在Python的asyncio框架中,协程(coroutine)是实现并发的主要机制。一个asyncio事件循环在单线程中运行,通过协作式多任务处理(cooperative multitasking)来实现并发。这意味着当一个协程遇到await表达式时,它会暂停执行并将控制权交还给事件循环,允许其他协程运行。然而,如果一个协程内部调用了一个耗时且不包含await的同步阻塞函数,那么整个事件循环将会被阻塞,直到该同步函数执行完毕,其他所有协程都无法运行。

考虑以下原始代码示例:

import asyncio
import threading

def do_sync(number: int) -> int:
    # 这里的lock操作是为了演示,实际中如果函数内部不共享状态,则无需加锁
    lock = threading.Lock()
    lock.acquire()
    print(f"do_sync: 开始处理 {number}")
    result = number + 10
    print(f"do_sync: 完成处理 {number}")
    lock.release()
    return result

async def do_async(number: int) -> int:
    print(f"do_async: 开始处理 {number}")
    # 模拟异步I/O操作
    await asyncio.sleep(0.1) 
    result = number + 10
    print(f"do_async: 完成处理 {number}")
    return result

async def main():
    # 这里的max函数会先完全执行do_sync(1),然后才await do_async(2)
    # 它们是顺序执行的,不是并行
    result = max(do_sync(1), await do_async(2))
    print(f"最终结果: {result}")

if __name__ == "__main__":
    asyncio.run(main())

在这段代码中,main函数内的max(do_sync(1), await do_async(2))表达式会首先完全执行do_sync(1)。由于do_sync是一个普通的同步函数,它会立即运行并阻塞当前线程,直到其返回结果。只有当do_sync(1)执行完毕后,await do_async(2)才会被调度执行。因此,这两个函数是顺序执行的,并未实现真正的并发。

利用 asyncio.run_in_executor 实现同步任务并行化

为了让同步阻塞函数与异步协程任务并行运行,我们需要将同步函数从asyncio事件循环的主线程中“卸载”到另一个独立的线程或进程中执行。asyncio为此提供了loop.run_in_executor方法。

run_in_executor允许我们将一个普通的同步函数提交到一个执行器(executor)中运行。默认情况下,asyncio使用ThreadPoolExecutor作为执行器,这意味着同步函数将在一个单独的线程中运行。这样,asyncio事件循环可以继续处理其他协程任务,而无需等待同步函数完成。

run_in_executor的基本用法如下:

await loop.run_in_executor(executor, func, *args)
  • executor: 可以是一个concurrent.futures.ThreadPoolExecutor或concurrent.futures.ProcessPoolExecutor实例,或者None(默认使用ThreadPoolExecutor)。
  • func: 要在执行器中运行的同步函数。
  • *args: 传递给func的参数。

run_in_executor会返回一个Future对象,我们可以await这个Future来等待同步函数的执行结果。

示例代码与解析

现在,我们使用run_in_executor来改造main函数,使do_sync与do_async能够并行运行:

Pinokio Pinokio

Pinokio是一款开源的AI浏览器,可以安装运行各种AI模型和应用

Pinokio 232 查看详情 Pinokio
import asyncio
import threading
import time # 引入time模块用于模拟耗时操作

def do_sync(number: int) -> int:
    # 这里的lock操作是为了演示,实际中如果函数内部不共享状态,则无需加锁
    # 在run_in_executor场景下,如果do_sync内部需要线程安全,则仍需加锁
    # lock = threading.Lock() # 每次调用都会创建新锁,这里不合适,应在外部管理或无需
    print(f"do_sync: 开始处理 {number} 在线程 {threading.current_thread().name}")
    time.sleep(1) # 模拟一个耗时1秒的同步操作
    result = number + 10
    print(f"do_sync: 完成处理 {number} 在线程 {threading.current_thread().name}")
    return result

async def do_async(number: int) -> int:
    print(f"do_async: 开始处理 {number} 在线程 {threading.current_thread().name}")
    await asyncio.sleep(0.5) # 模拟一个耗时0.5秒的异步I/O操作
    result = number + 10
    print(f"do_async: 完成处理 {number} 在线程 {threading.current_thread().name}")
    return result

async def main():
    loop = asyncio.get_running_loop()

    # 使用run_in_executor将do_sync提交到线程池执行
    # 注意:do_sync_future是一个Future对象,它代表do_sync的未来结果
    do_sync_future = loop.run_in_executor(None, do_sync, 1)

    # do_async是一个awaitable对象
    do_async_task = do_async(2)

    # 使用asyncio.gather等待这两个任务并行完成
    # asyncio.gather会等待所有给定的awaitable对象完成
    sync_result, async_result = await asyncio.gather(do_sync_future, do_async_task)

    final_max_result = max(sync_result, async_result)
    print(f"同步任务结果: {sync_result}")
    print(f"异步任务结果: {async_result}")
    print(f"最终最大结果: {final_max_result}")

if __name__ == "__main__":
    start_time = time.monotonic()
    asyncio.run(main())
    end_time = time.monotonic()
    print(f"总耗时: {end_time - start_time:.2f} 秒")

代码解析:

  1. 获取事件循环: loop = asyncio.get_running_loop() 获取当前正在运行的事件循环实例。
  2. 提交同步任务: do_sync_future = loop.run_in_executor(None, do_sync, 1) 将do_sync(1)函数提交到默认的线程池中执行。None表示使用默认的ThreadPoolExecutor。run_in_executor立即返回一个Future对象,而不是阻塞。
  3. 创建异步任务: do_async_task = do_async(2) 创建一个协程对象,它是一个可等待(awaitable)对象。
  4. 并行等待: await asyncio.gather(do_sync_future, do_async_task) 是关键。asyncio.gather允许我们同时等待多个可等待对象。在这种情况下,do_sync_future会在一个单独的线程中执行,而do_async_task则在事件循环的主线程中作为协程执行。它们会并行地运行,等待时间由最长的那个任务决定。
  5. 结果处理: 当asyncio.gather返回时,它会按顺序返回各个任务的结果。

运行上述代码,你会观察到do_sync和do_async的打印输出几乎同时开始,并且总耗时约为1秒(取决于do_sync的time.sleep时间),而不是1.5秒(1秒 + 0.5秒),这证明了它们是并行执行的。

注意事项

  1. 选择合适的执行器:

    • ThreadPoolExecutor (默认): 适用于I/O密集型同步任务(如网络请求、文件读写)。它将阻塞操作放到单独的线程中,释放GIL,允许Python解释器在其他线程中运行。
    • ProcessPoolExecutor: 适用于CPU密集型同步任务。它将任务放到单独的进程中,可以绕过GIL的限制,真正实现CPU层面的并行计算。使用时需要注意进程间通信的开销以及数据序列化问题。
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    
    # 自定义线程池
    with ThreadPoolExecutor(max_workers=5) as thread_pool:
        do_sync_future = loop.run_in_executor(thread_pool, do_sync, 1)
    
    # 自定义进程池
    with ProcessPoolExecutor(max_workers=2) as process_pool:
        do_sync_future = loop.run_in_executor(process_pool, do_sync_cpu_bound, 1)
  2. 共享状态与线程安全: 如果提交给run_in_executor的同步函数需要访问或修改共享数据(如全局变量、类属性),则必须使用线程锁(threading.Lock)或其他同步原语来确保线程安全,以避免数据竞争问题。在示例中的do_sync函数,由于其内部操作是独立的,不涉及共享状态,因此无需外部加锁。

  3. 错误处理: run_in_executor返回的Future对象在被await时,如果其内部的同步函数抛出异常,该异常会被重新抛出。因此,可以使用try...except块来捕获和处理这些异常。

  4. 避免过度使用: run_in_executor引入了线程/进程管理的开销。对于非常轻量级的同步操作,直接在事件循环中执行可能比使用执行器更高效。它主要用于处理那些确实会阻塞事件循环的耗时同步任务。

总结

通过asyncio.run_in_executor,Python的asyncio框架能够优雅地与传统的同步阻塞代码以及多线程/多进程模型结合。它提供了一种强大的机制,允许开发者在保持事件循环响应性的同时,处理耗时的同步I/O或CPU密集型任务。掌握这一技术对于构建高性能、高并发的Python应用程序至关重要,它弥合了协程的非阻塞特性与传统同步操作之间的鸿沟,使得Python开发者能够充分利用现代多核处理器和异步编程范式。

以上就是Python并发编程:asyncio与threading协同实现同步任务并行化的详细内容,更多请关注其它相关文章!


# 这两个  # 优化ppt的ai网站一键优化  # 龙江网站推广怎么样做  # 丹东网站建设平台  # 无锡朗云网站建设  # 拿到一个网站如何优化  # 初学者必学的seo  # 玉溪网站建设排名  # seo排名立联火星  # 云南seo软件优点  # 随州seo外包获客系统  # 重写  # 全局变量  # python  # 多线程  # 执行器  # 它会  # 加锁  # 适用于  # 自定义  # 是一个  # 异步协程  # 异步任务  # 并发编程  # ai  # 处理器 


相关栏目: 【 科技资讯46185 】 【 网络学院92790


相关推荐: 如何在 Windows 11 中启动游戏手柄设置  MAC怎么安装Homebrew包管理器_MAC为开发者和高级用户安装命令行工具  淘宝支付提示失败如何解决 淘宝支付流程优化方法  如何为你的Composer包编写自动化测试_集成PHPUnit到Composer的scripts工作流  CSS布局:解决全屏元素100%尺寸与外边距导致的页面溢出问题  Win11怎么关闭快速启动_Win11彻底关机设置教程  C++如何进行游戏物理模拟_使用Box2D库为C++游戏添加2D物理效果  Safari自带网页翻译功能怎么用 无需插件轻松看懂外文网站【方法】  如何使 Jest 模拟函数默认抛出错误以提高测试效率  Adobe PDF表单中利用J*aScript解析与格式化日期组件的教程  限制HTML日期输入框的日期选择范围  漫蛙MANWA漫画主页官方入口 漫蛙漫画最新在线阅读地址  Win10怎么制作U盘启动盘 Win10系统安装U盘制作教程【详解】  解决Tabulator日期时间排序问题的专业指南  Composer的 archive 命令怎么用_快速打包你的PHP项目及其Composer依赖  Win11怎么用U盘重装系统 Win11制作启动盘并重装系统完整教程【详解】  解决Django多数据库/多Schema环境下外键迁移问题  现代化 SciPy 一维插值:interp1d 的替代方案与最佳实践  J*a最大堆Heapify方法修复:索引计算与边界条件深度解析  J*aScript 字符串标签转换:使用正则表达式高效替换  荣耀Play7T运行卡顿解决_荣耀Play7T性能优化  高德地图沿途添加点失败如何解决 高德多点规划方法  天猫双十一预售商品怎么退款_天猫双十一预售退款操作指南  C++指针和引用有什么区别_C++内存管理核心概念深度解析  KFC游戏互动怎么赢取优惠券_KFC线上游戏活动参与与优惠代码赢取教程  使用J*aScript检测输入元素是否包含在特定类中  Django模型中自动计算可用余额的实现方法  J*aScript打印功能_j*ascript输出控制  Lar*el表单中优雅地处理“返回”按钮以规避验证:最佳实践指南  PHP表单数据传递:如何通过隐藏输入字段获取动态ID  Golang如何实现微服务鉴权与权限控制_Golang微服务鉴权与权限管理实践  特斯拉自动驾驶房车计划曝光 原型车将于2027年亮相  苹果手机如何防止被恶意App追踪  KFC套餐升级怎么获取优惠代码_KFC套餐升级活动与优惠代码获取方法  steam官方网页快速访问 steam账号注册全流程  Python大型XML文件高效流式解析教程  飞书妙记怎样用语音转文字速记_飞书妙记用语音转文字速记【速记方法】  MAC的“快捷指令”怎么同步到iPhone_MAC利用iCloud同步所有设备的自动化指令  拷贝漫画电脑版官网入口 拷贝漫画(PC版)在线直达  PPT平滑切换怎么做 PPT炫酷“平滑”切换动画制作教程【必学】  使用Python高效删除Word宏并转换DOCM为DOCX格式  Golang如何处理RPC请求负载均衡_Golang RPC请求负载均衡策略与实践  2025年云电脑操作系统体验 | 无需本地硬件,随时随地使用高性能PC  Win11怎么开启省电模式_Win11电池节电模式自动开启  “音游” × “怪文书” 题材的节奏冒险游戏 《晕晕电波症候群》确定于2026年4月发售!  京东京造J1和网易云音乐氧气真无线有什么不同_国产电商蓝牙耳机音质对比  Win11怎么关闭触摸屏_Windows 11禁用HID符合标准触摸屏  文本文档写html代码怎么运行_文本文档html代码运行步骤【教程】  腾讯视频怎么使用多账号家庭管理_腾讯视频家庭多账号统一管理与权限分配教程  J*a里如何使用forEach遍历Map_Map遍历方法说明 

搜索