我正在致力于将数据管道框架 Kedro 与流行的 Python Web 应用程序框架 Streamlit 集成,以构建数据处理应用程序。主要目标是从我的 Streamlit 应用程序中运行特定的 Kedro 管道,使用自定义
DataCatalog
来管理和加载 DataFrame。
问题详情:
整合背景:
我已成功集成 Kedro 和 Streamlit,并且我可以从我的 Streamlit 应用程序运行 Kedro 管道。但是,我想将 Streamlit 中加载的自定义数据作为
DataCatalog
传递到 Kedro 管道进行处理。
Streamlit 中的数据加载:
在 Streamlit 中,我使用文件上传器从各种来源(例如 CSV、Excel)加载数据。然后我根据这些数据创建 DataFrame。
运行 Kedro 管道:
在 Streamlit 中加载和处理数据后,我想执行特定的 Kedro 管道,传递包含加载的 DataFrame 的自定义
DataCatalog
。管道处理这些数据并存储结果。
遇到错误:
当我尝试使用自定义
DataCatalog
运行 Kedro 管道时,我遇到以下错误消息:
TypeError: KedroSession.run() got an unexpected keyword argument 'extra_params'
AttributeError: can't set attribute 'catalog'
AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'
如何从 Streamlit 应用程序中正确运行 Kedro 管道,传递动态创建的 DataCatalog? Kedro 版本:最新 Streamlit 版本:最新
# Code block 1: Loading data in Streamlit
if st.button('Processar Dados de Entrada'):
# Executar a pipeline do Kedro
with KedroSession.create(project_path=project_path) as session:
catalog = DataCatalog({
"reagentes_raw": MemoryDataSet(df1),
"balanco_de_massas_raw": MemoryDataSet(df2),
"laboratorio_raw": MemoryDataSet(df3),
"laboratorio_raiox_raw": MemoryDataSet(df4),
"carta_controle_pims_raw": MemoryDataSet(df5),
"blend_raw": MemoryDataSet(df6)
})
session.run(data_catalog=catalog, pipeline_name="tag_web_app")
st.success('Dados de entrada processados com sucesso!')
# Ler o arquivo Parquet após a execução da pipeline
merged_data = catalog.load("merged_raw_data_process")
# Supondo que o arquivo Parquet tenha uma coluna de timestamp
last_update = merged_data['timestamp_column'].max()
st.header('Resultado da Atualização da base de dados')
st.write(f"Dados de entrada processados com sucesso: a data e hora da informação mais recente é {last_update.strftime('%d/%m/%Y %H:%M')}")
# Code block 2: Attempt to set DataCatalog in KedroContext
if st.button('Processar Dados de Entrada'):
with KedroSession.create(project_path=project_path) as session:
context = session.load_context()
context.catalog = catalog # This line triggers an AttributeError
runner = SequentialRunner()
session.run(pipeline_name="tag_web_app")
runner.run(pipeline=context.pipeline_registry.get("tag_web_app"), catalog=catalog) # This line triggers an AttributeError
错误2:
AttributeError: can't set attribute 'catalog'
错误3:
AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'
AttributeError: can't set attribute 'catalog'
KedroSession.catalog
已只读一段时间,您尝试做的事情无法完成。
AttributeError: 'KedroContext' object has no attribute 'pipeline_registry'
KedroContext
从来没有 pipeline_registry
属性。