我目前正在开展一个需要使用气流的学术项目。目前,我只处理一个数据库表,因此我需要一些帮助才能知道我是否朝着正确的方向前进。
我有一个名为 data_transformation.py 的 .py 文件:
def get_df_from_db():
conn = pymysql.connect(host='HOST',
user='USER',
password='PW',
database='DB')
query = "SELECT * FROM table"
df = pd.read_sql(query, conn)
df.reset_index(inplace=True)
df.drop('index', axis=1, inplace=True)
connection.close()
return df
def clean_colorTypes(df):
df['colorType'] = df['colorType'].apply(lambda x: x if x in ['Red', 'Blue', 'Orange'] else 'Others')
df = df[df['numOfButtons'] <= 2]
return df
def add_yearsSinceManufactured(df):
df['yearsSinceManu'] = df['yearListed'] - df['yearManufactured']
return df
def add_to_db(df):
#create connection to mysql using create_engine
df.to_sql(name = 'cleaned', con = conn, if_exists = 'append', index = False)
return df
我的气流 dag 文件的想法是:
但是,我不确定这种方法是否可行,因为我在每个函数中都传入 df 。
我会按照已经建议的方式进行操作,要么让数据库完成所有繁重的工作,每个查询作为一个单独的任务,中间有临时表。
或者,如果您坚持使用 Pandas,每个转换都保存到 CSV - Airflow DB (XCOM) 根本不适合保存大量数据。
DB 路线仍然是 ETL 管道: