有没有办法在气流中并行化我的 pandas 数据框函数?

问题描述 投票:0回答:1

我目前正在开展一个需要使用气流的学术项目。目前,我只处理一个数据库表,因此我需要一些帮助才能知道我是否朝着正确的方向前进。

我有一个名为 data_transformation.py 的 .py 文件:

def get_df_from_db():
    conn = pymysql.connect(host='HOST',
                                 user='USER',
                                 password='PW',
                                 database='DB')
    query = "SELECT * FROM table"
    df = pd.read_sql(query, conn)
    df.reset_index(inplace=True)
    df.drop('index', axis=1, inplace=True)
    connection.close()
    return df

def clean_colorTypes(df):
    df['colorType'] = df['colorType'].apply(lambda x: x if x in ['Red', 'Blue', 'Orange'] else 'Others')
    df = df[df['numOfButtons'] <= 2]
    return df

def add_yearsSinceManufactured(df):
    df['yearsSinceManu'] = df['yearListed'] - df['yearManufactured']
    return df

def add_to_db(df):
    #create connection to mysql using create_engine
    df.to_sql(name = 'cleaned', con = conn, if_exists = 'append', index = False)

    return df

我的气流 dag 文件的想法是:

  1. 执行 get_df_from_db
  2. 同时执行 clean_colorTypes 和 add_yearsSinceManufactured 因为它们不依赖
  3. 第2步完成后,执行add_to_db

但是,我不确定这种方法是否可行,因为我在每个函数中都传入 df 。

python pandas airflow
1个回答
0
投票

我会按照已经建议的方式进行操作,要么让数据库完成所有繁重的工作,每个查询作为一个单独的任务,中间有临时表。

或者,如果您坚持使用 Pandas,每个转换都保存到 CSV - Airflow DB (XCOM) 根本不适合保存大量数据。

DB 路线仍然是 ETL 管道:

  1. 您提取必要的数据
  2. 将其加载到您的数据库
  3. 随心所欲地改变它
© www.soinside.com 2019 - 2024. All rights reserved.