我想把SQL中的数据存储到Pandas数据框架中,并做一些数据转换,然后加载到另一个表,用Airflow来实现。
我所面临的问题是,连接到表的连接字符串只能通过Airflow访问,所以我需要使用airflow作为媒介来读写数据。所以我需要使用airflow作为媒介来读写数据。
如何才能做到这一点?
我的代码
Task1 = PostgresOperator(
task_id='Task1',
postgres_conn_id='REDSHIFT_CONN',
sql="SELECT * FROM Western.trip limit 5 ",
params={'limit': '50'},
dag=dag
任务的输出需要存储到数据框(df)中,经过转发后再加载到另一个表中。
如何才能做到这一点?
我怀疑是否有一个内置的操作符。你可以很容易地写一个自定义操作符
PostgresOperator
或者只是 BaseOperator
任何其他您选择的操作符。所有的自定义代码都进入了 被覆盖 execute()
办法PostgresHook
以获得 Pandas
DataFrame
援用 get_pandas_df()
功能pandas
df
insert_rows()
将数据插入表格的功能更新-1
按照要求,我在此增加了操作者的代码。
from typing import Dict, Any, List, Tuple
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.decorators import apply_defaults
from pandas import DataFrame
class MyCustomOperator(PostgresOperator):
@apply_defaults
def __init__(self, destination_table: str, *args, **kwargs):
super().__init__(*args, **kwargs)
self.destination_table: str = destination_table
def execute(self, context: Dict[str, Any]):
# create PostgresHook
self.hook: PostgresHook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
schema=self.database)
# read data from Postgres-SQL query into pandas DataFrame
df: DataFrame = self.hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
# perform transformations on df here
df['column_to_be_doubled'] = df['column_to_be_doubled'].multiply(2)
..
# convert pandas DataFrame into list of tuples
rows: List[Tuple[Any, ...]] = list(df.itertuples(index=False, name=None))
# insert list of tuples in destination Postgres table
self.hook.insert_rows(table=self.destination_table, rows=rows)
注意:该代码段仅供参考,尚未经过测试。
参考文献
进一步修改改进
destination_table
param可以从 Variable
Postgres
模式,然后我们可以取另一个参数,如 destination_postgres_conn_id
在 __init__
并以此来创建一个 destination_hook
我们可以在其上调用 insert_rows
办法