新闻中心
FastAPI集成与监控外部进程:基于asyncio的非阻塞实现

本教程详细介绍了如何在FastAPI应用中异步启动并监控外部服务(如J*a服务)的生命周期。文章从解决subprocess阻塞问题入手,逐步讲解了如何利用asyncio.SubprocessProtocol捕获日志,并通过asyncio.Future和FastAPI的lifespan上下文管理器实现非阻塞的启动等待与优雅关闭,确保外部服务完全就绪后FastAPI才开始提供服务,并能在关闭时妥善处理外部进程。
引言:FastAPI与外部服务集成挑战
在现代微服务架构中,一个应用(如基于FastAPI的Python服务)经常需要与其他独立服务(如J*a后端、数据库或其他辅助进程)进行交互。在这种场景下,如何在FastAPI应用启动时同步启动这些外部服务,并在其完全就绪后才暴露API接口,以及在FastAPI关闭时优雅地终止这些外部服务,是一个常见的挑战。
传统的subprocess模块在同步模式下会阻塞主进程,这在异步框架FastAPI中是不可接受的。即使使用asyncio.subprocess_shell或asyncio.subprocess_exec,也需要一种机制来非阻塞地监控外部服务的启动状态,以避免FastAPI在外部依赖尚未准备好时就开始处理请求。本教程将深入探讨如何利用asyncio.SubprocessProtocol和FastAPI的lifespan特性,实现一个健壮、非阻塞的外部服务集成与监控方案。
初步尝试与阻塞陷阱
最初的尝试可能涉及使用asyncio.subprocess_shell来启动外部进程,并通过自定义的asyncio.SubprocessProtocol来捕获其输出日志,从而判断服务是否启动成功。然而,一个常见的陷阱是,在等待外部服务启动完成时,使用一个简单的while循环进行忙等待:
import asyncio
import re
from logging import getLogger
from fastapi import FastAPI
logger = getLogger(__name__)
app = FastAPI()
# 定义一个SubprocessProtocol来处理子进程的I/O
class MyProtocol(asyncio.SubprocessProtocol):
startup_str = re.compile("Server - Started") # 假设J*a服务启动成功会输出此字符串
is_startup = False # 标志位,指示服务是否启动
def pipe_data_received(self, fd: int, data: bytes):
log_line = data.decode().strip()
logger.info(f"Subprocess Log (FD {fd}): {log_line}")
# 如果服务尚未标记为启动,则检查日志
if not self.is_startup:
if re.search(self.startup_str, log_line):
self.is_startup = True
logger.info("J*a service startup signal detected!")
def process_exited(self):
logger.info("External process exited.")
# 这里可能需要添加更多逻辑来处理进程异常退出
super().process_exited()
# 全局变量用于存储transport和protocol实例
transport: asyncio.SubprocessTransport | None = None
protocol: MyProtocol | None = None
@app.on_event("startup")
async def startup_event():
global transport, protocol
loop = asyncio.get_running_loop()
# 启动J*a服务脚本
transport, protocol = await loop.subprocess_shell(MyProtocol, "/start_j*a_server.sh")
logger.info(f"Subprocess started with PID: {transport.get_pid()}")
# 错误示例:此处的while循环会阻塞事件循环
# while not protocol.is_startup:
# pass
# logge
r.info("J*a service started successfully!")
@app.on_event("shutdown")
async def shutdown_event():
global transport
if transport:
logger.info("FastAPI shutting down. Closing subprocess transport.")
transport.close()问题分析: 上述代码中被注释掉的while not protocol.is_startup: pass语句是一个典型的阻塞操作。在asyncio事件循环中,如果一个协程执行了一个不带await的无限循环,它将永远不会将控制权交还给事件循环。这意味着pipe_data_received方法(负责更新is_startup标志)将永远没有机会被执行,导致is_startup始终为False,进程会冻结。
解决方案一:引入非阻塞等待
解决上述阻塞问题的最直接方法是在while循环中引入一个await asyncio.sleep(0.1)。asyncio.sleep是一个协程,它会暂停当前协程的执行,并将控制权交还给事件循环,允许其他协程(包括asyncio.SubprocessProtocol内部处理I/O的协程)运行。
import asyncio
import re
from logging import getLogger
from fastapi import FastAPI
logger = getLogger(__name__)
app = FastAPI()
class MyProtocol(asyncio.SubprocessProtocol):
startup_str = re.compile("Server - Started")
is_startup = False
def pipe_data_received(self, fd: int, data: bytes):
log_line = data.decode().strip()
logger.info(f"Subprocess Log (FD {fd}): {log_line}")
if not self.is_startup:
if re.search(self.startup_str, log_line):
self.is_startup = True
logger.info("J*a service startup signal detected!")
def process_exited(self):
logger.info("External process exited.")
super().process_exited()
transport: asyncio.SubprocessTransport | None = None
protocol: MyProtocol | None = None
@app.on_event("startup")
async def startup_event():
global transport, protocol
loop = asyncio.get_running_loop()
transport, protocol = await loop.subprocess_shell(MyProtocol, "/start_j*a_server.sh")
logger.info(f"Subprocess started with PID: {transport.get_pid()}")
# 正确做法:引入非阻塞等待
while not protocol.is_startup:
logger.debug("Waiting for J*a service to start...")
await asyncio.sleep(0.1) # 释放控制权,允许其他协程(包括pipe_data_received)运行
logger.info("J*a service started successfully!")
@app.on_event("shutdown")
async def shutdown_event():
global transport
if transport:
logger.info("FastAPI shutting down. Closing subprocess transport.")
transport.close()这个改进版本解决了阻塞问题,FastAPI现在能够等待外部服务启动。然而,app.on_event机制在FastAPI 0.95+版本中已被lifespan上下文管理器取代,并且使用简单的布尔标志进行状态管理在复杂场景下可能不够灵活。
Pippit AI
CapCut推出的AI创意内容生成工具
133
查看详情
解决方案二:使用FastAPI lifespan 和 asyncio.Future (推荐)
为了更健壮、更符合FastAPI最佳实践地管理外部服务的生命周期,我们推荐使用FastAPI的lifespan上下文管理器结合asyncio.Future。
FastAPI lifespan:lifespan是一个异步上下文管理器,它允许您定义在FastAPI应用启动前、应用运行中和应用关闭后的逻辑。这为管理外部资源(如数据库连接、缓存、或外部进程)提供了清晰且强大的入口。
asyncio.Future的优势:asyncio.Future是一个强大的异步结果占位符。它允许一个协程(例如lifespan函数中的等待逻辑)等待另一个协程(例如MyProtocol中的pipe_data_received方法)设置一个结果。相比于简单的布尔标志,Future提供了更丰富的异步事件通知和结果传递机制,并且可以方便地与asyncio.wait_for结合使用,实现带超时的等待。
以下是使用lifespan和asyncio.Future的完整实现:
import asyncio from contextlib import asynccontextmanager import re from logging import getLogger, INFO, StreamHandler, Formatter from fastapi import FastAPI
以上就是FastAPI集成与监控外部进程:基于asyncio的非阻塞实现的详细内容,更多请关注其它相关文章!
# 如何实现
# 静海区网站网络推广公司
# 小红薯营销推广方案设计
# 浏览器关键词搜索排名
# 相关文章 seo)
# 茶陵全网营销推广
# 太原网站建设设计公司
# 公司网站建设费多少
# 广州营销seo代运营
# 成都网站收录优化
# 厦门网站优化公司
# 如何在
# 是在
# python
# 解决方法
# 重写
# 布尔
# 自定义
# 管理器
# 是一个
# stream
# ai
# 后端
# app
# java
相关栏目:
【
科技资讯46185 】
【
网络学院92790 】
相关推荐:
J*a应用集成GitHub CLI与API认证指南
4399免费游戏网址入口 4399小游戏免费入口点开即玩
Django表单验证失败时保留用户输入数据的最佳实践
Win11怎么设置开机NumLock亮 Win11修改注册表InitialKeyboardIndicators值
css元素hover动画延迟生效怎么办_使用animation-delay调整触发时间
PHP中SSG-WSG API的AES加密实践:正确使用初始化向量
win11专注助手在哪 Win11免打扰模式设置与自动化规则【指南】
MAC怎么在地图App里使用“四处看看”_MAC体验部分城市的3D实景街景
如何解决电商平台定制报价请求的“黑洞”问题,SprykerQuoteRequest模块助你提升客户体验与销售效率
css子元素高度不一致导致布局错位怎么办_使用align-items:stretch解决高度差异
UC浏览器如何安装插件 UC浏览器添加扩展程序详细教程【进阶】
AO3官网镜像链接 Archive of Our Own同人文在线浏览
怎样在Excel中做仪表盘_Excel仪表盘设计与关键指标展示方法
电脑IP地址怎么查 查看本机IP地址的几种方法
php源码怎么在电脑上测试_电脑测试php源码方法步骤【教程】
Safari浏览器输入栏卡顿如何解决 Safari搜索建议与缓存清理
C++如何进行游戏物理模拟_使用Box2D库为C++游戏添加2D物理效果
CSS Grid如何控制元素对齐_align-items与justify-items组合使用
Win10如何清理注册表垃圾 Win10注册表维护与优化指南【慎用】
Fabric Mod开发:在1.19.3+版本中正确添加自定义物品并管理物品组
谷歌学术网站直达地址 谷歌学术搜索网页版一键进入
如何使用J*aScript精确选择并批量修改特定父元素下子链接的样式
Excel函数批量查找替换超快方法_Excel用REPLACE和FIND函数秒级替换
BetterDiscord插件中安全更新用户简介的实践指南
vivo浏览器自带的下载器速度慢怎么办 vivo浏览器提升文件下载速度的技巧
大麦的“候补”是什么意思 大麦候补购票规则【详解】
火狐浏览器占用内存高卡顿怎么办 火狐浏览器性能优化设置技巧
Lar*el如何生成PDF或Excel文件_Lar*el文档导出工具与使用教程
在VS Code中配置和运行Dart程序的完整步骤
c++中的std::launder有什么实际用途_c++对象生命周期与指针优化
Win11如何开启讲述人功能 Win11屏幕阅读器(讲述人)开启与关闭【教程】
Win11网速慢怎么解决 Win11网络设置优化解除限速
理解J*aScript Promise的微任务队列与执行顺序
Win11文件资源管理器卡顿怎么修 Win11重置资源管理器进程优化响应速度【修复方法】
利用5118提升短视频内容效果_5118短视频关键词优化方法
一加Ace 6T实拍样张首次公布!李杰:主摄实力完全看齐4K档性能旗舰
大象笔记网页版入口 印象笔记网页版登录入口
C++如何操作大型数据集_使用C++流式处理(Streaming)技术避免一次性加载大文件
如何使用 Excel 发布器与 Power BI 分享 Excel 洞察
如何高效处理PHP中的Excel数据导入导出?PortPHP/Spreadsheet助你轻松搞定!
菜鸟取件码是什么怎么查 最全查询渠道汇总
字由网在线版登录地址 字由网网页版安全入口
微信群消息显示延迟如何解决 微信群消息刷新优化方法
Composer中的^和~符号代表什么_精通Composer版本号语义化约束
ACG动漫手机版官网入口 手机ACG动漫APP在线观看正版
PHP URL参数传递与500错误调试指南
age动漫网站入口 age动漫官网直接访问入口
Lar*el如何正确地在控制器和模型之间分配逻辑_Lar*el代码职责分离与架构建议
Composer的 "conflict" 字段有什么用_如何声明不兼容的包以避免依赖冲突
《北京人工智能产业白皮书(2025)》发布:全年核心产值预计突破 4500 亿元


2025-11-04
浏览次数:次
返回列表
r.info("J*a service started successfully!")
@app.on_event("shutdown")
async def shutdown_event():
global transport
if transport:
logger.info("FastAPI shutting down. Closing subprocess transport.")
transport.close()