合并两个pandas DataFrame,结果在Airflow中为空(本地有效)

问题描述 投票:0回答:1

我有 2 个 dags,每个 dags 都向 Airflow 中的 Xcom 发送一个 json。

最后一个 dag 将每个 json 将其转换为 pandas df 并合并它。

由于某种原因,它不起作用,我正在努力保持冷静。

#Snowlflake (Xcom)
snowflake_dict = [{"table_name" :"leads" ,"last_update_date_snow" :1699269594000}
                  ,{"table_name" :"loan" ,"last_update_date_snow" :1698966458752}]
# mySQL (Xcom)
mysql_dict = [{"table_name" :"leads" ,"last_update_date_my" :1699341986000}
              ,{"table_name" :"loan" ,"last_update_date_my" :1699275608110}]

df_snowflake = pd.DataFrame(snowflake_dict)

df_mysql = pd.DataFrame(mysql_dict)

#simple shit
merge_df = pd.merge(df_snowflake, df_mysql, on='table_name', how='inner')

这是相关的标签:

merge_last_update = PythonOperator(
    task_id='merge',
    python_callable=merge_last_update,
    provide_context=True,
    dag=dag

我在 Airflow 中看到的错误:

文件“/usr/local/airflow/dags/resources/Snowflake_sync/merge_last_update.py”,第 31 行,在 merge_last_update 中 merge_df = pd.merge(df_snowflake, df_mysql, on='table_name', how='inner') 文件“/usr/local/airflow/.local/lib/python3.10/site-packages/pandas/core/reshape/merge.py”,第 110 行,合并 操作 = _MergeOperation( 文件“/usr/local/airflow/.local/lib/python3.10/site-packages/pandas/core/reshape/merge.py”,第 703 行,在 init 中 ) = self._get_merge_keys() 文件“/usr/local/airflow/.local/lib/python3.10/site-packages/pandas/core/reshape/merge.py”,第 1162 行,在 _get_merge_keys 中 right_keys.append(right._get_label_or_level_values(rk)) 文件“/usr/local/airflow/.local/lib/python3.10/site-packages/pandas/core/generic.py”,第 1850 行,在 _get_label_or_level_values 中 引发 KeyError(key) 键错误:“表名”

python pandas airflow
1个回答
0
投票

在 Apache Airflow 上合并多个 DataFrame 时可以采取两种方法(您可以根据您的代码调整这些方法):

A.

  1. 我们需要创建一个空列表来放置DataFrame:all_dataframes = []
  2. 我们将使用 pandas 的追加函数添加到 all_dataframes 中:all_dataframes.append(df)
  3. 我们使用 pandas 的 concat 函数合并列表中的所有 DataFrame:final_dataframe = pd.concat(all_dataframes,ignore_index=True)

B.

  1. 我们将相关的 DataFrame 添加到一个列表中,我们将在其中放置 DataFrame:all_dataframes = [block_bid_df, block_offer_df, Price_independent_bid_df, Price_independent_offer_df]
  2. final_df = reduce(lambda left, right: pd.merge(left, right, on=['Date'], how='outer'), all_dataframes)
© www.soinside.com 2019 - 2024. All rights reserved.