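I have 2 DAGs, and each one pushes a JSON payload to XCom in Airflow.
A final DAG takes each JSON payload, converts it to a pandas DataFrame, and merges the two.
For some reason it doesn't work, and I'm trying to stay calm.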
import pandas as pd

# Snowflake (XCom)
snowflake_dict = [{"table_name": "leads", "last_update_date_snow": 1699269594000},
                  {"table_name": "loan", "last_update_date_snow": 1698966458752}]
# MySQL (XCom)
mysql_dict = [{"table_name": "leads", "last_update_date_my": 1699341986000},
              {"table_name": "loan", "last_update_date_my": 1699275608110}]
df_snowflake = pd.DataFrame(snowflake_dict)
df_mysql = pd.DataFrame(mysql_dict)
# Simple inner merge on the shared key column
merge_df = pd.merge(df_snowflake, df_mysql, on='table_name', how='inner')
This is the relevant task:
merge_last_update = PythonOperator(
    task_id='merge',
    python_callable=merge_last_update,
    provide_context=True,
    dag=dag
)
The error I see in Airflow:
  File "/usr/local/airflow/dags/resources/Snowflake_sync/merge_last_update.py", line 31, in merge_last_update
    merge_df = pd.merge(df_snowflake, df_mysql, on='table_name', how='inner')
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/pandas/core/reshape/merge.py", line 110, in merge
    op = _MergeOperation(
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/pandas/core/reshape/merge.py", line 703, in __init__
    ) = self._get_merge_keys()
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/pandas/core/reshape/merge.py", line 1162, in _get_merge_keys
    right_keys.append(right._get_label_or_level_values(rk))
  File "/usr/local/airflow/.local/lib/python3.10/site-packages/pandas/core/generic.py", line 1850, in _get_label_or_level_values
    raise KeyError(key)
KeyError: 'table_name'
There are two approaches you can take when merging multiple DataFrames in Apache Airflow (you can adapt them to your code):
A.
B.
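Whichever approach you pick, it may help to validate the XCom payloads before merging. The traceback above fails while looking up the key on the right-hand frame, so at least df_mysql appears to have no table_name column at runtime. One possible cause, which is an assumption since the post does not show how the values are pulled, is that the XCom value arrived as None, as an empty list, or as a JSON string rather than a list of dicts. The sketch below uses two hypothetical helpers, to_frame and merge_payloads (neither is part of Airflow or pandas), to normalize a payload and fail with a clearer message when the key column is missing.

```python
import json

import pandas as pd


def to_frame(payload, key="table_name"):
    """Hypothetical helper: build a DataFrame from an XCom payload and check the merge key."""
    # Assumption: the payload may be a JSON string instead of a list of dicts.
    if isinstance(payload, str):
        payload = json.loads(payload)
    # None or an empty list would otherwise yield an empty DataFrame with no columns.
    if not payload:
        raise ValueError("XCom payload is empty or None")
    df = pd.DataFrame(payload)
    if key not in df.columns:
        raise KeyError(f"{key!r} missing; columns are {list(df.columns)}")
    return df


def merge_payloads(snowflake_payload, mysql_payload, key="table_name"):
    """Hypothetical helper: validate both payloads, then inner-merge on the key."""
    left = to_frame(snowflake_payload, key)
    right = to_frame(mysql_payload, key)
    return pd.merge(left, right, on=key, how="inner")
```

Inside the task callable, you would pass the two values pulled from XCom to merge_payloads. If one of them is empty, the error then names the offending payload's problem directly instead of surfacing as a KeyError deep inside pandas.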