新闻中心

Kedro与Streamlit集成:动态数据目录下的管道运行实践

2025-11-14
浏览次数:
返回列表

Kedro与Streamlit集成:动态数据目录下的管道运行实践

本文旨在指导开发者如何在streamlit应用中集成并运行kedro数据管道,重点解决如何动态创建并传递自定义`datacatalog`以处理streamlit加载的数据。文章将阐明常见的错误尝试及其原因,并提供一种健壮的方法,通过`kedrosession.run()`的`data_catalog`参数正确地将运行时数据注入kedro管道,从而实现数据处理的无缝衔接。

在构建交互式数据应用时,将强大的数据管道框架(如Kedro)与灵活的Web应用框架(如Streamlit)结合是一种常见的需求。特别是当数据源是动态的,例如用户通过Streamlit界面上传文件时,我们需要一种机制来将这些运行时加载的数据作为输入,传递给Kedro管道进行处理。本文将详细介绍如何实现这一目标,并纠正在此过程中可能遇到的常见误区。

理解Kedro与Streamlit集成中的数据流

Kedro的核心概念之一是DataCatalog,它定义了数据加载和保存的方式。通常,DataCatalog在conf/base/catalog.yml中静态定义。然而,在Streamlit应用中,数据通常在运行时由用户上传,这意味着我们需要一个动态的DataCatalog来封装这些内存中的DataFrame。

目标是将Streamlit中加载的DataFrame包装成MemoryDataSet,然后构建一个临时的DataCatalog,并将其传递给Kedro管道执行。

常见错误尝试与原因分析

在尝试将自定义DataCatalog传递给Kedro管道时,开发者可能会遇到一些AttributeError。理解这些错误的原因对于正确实现集成至关重要。

错误尝试一:直接修改KedroContext或KedroSession的catalog属性

# 错误的代码示例
from kedro.framework.session import KedroSession
from kedro.io import DataCatalog, MemoryDataSet
import pandas as pd
import streamlit as st

# 假设 df1, df2, ... 是在Streamlit中加载的DataFrame
df1 = pd.DataFrame({'col1': [1, 2]})
df2 = pd.DataFrame({'col2': [3, 4]})

if st.button('Processar Dados de Entrada'):
    with KedroSession.create(project_path="./my_kedro_project") as session:
        context = session.load_context()
        # 尝试直接设置context.catalog,这将导致AttributeError
        # context.catalog = DataCatalog({"my_data": MemoryDataSet(df1)}) # AttributeError: can't set attribute 'catalog'

        # 尝试直接设置session.catalog,同样会导致AttributeError
        # session.catalog = DataCatalog({"my_data": MemoryDataSet(df1)}) # AttributeError: can't set attribute 'catalog'

        # ...后续管道运行代码

原因分析:AttributeError: can't set attribute 'catalog'

KedroSession和KedroContext的catalog属性在Kedro内部被设计为只读。这意味着您不能在会话或上下文创建之后,通过直接赋值的方式来修改它们引用的DataCatalog对象。DataCatalog在KedroSession初始化时被加载并冻结,以确保管道执行的一致性和可预测性。尝试直接修改它会违反这一设计原则,从而引发AttributeError。

错误尝试二:通过KedroContext访问pipeline_registry

# 错误的代码示例
from kedro.framework.session import KedroSession
from kedro.runner import SequentialRunner
import streamlit as st

# ... (数据加载和catalog创建) ...

if st.button('Processar Dados de Entrada'):
    with KedroSession.create(project_path="./my_kedro_project") as session:
        context = session.load_context()
        runner = SequentialRunner()
        # 尝试通过context.pipeline_registry获取管道,这将导致AttributeError
        # runner.run(pipeline=context.pipeline_registry.get("tag_web_app"), catalog=custom_catalog) # AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'

原因分析:AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'

KedroContext对象并不直接拥有pipeline_registry属性。管道注册是KedroSession负责管理的一部分。当您通过KedroSession.run()方法执行管道时,会话会自动处理管道的查找和注册。直接从KedroContext中访问pipeline_registry是不符合Kedro设计模式的。

Perplexity Perplexity

Perplexity是一个ChatGPT和谷歌结合的超级工具,可以让你在浏览互联网时提出问题或获得即时摘要

Perplexity 302 查看详情 Perplexity

正确的方法:通过KedroSession.run()传递自定义DataCatalog

Kedro提供了一种简洁且推荐的方式来在运行时注入自定义DataCatalog,即使用KedroSession.run()方法的data_catalog(或旧版本中的catalog)参数。

步骤一:在Streamlit中加载数据并创建MemoryDataSet

首先,在Streamlit应用中,您需要使用文件上传器或其他方式加载数据,并将其转换为Pandas DataFrame。然后,将这些DataFrame包装成Kedro的MemoryDataSet对象。MemoryDataSet是Kedro提供的一种数据集类型,用于处理内存中的数据,非常适合这种动态场景。

import streamlit as st
import pandas as pd
from kedro.io import DataCatalog, MemoryDataSet
from kedro.framework.session import KedroSession
from pathlib import Path

# 假设你的Kedro项目路径
project_path = Path(__file__).parent / "my_kedro_project" # 根据实际项目结构调整

st.title("Kedro与Streamlit数据处理应用")

uploaded_file1 = st.file_uploader("上传 Reagentes CSV", type=["csv"])
uploaded_file2 = st.file_uploader("上传 Balanço de Massas CSV", type=["csv"])
# ... 可以根据需要添加更多文件上传器

df1, df2, df3, df4, df5, df6 = None, None, None, None, None, None

if uploaded_file1:
    df1 = pd.read_csv(uploaded_file1)
    st.write("Reagentes 数据加载成功:")
    st.dataframe(df1.head())

if uploaded_file2:
    df2 = pd.read_csv(uploaded_file2)
    st.write("Balanço de Massas 数据加载成功:")
    st.dataframe(df2.head())

# 假设还有其他文件加载,这里简化
# df3 = pd.DataFrame(...)
# df4 = pd.DataFrame(...)
# df5 = pd.DataFrame(...)
# df6 = pd.DataFrame(...)

步骤二:构建自定义DataCatalog

当所有必需的DataFrame都加载完毕后,您可以创建一个新的DataCatalog实例,并将这些MemoryDataSet对象作为键值对添加到其中。键名应与您的Kedro管道中期望的数据集名称相匹配。

# ... (承接上一步的代码) ...

if st.button('Processar Dados de Entrada'):
    if df1 is not None and df2 is not None: # 确保所有必要数据都已加载
        # 创建自定义DataCatalog
        custom_catalog = DataCatalog({
            "reagentes_raw": MemoryDataSet(df1),
            "balanco_de_massas_raw": MemoryDataSet(df2),
            # 根据需要添加更多数据集
            # "laboratorio_raw": MemoryDataSet(df3),
            # "laboratorio_raiox_raw": MemoryDataSet(df4),
            # "carta_controle_pims_raw": MemoryDataSet(df5),
            # "blend_raw": MemoryDataSet(df6)
        })

        st.info("正在执行Kedro管道...")

        try:
            # 步骤三:通过KedroSession.run()传递自定义DataCatalog
            with KedroSession.create(project_path=project_path) as session:
                session.run(data_catalog=custom_catalog, pipeline_name="tag_web_app")

            st.success('数据处理成功!')

            # 步骤四:从自定义DataCatalog中加载处理后的结果
            # 假设管道输出一个名为 "merged_raw_data_process" 的数据集
            if "merged_raw_data_process" in custom_catalog.list():
                merged_data = custom_catalog.load("merged_raw_data_process")
                st.header('结果数据预览')
                st.dataframe(merged_data.head())

                # 假设结果数据中有一个时间戳列
                if 'timestamp_column' in merged_data.columns: # 请替换为实际的时间戳列名
                    last_update = merged_data['timestamp_column'].max()
                    st.write(f"最新数据时间: {last_update.strftime('%d/%m/%Y %H:%M')}")
            else:
                st.warning("管道未生成预期的 'merged_raw_data_process' 数据集。")

        except Exception as e:
            st.error(f"Kedro管道执行失败: {e}")
    else:
        st.warning("请上传所有必要的数据文件。")

步骤三:通过KedroSession.run()传递自定义DataCatalog

这是解决问题的关键步骤。KedroSession.run()方法接受一个data_catalog(或旧版本中的catalog)参数,允许您传入一个临时的DataCatalog实例。这个传入的DataCatalog会与项目默认的DataCatalog合并,或者在某些情况下完全覆盖默认的同名数据集定义,从而将您的内存数据注入到管道执行中。

# ... (代码片段已包含在步骤二中) ...
with KedroSession.create(project_path=project_path) as session:
    session.run(data_catalog=custom_catalog, pipeline_name="tag_web_app")

请注意,pipeline_name参数用于指定要运行的特定管道。如果您的Kedro项目只有一个默认管道,可以省略此参数。

步骤四:从自定义DataCatalog中加载处理后的结果

管道执行完成后,如果您的管道配置为将结果保存到MemoryDataSet中(例如,通过在catalog.yml中将输出数据集定义为MemoryDataSet,或者在运行时通过custom_catalog覆盖),您可以直接从传入的custom_catalog中加载这些结果。

# ... (代码片段已包含在步骤二中) ...
merged_data = custom_catalog.load("merged_raw_data_process")

注意事项与最佳实践

  1. Kedro项目结构: 确保您的Streamlit应用能够正确找到Kedro项目的根目录(project_path)。通常,Streamlit应用可以放在Kedro项目之外,但需要正确指定project_path。
  2. MemoryDataSet的使用: MemoryDataSet非常适合处理临时数据。如果需要将处理结果持久化,您可以在DataCatalog中定义其他类型的数据集(如ParquetDataSet、CSVDataSet等),或者在管道的末尾手动保存DataFrame。
  3. 管道定义: 确保您的Kedro管道中的节点能够接收和处理MemoryDataSet提供的数据。输入数据集的名称必须与custom_catalog中定义的键名匹配。
  4. 错误处理: 在实际应用中,务必添加适当的错误处理机制,以捕获Kedro管道执行过程中可能出现的异常,并向用户提供友好的反馈。
  5. 性能考虑: 对于大型数据集,MemoryDataSet可能会消耗大量内存。根据您的数据规模和性能需求,可能需要考虑更高效的数据处理策略,例如分块处理或使用更适合大数据的Kedro数据集类型。
  6. Kedro版本兼容性: 本文中的data_catalog参数适用于较新版本的Kedro。如果您使用的是旧版本,可能需要使用catalog参数。请查阅您的Kedro版本文档以确认正确的参数名称。

总结

通过在KedroSession.run()方法中利用data_catalog参数,我们可以优雅地将Streamlit中加载的动态数据注入到Kedro管道中进行处理。这种方法避免了直接修改Kedro内部只读属性的错误,提供了一种符合Kedro设计哲学且易于维护的集成方案。遵循本文介绍的步骤和最佳实践,您将能够构建出功能强大、交互性强的数据处理应用。

以上就是Kedro与Streamlit集成:动态数据目录下的管道运行实践的详细内容,更多请关注其它相关文章!


# 上传  # 深圳php网站建设  # 办网站建设  # 重庆seo系统  # 麻城住建委建设信息网站  # 知道seo是什么吗  # 龙岗网站建设技术  # 抖音seo排名星云  # 营销推广活动策划书籍  # seo z  # 品牌网站推广服务电话  # 旧版本  # 道中  # 键值  # 大数据  # 如何做  # 您可以  # 数据处理  # 自定义  # 您的  # 加载  # blend  # 键值对  # stream  # ai  # csv  # session  # app 


相关栏目: 【 科技资讯46185 】 【 网络学院92790


相关推荐: Lar*el递归关系中排除子孙节点的策略  树莓派传感器触发:通过Twilio API发送WhatsApp消息教程  Golang如何实现容器化日志收集与分析_Golang容器日志收集分析方法  抖音网页版企业服务中心登录入口_抖音网页版企业登录平台  在J*a中如何使用Stream.map转换元素_Stream映射操作解析  J*aScript中安全有效地处理localStorage字符串数据  深入理解Google Cloud Datastore查询:祖先路径与数据一致性  怎么在html里运行vbs脚本_html中运行vbs脚本方法【教程】  Golang如何安装Swagger工具_GoSwagger文档生成环境  在J*a中如何使用BigDecimal进行高精度计算_BigDecimal类应用指南  XML中包含HTML标签导致解析错误? 正确嵌入非XML数据的两种方法  c++如何实现一个简单的ECS框架_c++数据驱动设计与游戏开发  马斯克:Optimus 人形机器人复数形式为 Optimi  steam官方入口大全 steam账号注册及操作指南  海量存储:机器视觉智能化的核心基石  C++的std::forward_list怎么用_C++ STL中单向链表容器的特点与应用  Yandex免登录官网入口_俄罗斯Yandex搜索引擎直达链接  LINQ to XML为何解析失败? 深入理解C# XDocument的异常处理  Win10桌面图标出现小盾牌怎么办 Win10去除UAC图标教程【解决】  FullCalendar 自定义按钮样式定制指南  yandex入口引擎手机版 yandex安卓版下载入口  菜鸟取件码是什么怎么查 最全查询渠道汇总  Pygame教程:解决用户输入与游戏状态更新不同步问题  Angular中父组件异步更新子组件复选框状态的实践指南  12306几点到几点不能订票? | 官方最新系统维护时间全解析  如何创建独立于主系统的J*a运行环境_隔离式环境搭建策略  妖精漫画网页版登录入口免费_妖精漫画官网主页直接阅读漫画  Web Components中自定义开关组件状态同步的常见陷阱与解决方案  Selenium Python中处理点击后新窗口加载冻结问题的策略与实践  Python实现多节点属性重叠度分析教程  如何设置Windows Defender的定时扫描_计划任务实现自动杀毒【安全】  如何高效处理PHP中的Excel数据导入导出?PortPHP/Spreadsheet助你轻松搞定!  HTML转PPT成品工具有哪些?HTML网页转PPT成品工具大全  天眼查企业查询官网入口 天眼查官方网页版查询  在VS Code中配置和运行Dart程序的完整步骤  Go语言中的*string:深入理解字符串指针  网站内容防复制粘贴的实现策略与局限性  Win10怎么制作U盘启动盘 Win10系统安装U盘制作教程【详解】  深入理解Promise链:如何在catch后中断then的执行  mysql密码锁定怎么解锁_mysql密码锁定解锁后修改密码步骤  支付宝如何设置安全保护_支付宝安全设置的全面教程  漫蛙漫画官方首页 漫蛙2漫画在线阅读入口  Win11怎么开启高性能模式_Windows 11电源计划优化设置  12306选座怎么选到临时改签座_12306改签选座策略与步骤  PHP URL参数传递与500错误调试指南  Fabric模组开发:自定义物品与物品组的现代管理方法  一加手机电池耗电快怎么办_一加手机电池耗电快的解决方法  解决macOS上安装pyhdf时‘hdf.h’文件缺失的编译错误  CSS如何设置hover状态颜色_hover伪类调整背景或文字颜色  c++中的std::launder有什么实际用途_c++对象生命周期与指针优化 

搜索