Kedro 和 Streamlit 集成 - 使用自定义 DataCatalog 运行 Kedro 管道

问题描述 投票:0回答:1

我正在致力于将数据管道框架 Kedro 与流行的 Python Web 应用程序框架 Streamlit 集成,以构建数据处理应用程序。主要目标是从我的 Streamlit 应用程序中运行特定的 Kedro 管道,使用自定义

DataCatalog
来管理和加载 DataFrame。

问题详情:

  1. 整合背景:

    我已成功集成 Kedro 和 Streamlit,并且我可以从我的 Streamlit 应用程序运行 Kedro 管道。但是,我想将 Streamlit 中加载的自定义数据作为

    DataCatalog
    传递到 Kedro 管道进行处理。

  2. Streamlit 中的数据加载:

    在 Streamlit 中,我使用文件上传器从各种来源(例如 CSV、Excel)加载数据。然后我根据这些数据创建 DataFrame。

  3. 运行 Kedro 管道:

    在 Streamlit 中加载和处理数据后,我想执行特定的 Kedro 管道,传递包含加载的 DataFrame 的自定义

    DataCatalog
    。管道处理这些数据并存储结果。

  4. 遇到错误:

    当我尝试使用自定义

    DataCatalog
    运行 Kedro 管道时,我遇到以下错误消息:

    • TypeError: KedroSession.run() got an unexpected keyword argument 'extra_params'
    • AttributeError: can't set attribute 'catalog'
    • AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'

如何从 Streamlit 应用程序中正确运行 Kedro 管道,传递动态创建的 DataCatalog? Kedro 版本:最新 Streamlit 版本:最新

# Code block 1: Loading data in Streamlit
if st.button('Processar Dados de Entrada'):
    # Executar a pipeline do Kedro
    with KedroSession.create(project_path=project_path) as session:
        catalog = DataCatalog({
            "reagentes_raw": MemoryDataSet(df1),
            "balanco_de_massas_raw": MemoryDataSet(df2),
            "laboratorio_raw": MemoryDataSet(df3),
            "laboratorio_raiox_raw": MemoryDataSet(df4),
            "carta_controle_pims_raw": MemoryDataSet(df5),
            "blend_raw": MemoryDataSet(df6)
        })
        session.run(data_catalog=catalog, pipeline_name="tag_web_app")
    st.success('Dados de entrada processados com sucesso!')

    # Ler o arquivo Parquet após a execução da pipeline
    merged_data = catalog.load("merged_raw_data_process")
    # Supondo que o arquivo Parquet tenha uma coluna de timestamp
    last_update = merged_data['timestamp_column'].max()
    st.header('Resultado da Atualização da base de dados')
    st.write(f"Dados de entrada processados com sucesso: a data e hora da informação mais recente é {last_update.strftime('%d/%m/%Y %H:%M')}")

# Code block 2: Attempt to set DataCatalog in KedroContext
if st.button('Processar Dados de Entrada'):
    with KedroSession.create(project_path=project_path) as session:
        context = session.load_context()
        context.catalog = catalog  # This line triggers an AttributeError
        runner = SequentialRunner()
        session.run(pipeline_name="tag_web_app")
        runner.run(pipeline=context.pipeline_registry.get("tag_web_app"), catalog=catalog)  # This line triggers an AttributeError

错误2:

AttributeError: can't set attribute 'catalog'

错误3:

AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'

python streamlit catalog kedro
1个回答
0
投票

错误2

AttributeError: can't set attribute 'catalog'

KedroSession.catalog
已只读一段时间,您尝试做的事情无法完成。

错误3

AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'

KedroContext
从来没有
pipeline_registry
属性。

© www.soinside.com 2019 - 2024. All rights reserved.