新闻中心

Python中同步与异步函数的并发执行:结合asyncio与threading

2025-10-30
浏览次数:
返回列表

Python中同步与异步函数的并发执行:结合asyncio与threading

本文深入探讨了python中同步(阻塞)函数与异步(非阻塞)函数如何实现并发执行。通过分析`asyncio`和`threading`的工作原理,我们阐明了直接在`async`函数中调用阻塞同步代码的局限性,并重点介绍了使用`asyncio.loop.run_in_executor`将同步任务调度到单独线程池中执行的策略,从而确保`asyncio`事件循环的非阻塞性,实现高效的混合并发编程。

在Python的并发编程中,asyncio和threading是两种核心机制,分别适用于不同的场景。asyncio通过事件循环实现协作式多任务,非常适合I/O密集型且非阻塞的操作。而threading则利用操作系统线程实现并发,能够处理阻塞的I/O操作或CPU密集型任务(受限于GIL)。当我们需要在同一个应用程序中同时运行同步(阻塞)函数和异步(非阻塞)函数并希望它们并行执行时,直接将同步阻塞调用放在async函数中并不能实现真正的并发,反而会阻塞整个事件循环。

理解同步与异步的并发限制

考虑以下初始代码示例:

import asyncio
import threading
import time

def do_sync(number: int) -> int:
    # 模拟一个耗时的同步阻塞操作
    print(f"同步函数 do_sync({number}) 在线程 {threading.current_thread().name} 中开始执行...")
    time.sleep(2) # 模拟2秒的阻塞
    result = number + 10
    print(f"同步函数 do_sync({number}) 在线程 {threading.current_thread().name} 中完成,结果: {result}")
    return result

async def do_async(number: int) -> int:
    # 模拟一个耗时的异步非阻塞操作
    print(f"异步函数 do_async({number}) 在线程 {threading.current_thread().name} 中开始执行...")
    await asyncio.sleep(1) # 模拟1秒的非阻塞等待
    result = number + 10
    print(f"异步函数 do_async({number}) 在线程 {threading.current_thread().name} 中完成,结果: {result}")
    return result

async def main():
    start_time = time.time()
    # 这里的do_sync(1)是一个普通的函数调用,它会阻塞main函数的执行
    # 直到do_sync(1)完全执行完毕,await do_async(2)才会被调度
    sync_result = do_sync(1)
    async_result = await do_async(2)

    final_max = max(sync_result, async_result)
    print(f"最终结果 max({sync_result}, {async_result}): {final_max}")
    print(f"总耗时: {time.time() - start_time:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main())

运行上述代码,你会发现总耗时接近 2秒 + 1秒 = 3秒。这是因为do_sync(1)是一个普通的函数调用,它会在主线程上立即执行并阻塞,直到其完成,await do_async(2)才有机会被asyncio事件循环调度。这并非真正的并行或并发执行。

解决方案:使用 asyncio.loop.run_in_executor

要实现同步函数与异步函数的并行执行,我们需要将阻塞的同步函数从asyncio事件循环的主线程中剥离出来,放到一个单独的执行器(Executor)中运行。asyncio提供了loop.run_in_executor()方法来优雅地解决这个问题。

run_in_executor()方法允许我们将一个可调用对象(通常是同步函数)提交给一个concurrent.futures.ThreadPoolExecutor(默认)或concurrent.futures.ProcessPoolExecutor来执行。这样,阻塞的同步代码将在一个单独的线程(或进程)中运行,而asyncio事件循环则可以继续处理其他异步任务,从而实现并发。

工作原理

  1. 获取事件循环:首先,我们需要获取当前正在运行的asyncio事件循环实例。
  2. 提交任务:使用loop.run_in_executor(executor, func, *args)提交同步函数。
    • executor:可以是一个ThreadPoolExecutor实例,ProcessPoolExecutor实例,或者None(默认使用asyncio内部维护的ThreadPoolExecutor)。
    • func:要执行的同步函数。
    • *args:传递给func的参数。
  3. 返回Future:run_in_executor会立即返回一个concurrent.futures.Future对象,这是一个可等待(awaitable)的对象。
  4. 异步等待:我们可以在async函数中使用await关键字等待这个Future对象的结果,而不会阻塞事件循环。

示例代码:实现同步与异步并发

import asyncio
import threading
import time
import concurrent.futures # 导入concurrent.futures模块

def do_sync_blocking(number: int) -> int:
    print(f"同步函数 do_sync_blocking({number}) 在线程 {threading.current_thread().name} 中开始执行...")
    time.sleep(2) # 模拟2秒的阻塞I/O或CPU工作
    result = number + 10
    print(f"同步函数 do_sync_blocking({number}) 在线程 {threading.current_thread().name} 中完成,结果: {result}")
    return result

async def do_async_non_blocking(number: int) -> int:
    print(f"异步函数 do_async_non_blocking({number}) 在线程 {threading.current_thread().name} 中开始执行...")
    await asyncio.sleep(1) # 模拟1秒的非阻塞I/O等待
    result = number + 10
    print(f"异步函数 do_async_non_blocking({number}) 在线程 {threading.current_thread().name} 中完成,结果: {result}")
    return result

async def main_parallel():
    start_time = time.time()
    loop = asyncio.get_running_loop()

    # 将同步阻塞函数提交到默认的ThreadPoolExecutor中执行
    # 这会立即返回一个Future对象,而不会阻塞当前事件循环
    sync_task_future = loop.run_in_executor(None, do_sync_blocking, 1)

    # 异步函数可以直接调度到事件循环中
    async_task_coro = do_async_non_blocking(2)

    # 使用asyncio.gather并发等待这两个任务的结果
    # gather会等待所有传入的awaitable对象完成
    results = await asyncio.gather(sync_task_future, async_task_coro)

    sync_result, async_result = results
    final_max = max(sync_result, async_result)

    print(f"最终结果 max({sync_result}, {async_result}): {final_max}")
    print(f"总耗时: {time.time() - start_time:.2f} 秒")

if __name__ == "__main__":
    asyncio.run(main_parallel())

运行这段代码,你会观察到总耗时大约是 2秒。这是因为do_sync_blocking在后台线程中执行,而do_async_non_blocking在事件循环中执行,两者几乎同时开始,并且程序等待两者中最长的那个任务完成(即2秒的同步任务)。这证明了我们成功地实现了同步与异步任务的并发执行。

进阶考量:线程安全与call_soon_threadsafe

线程安全

当我们将同步函数放到单独的线程中执行时,需要特别注意线程安全问题。如果同步函数访问或修改共享资源(例如全局变量、数据库连接池等),则必须使用适当的同步机制,如threading.Lock、threading.Semaphore或queue.Queue来避免竞态条件。

Pinokio Pinokio

Pinokio是一款开源的AI浏览器,可以安装运行各种AI模型和应用

Pinokio 232 查看详情 Pinokio

例如,如果do_sync_blocking内部需要保护某个共享资源,其结构应如下:

import threading

shared_data = []
data_lock = threading.Lock()

def do_sync_with_lock(number: int) -> int:
    with data_lock: # 使用with语句确保锁的正确获取和释放
        shared_data.append(f"Processing {number}")
        # ... 其他操作 ...
    return number + 10

asyncio.loop.call_soon_threadsafe

除了run_in_executor,asyncio还提供了loop.call_soon_threadsafe()方法。这个方法的主要用途是允许在非事件循环线程中安全地调度一个回调函数到事件循环中执行。它返回一个asyncio.Handle对象,而不是一个Future。

例如,如果你有一个工作线程处理完数据后,需要通知事件循环更新UI或执行某个异步操作,你可以使用call_soon_threadsafe:

# 在一个非asyncio线程中
# loop.call_soon_threadsafe(my_callback, arg1, arg2)

然而,对于将阻塞的同步函数从事件循环中剥离以实现并发,run_in_executor是更直接和推荐的方法,因为它直接处理了任务的执行和结果的返回。call_soon_threadsafe更多是用于跨线程的事件通知和回调调度。

总结与最佳实践

  1. 明确并发模型
    • 对于I/O密集型且非阻塞的任务,优先使用asyncio。
    • 对于阻塞I/O操作或CPU密集型任务,考虑使用threading或multiprocessing。
  2. 混合使用策略:当需要在asyncio应用中执行阻塞的同步代码时,始终使用asyncio.loop.run_in_executor将其 offload 到一个线程池(或进程池)中。这能保持事件循环的响应性,防止整个应用程序被阻塞。
  3. 选择执行器
    • ThreadPoolExecutor (默认):适用于I/O密集型阻塞任务,因为Python的GIL允许线程在等待I/O时切换。
    • ProcessPoolExecutor:适用于CPU密集型任务,因为它会启动新的进程,每个进程有自己的Python解释器和GIL,从而实现真正的CPU并行。
  4. 线程安全:在多线程环境中,共享资源的访问必须通过锁或其他同步原语进行保护,以防止数据损坏和竞态条件。
  5. 避免在事件循环中直接调用阻塞代码:这是导致asyncio应用性能低下的常见原因。务必将所有阻塞调用通过run_in_executor进行封装。

通过理解和正确应用asyncio与threading的结合策略,开发者可以构建出既能高效处理大量并发I/O,又能平稳集成现有阻塞代码的强大Python应用程序。

以上就是Python中同步与异步函数的并发执行:结合asyncio与threading的详细内容,更多请关注其它相关文章!


# 应用程序  # 网站的推广措施包括哪些  # 甘肃品牌推广网站有哪些  # 常州网站建设关键词优化  # 衡水网站建设详细策划  # 风水运势网站如何推广  # 漫画网站推广策划书范文  # 衡阳网站优化简历软件  # 关键词排名优化大师怎么用啊  # 女性搜索男性关键词排名  # 网站建设 java  # 重写  # 全局变量  # 自定义  # python  # 如何实现  # 多线程  # 适用于  # 回调  # 是一个  # red  # 同步机制  # 异步任务  # 并发编程  # ai  # 回调函数  # app  # 操作系统 


相关栏目: 【 科技资讯46185 】 【 网络学院92790


相关推荐: 铁路12306卧铺选择攻略 铁路12306下铺座位预定技巧  优化MinIO list_objects_v2 操作的性能瓶颈与最佳实践  ExcelARRAYTOTEXT函数怎么自定义分隔符输出数组文本_ARRAYTOTEXT实现动态生成SQL语句  Win11怎么设置开机NumLock亮 Win11修改注册表InitialKeyboardIndicators值  星露谷物语官网入口 星露谷物语游戏官网入口  包子漫画官方网站在线链接-包子漫画在线阅读平台主页地址  圆通快递查询实时追踪 圆通物流包裹状态快速查看  Win11怎么开启省电模式_Win11电池节电模式自动开启  Composer的 "check-platform-reqs" 命令有什么用_在部署前检查生产环境是否满足Composer依赖需求  React Hooks最佳实践:动态组件状态管理的组件化方案  qq游戏免费畅玩入口_qq游戏电脑版快速启动  Excel组合图表怎么做 Excel创建柱状图与折线组合图教程【图表】  Go语言中JSON数据解析与字段访问教程  邮政编码查询不到怎么办_邮政编码查询不到的常见原因与对策  红果短剧网页版官网入口 官方最新网址发布  提升屏幕阅读器对“m”时间单位的播报准确性:HTML与CSS组合解决方案  机构:以往存储涨价周期小米利润率实际上有所改善 能转嫁给消费者等  抖音隐秘迷城小游戏入口_ 抖音冒险解谜小游戏秒玩  C++如何操作注册表_Windows平台下C++读写注册表的API函数详解  菜鸟取件码是什么怎么查 最全查询渠道汇总  qq音乐在线播放入口_qq音乐电脑版登录链接  HTML转PPT成品工具有哪些?HTML网页转PPT成品工具大全  12306几点到几点不能订票? | 官方最新系统维护时间全解析  163邮箱登录密码 163邮箱忘记密码找回  迅雷下载到U盘速度很慢怎么办_迅雷U盘下载慢优化方法  深入理解J*a链表中的IPosition接口与使用  Tabulator表格日期时间排序问题及自定义解决方案  J*a里如何使用forEach遍历Map_Map遍历方法说明  Go RPC HTTP服务正确实现与常见陷阱解析  顺丰快件物流信息 官方网站查询入口  在J*a中如何开发简易仓库管理与库存统计_仓库管理库存统计项目实战解析  J*a递归快速排序中静态变量导致数据累积的陷阱与解决方案  高德地图沿途添加点失败如何解决 高德多点规划方法  如何在复杂的电商平台中优雅地管理共享资源并确保正确重定向,使用spryker-shop/resource-share-page模块助你一臂之力  J*aScript数据结构转换:将对象数组按类别分组  Win10怎么制作U盘启动盘 Win10系统安装U盘制作教程【详解】  修复二维数组索引越界异常:一维循环到二维坐标的正确映射  J*aScript打印功能_j*ascript输出控制  铃兰之剑为这和平的世界希里技能组及加点推荐  J*aScript中针对特定容器内图片动画的实现教程  Win11 USB传输速度慢怎么解决 Win11 USB驱动更新与设置  AO3同人作品网入口 AO3搜索引擎官网永久地址  单12V-2×6实现为RTX 5090供电750W!甚至都没敢跑分  微信网页版扫码登录入口 微信网页版二维码登录入口  不会效仿卡普空!《铁拳》制作人澄清:不采取赛事付费|直播|  Win10如何清理注册表垃圾 Win10手动清理无效注册表【技巧】  Lar*el用户头像管理:实现图片缩放、存储与旧文件安全删除的最佳实践  解决Tabulator日期时间排序问题的专业指南  Basecamp怎样用留言钉固定重点_Basecamp用留言钉固定重点【重点标记】  VS Code远程开发时如何处理文件权限问题 

搜索