如何用Airflow做存储sql输出到pandas数据框?

问题描述 投票:6回答:1

我想把SQL中的数据存储到Pandas数据框架中,并做一些数据转换,然后加载到另一个表,用Airflow来实现。

我所面临的问题是,连接到表的连接字符串只能通过Airflow访问,所以我需要使用airflow作为媒介来读写数据。所以我需要使用airflow作为媒介来读写数据。

如何才能做到这一点?

我的代码

Task1 = PostgresOperator(
    task_id='Task1',
    postgres_conn_id='REDSHIFT_CONN',
    sql="SELECT * FROM Western.trip limit 5 ",
    params={'limit': '50'},
    dag=dag

任务的输出需要存储到数据框(df)中,经过转发后再加载到另一个表中。

如何才能做到这一点?

airflow
1个回答
0
投票

我怀疑是否有一个内置的操作符。你可以很容易地写一个自定义操作符

  • 延伸 PostgresOperator 或者只是 BaseOperator 任何其他您选择的操作符。所有的自定义代码都进入了 被覆盖 execute() 办法
  • 然后使用 PostgresHook 以获得 Pandas DataFrame 援用 get_pandas_df() 功能
  • 执行你在你的产品中必须要做的任何转换。pandas df
  • 最后使用 insert_rows() 将数据插入表格的功能

更新-1

按照要求,我在此增加了操作者的代码。

from typing import Dict, Any, List, Tuple

from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.postgres_operator import PostgresOperator
from airflow.utils.decorators import apply_defaults
from pandas import DataFrame


class MyCustomOperator(PostgresOperator):

    @apply_defaults
    def __init__(self, destination_table: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.destination_table: str = destination_table

    def execute(self, context: Dict[str, Any]):
        # create PostgresHook
        self.hook: PostgresHook = PostgresHook(postgres_conn_id=self.postgres_conn_id,
                                               schema=self.database)
        # read data from Postgres-SQL query into pandas DataFrame
        df: DataFrame = self.hook.get_pandas_df(sql=self.sql, parameters=self.parameters)
        # perform transformations on df here
        df['column_to_be_doubled'] = df['column_to_be_doubled'].multiply(2)
        ..
        # convert pandas DataFrame into list of tuples
        rows: List[Tuple[Any, ...]] = list(df.itertuples(index=False, name=None))
        # insert list of tuples in destination Postgres table
        self.hook.insert_rows(table=self.destination_table, rows=rows)

注意:该代码段仅供参考,尚未经过测试。

参考文献

进一步修改改进

  • destination_table param可以从 Variable
  • 如果目标表不一定位于同一个地方,那么就会有一个目标表。Postgres 模式,然后我们可以取另一个参数,如 destination_postgres_conn_id__init__ 并以此来创建一个 destination_hook 我们可以在其上调用 insert_rows 办法
© www.soinside.com 2019 - 2024. All rights reserved.