新闻中心
优化asyncio中嵌套异步任务的并发调度

本文探讨了在`asyncio`中处理嵌套异步生成器时,如何通过传统`await`模式导致的串行执行问题。针对`await`的阻塞特性,文章提出并详细阐述了利用`asyncio.queue`和`asyncio.event`构建生产者-消费者模式的解决方案,从而实现任务间的解耦和真正的并发执行,显著提升异步应用的效率和响应性。
理解asyncio中的await与并发限制
在asyncio编程中,await关键字是调度协程的核心机制。当一个协程遇到await表达式时,它会暂停自身的执行,将控制权交还给事件循环,并等待被await的协程完成。一旦被await的协程完成并返回结果,原协程才会从暂停点继续执行。这种机制虽然实现了协作式多任务,但如果设计不当,也可能导致非预期的串行执行。
考虑以下场景:一个异步任务(main)需要从一个异步生成器(sentences_generator)获取数据,然后将数据传递给另一个异步任务(process_sentence)进行处理。如果main函数在每次获取到数据后,都直接await process_sentence的完成,那么在process_sentence执行期间,sentences_generator将无法继续生成新的数据。这违背了我们期望的并发处理,即当process_sentence在处理当前数据时,sentences_generator应该能够同时准备下一批数据。
以下是原始代码示例及其输出,展示了这种串行阻塞行为:
import asyncio
async def stream():
char_string = "Hi. Hello. Hello."
for char in char_string:
await asyncio.sleep(0.1) # 模拟耗时操作
print("got char:", char)
yield char
async def sentences_generator():
sentence = ""
async for char in stream():
sentence += char
if char in [".", "!", "?"]:
print("got sentence: ", sentence)
yield sentence
sentence = ""
async def process_sentence(sentence: str):
print("waiting for processing sentence: ", sentence)
await asyncio.sleep(len(sentence)*0.1) # 模拟耗时处理
print("sentence processed!")
async def main():
i=0
async for sentence in sentences_generator():
print("processing sentence: ", i)
await process_sentence(sentence) # 这里的await导致阻塞
i += 1
# asyncio.run(main())原始输出示例:
got char: H got char: i got char: . got sentence: Hi. processing sentence: 0 waiting for processing sentence: Hi. sentence processed! got char: got char: H got char: e got char: y got char: . got sentence: Hey. processing sentence: 1 waiting for processing sentence: Hey. sentence processed! ...
从输出可以看出,只有当process_sentence完全处理完一个句子后,stream和sentences_generator才能继续生成下一个字符和句子。这并不是我们期望的并发效果。
解决方案:使用asyncio.Queue实现生产者-消费者模式
为了实现真正的并发,我们需要解耦数据的生产和消费过程,使它们能够独立运行。asyncio.Queue是实现这种生产者-消费者模式的理想工具。
核心思想:
Motiff妙多
Motiff妙多是一款AI驱动的界面设计工具,定位为“AI时代设计工具”
334
查看详情
- 生产者(Producer):一个或多个异步任务负责生成数据,并将数据放入asyncio.Queue中。
- 消费者(Consumer):一个或多个异步任务从asyncio.Queue中取出数据进行处理。
- 独立运行:生产者和消费者作为独立的协程,由asyncio事件循环调度,它们之间通过队列进行通信,互不阻塞。
此外,为了实现优雅的关闭和通知消费者数据已全部生产完毕,我们可以引入asyncio.Event。生产者在完成所有数据生产后设置Event,消费者则可以结合队列是否为空和Event状态来判断何时停止。
优化后的代码实现
我们将修改sentences_generator作为生产者,将生成的句子放入队列;process_sentence作为消费者,从队列中取出句子进行处理。main函数将负责启动这两个独立的协程。
import asyncio
# 定义全局变量用于计数,方便观察
i = 1
async def stream():
char_string = "Hi. Hello. Thank you." # 增加一些内容以更好地展示并发
for char in char_string:
await asyncio.sleep(0.1) # 模拟耗时操作
print("got char:", char)
yield char
async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
"""
生产者协程:从字符流生成句子,并放入队列。
当所有句子生成完毕后,设置flag通知消费者。
"""
sentence = ""
async for char in stream():
sentence += char
if char in [".", "!", "?"]:
print("got sentence: ", sentence)
await q.put(sentence) # 将生成的句子放入队列
sentence = ""
# 确保最后一个不以标点符号结尾的句子也被处理(如果需要)
if sentence:
print("got sentence: ", sentence)
await q.put(sentence)
flag.set() # 生产完毕,设置事件标志
async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
"""
消费者协程:从队列中获取句子并进行处理。
当队列为空且生产者已设置flag时,停止消费。
"""
global i
while True:
# 检查是否应该停止:队列为空且生产者已完成
if q.empty() and flag.is_set():
break
# 尝试从队列获取项目,如果队列为空则等待
item = await q.get()
print("processing sentence: ", i)
print("waiting for processing sentence: ", item)
await asyncio.sleep(len(item) * 0.1) # 模拟耗时处理
print("sentence processed!")
q.task_done() # 通知队列此任务已完成
i += 1
async def main():
global i
i = 1 # 重置计数器
event = asyncio.Event() # 用于生产者通知消费者结束
queue = asyncio.Queue[str]() # 生产者和消费者之间的通信队列
# 启动生产者和消费者作为独立的协程任务
producer_task = asyncio.create_task(sentences_generator(queue, event))
consumer_task = asyncio.create_task(process_sentence(queue, event))
# 等待所有任务完成
await asyncio.gather(producer_task, consumer_task)
# 可选:等待队列中所有任务被标记为完成,确保所有数据都被处理
await queue.join()
asyncio.run(main())预期输出示例:
got char: H got char: i got char: . got sentence: Hi. got char: got char: H processing sentence: 1 waiting for processing sentence: Hi. got char: e got char: l got char: l got char: o got char: . got sentence: Hello. sentence processed! got char: got char: T processing sentence: 2 waiting for processing sentence: Hello. got char: h got char: a got char: n got char: k got char: got char: y got char: o got char: u got char: . got sentence: Thank you. sentence processed! processing sentence: 3 waiting for processing sentence: Thank you. sentence processed!
从这个输出可以看出,当process_sentence正在处理第一个句子时,stream和sentences_generator已经继续生成了后续的字符和句子,并将其放入队列。这正是我们期望的并发行为。
关键点和注意事项
- asyncio.Queue的作用:它提供了一个线程安全的(在asyncio中是协程安全的)FIFO队列。put()操作在队列满时会暂停,get()操作在队列空时会暂停,直到有新的数据可用。
- asyncio.Event的作用:它是一个简单的同步原语,用于一个协程通知另一个协程某个事件已经发生。生产者在完成所有数据生产后调用flag.set(),消费者则通过flag.is_set()来检查生产者的状态。
- asyncio.gather():用于并发运行多个协程或任务,并等待它们全部完成。
-
q.task_done() 和 q.join():
- q.task_done():消费者在完成对从队列中获取的项目的处理后调用,通知队列该项目已处理完毕。
- q.join():main函数可以调用await queue.join()来等待队列中所有项目都被get并task_done。这确保了在程序退出前所有数据都已得到处理。在我们的示例中,虽然gather已经等待了所有协程,但queue.join()提供了一个更明确的机制来等待所有队列中的工作完成。
- 消费者退出条件:消费者协程的退出逻辑至关重要。一个常见的模式是while True循环,内部判断q.empty() and flag.is_set()来决定是否退出。这确保了在生产者完成且队列中所有待处理项都已消费后,消费者才能安全退出。
- 错误处理:在实际应用中,生产者和消费者内部应添加适当的错误处理机制,例如try-except块。
- 背压(Backpressure):asyncio.Queue可以有容量限制。如果生产者生产速度远快于消费者,队列会逐渐填满,最终q.put()会暂停,从而对生产者施加背压,防止内存无限增长。
总结
通过将异步任务分解为独立的生产者和消费者,并利用asyncio.Queue进行通信,我们成功地将原本串行执行的逻辑转换为了并发执行。这种模式不仅提高了资源利用率,也使得代码结构更加清晰,易于维护和扩展。在设计复杂的asyncio应用时,当存在数据流动的依赖但又希望实现任务并行时,生产者-消费者模式与asyncio.Queue是解决这类问题的强大工具。
以上就是优化asyncio中嵌套异步任务的并发调度的详细内容,更多请关注其它相关文章!
# 工具
# ai
# stream
# 异步任务
# 多个
# 为空
# 可以看出
# go
# 推广与营销哪个前景好
# 黄站长网站推广
# 培训网站建设哪家强一点
# 公众号微信营销前期推广
# 做网络推广的网站叫什么
# 郑州市场营销推广公司
# 营销号推广费是多少啊怎么算
# 广东短视频搜索seo优化
# SEO写作业文案
# seo和打工哪个赚钱
# 我们可以
# 它是
# 才会
# 第一个
# 这确
# 子时
# 都已
相关栏目:
【
科技资讯46185 】
【
网络学院92790 】
相关推荐:
哔哩哔哩忘记密码了怎么找回_哔哩哔哩密码找回方法
拼多多赚钱渠道_拼多多收益来源
免费抖音短视频入口_抖音网页版短视频免费通道
UC浏览器如何安装插件 UC浏览器添加扩展程序详细教程【进阶】
J*aScript动态修改指定div内所有a标签样式指南
cad如何更改注释性对象的比例_cad注释性比例调整方法
Go调试环境为何无法启动_Go调试器启动失败原因与解决策略
Lar*el头像管理:图片缩放与旧文件删除的最佳实践
整合Supabase认证与Django模型:跨模式迁移的解决方案
解决 Express.js 中 PUT 请求密码修改失败的路由配置指南
夸克浏览器网页版最新地址 夸克浏览器官方入口合集
QQ邮箱在线登录平台 QQ邮箱个人邮箱网页版入口
拷贝漫画电脑版官网入口 拷贝漫画(PC版)在线直达
Excel文件在线转换快速入口 Excel在线格式转换网站
如何在CSS中使用浮动制作导航栏_float实现水平菜单
手机屏幕碎了但能正常使用怎么办 手机外屏碎裂的修复建议
React中useState与局部变量:理解组件状态管理与渲染机制
QQ邮箱稳定登录入口_QQ邮箱官方网站网页版使用
没有大陆身份证/银行卡如何实名微信? 亲测有效的几种方法分享
漫蛙2网页版漫画入口 漫蛙漫画在线官方登录
c++如何使用TBB库进行任务并行_c++ Intel线程构建模块
字由网在线版登录地址 字由网网页版安全入口
为什么我的微信朋友圈看不到别人的更新_微信朋友圈更新显示异常解决方法
从J*aScript对象中精确提取指定属性的教程
在J*aScript中复现SciPy的B样条拟合与求值:关键考量
双系统安装时,如何设置默认启动系统? msconfig命令了解一下!
CKEditor 5 自定义构建在React应用中渲染失败的调试与解决
C++指针和引用有什么区别_C++内存管理核心概念深度解析
sublime如何只显示或隐藏特定类型文件_sublime侧边栏文件过滤
Node.js中HTML按钮与J*aScript函数交互的正确姿势
深入理解rpy2中的类型转换:优化Python对象到R矩阵的映射
Gmail邮箱申请注册直达_Gmail邮箱免费注册PC版官网入口2025
微信商城在哪里打开【步骤】
yy漫画网页版官方入口_yy漫画官网登录页面链接
C#如何安全地从用户上传的XML文件中读取数据? 验证与清理策略
三星ZFold5多任务卡顿_Samsung ZFold5流畅度提升
FullCalendar 自定义按钮样式定制指南
VS Code远程开发时如何处理文件权限问题
响应式CSS Grid布局:优化网格项在小屏幕下的堆叠与宽度适配
vivo手机互传视频怎么操作_vivo手机互传视频详细传输方法
高德地图怎么看全景照片_高德地图全景照片浏览教程
Tailwind CSS line-clamp 布局问题解析与修复指南
在J*a中如何开发简易电子商务商品管理系统_商品管理系统项目实战解析
Go与Ruby之间实现AES加密互通:CFB模式下的密钥长度匹配策略
mcjs网页版流畅运行 mcjs低配电脑畅玩入口
Golang如何使用context实现超时取消_Golang context超时取消模式实践
快手网页版在线登录 快手网页版官网入口快速访问
sublime怎么预览Markdown渲染效果_Markdown Preview插件 for sublime教程
Win10如何开启蓝牙功能_Windows10找不到蓝牙开关解决方法
b站如何看历史记录_b站观看历史找回方法


2025-11-27
浏览次数:次
返回列表
rocessing sentence: 1
waiting for processing sentence: Hey.
sentence processed!
...