How to add rows to a table using xcom_pull in PostgresOperator AIRFLOW

问题描述 投票:0回答:0

我是 Airflow 的新手。我想使用 xcom_pull 将 PostgresOperator 中的数据添加到表中,我是这样做的:

 load_data = PostgresOperator(task_id="load_data",
                                postgres_conn_id="database_my",
                                 sql=[f"""INSERT INTO test VALUES
                                    ('{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['id']}}}}',
                                    '{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['id_district']}}}}',
                                    '{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['coord']}}}}',
                                    '{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['address']}}}}',
                                    '{{{{ti.xcom_pull(key='transform1', task_ids=['transform_data'])[0][{i}]['properties_json']}}}}'
                                    )
                                """ for i in range(0, 5)])

但是出现这样的错误

[2023-02-28, 20:03:48 UTC] {sql.py:375} INFO - Running statement: INSERT INTO test VALUES
                                    (1,
                                    3,
                                    '[20, 50]',
                                    'город ГОРОД',
                                    '{'number': {'title': 'TITLE1', 'value': 'VALUE1'}, 'name': {'title': 'TITLE2', 'value': 'VALUE2 "VALUE2"'}, 'activity': {'title': 'TITLE3', 'value': 'VALUE3'}}'
                                    )
                                , parameters: None
[2023-02-28, 20:03:48 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/opt/airflow/lib/python3.8/site-packages/airflow/providers/common/sql/operators/sql.py", line 260, in execute
    output = hook.run(
  File "/opt/airflow/lib/python3.8/site-packages/airflow/providers/common/sql/hooks/sql.py", line 349, in run
    self._run_command(cur, sql_statement, parameters)
  File "/opt/airflow/lib/python3.8/site-packages/airflow/providers/common/sql/hooks/sql.py", line 380, in _run_command
    cur.execute(sql_statement)
psycopg2.errors.SyntaxError: syntax error at or near "number"
LINE 5:                                     '{'number': {'title': 'T...
                                               ^

[2023-02-28, 20:03:49 UTC] {taskinstance.py:1401} INFO - Marking task as FAILED. dag_id=data_579, task_id=load_data, execution_date=20230228T190000, start_date=20230228T200348, end_date=20230228T200349
[2023-02-28, 20:03:49 UTC] {standard_task_runner.py:100} ERROR - Failed to execute job 10632 for task load_data (syntax error at or near "number"
LINE 5:                                     '{'number': {'title': 'T...
                                               ^
; 2491897)
[2023-02-28, 20:03:49 UTC] {local_task_job.py:164} INFO - Task exited with return code 1

我对如何从xcom_pull中找出数据长度的问题也很感兴趣。 这样循环的执行次数与我的行数一样多。

我知道点在单引号里,但是如何做双引号?

json database postgresql airflow airflow-xcom
© www.soinside.com 2019 - 2024. All rights reserved.