如何在气流中分别运行任务?

问题描述 投票:0回答:2

我有一个要运行脚本的表列表。当我一次执行一张表时,它成功运行,但是当我在任务上方尝试for循环时,它一次运行所有表,给我多个错误。

这是我的代码:

def create_tunnel_postgres():

    psql_host = ''
    psql_port = 5432
    ssh_host= ''
    ssh_port = 22
    ssh_username = ''
    pkf = paramiko.RSAKey.from_private_key(StringIO(Variable.get('my_key')))

    server = SSHTunnelForwarder(
        (ssh_host, 22),
        ssh_username=ssh_username,
        ssh_private_key=pkf,
        remote_bind_address=(psql_host, 5432))

    return server

def conn_postgres_internal(server):
    """
    Using the server connect to the internal postgres
    """
    conn = psycopg2.connect(
        database='pricing',
        user= Variable.get('postgres_db_user'),
        password= Variable.get('postgres_db_key'),
        host=server.local_bind_host,
        port=server.local_bind_port,
    )

    return conn

def gzip_postgres_table(**kwargs):
    """

    path='/path/{}.csv'.format(table_name)
    server_postgres = create_tunnel_postgres()
    server_postgres.start()
    etl_conn = conn_postgres_internal(server_postgres)
    cur=etl_conn.cursor()
    cur.execute("""
        select * from schema.db.{} limit 100;
        """.format(table_name))
    result = cur.fetchall()
    column_names = [i[0] for i in cur.description]
    fp = gzip.open(path, 'wt')
    myFile = csv.writer(fp,delimiter=',')
    myFile.writerow(column_names)
    myFile.writerows(result)
    fp.close()
    etl_conn.close()
    server_postgres.stop()


#------------------------------------------------------------------------------------------------------------------------------------------------

default_args = {
    'owner': 'mae',
    'depends_on_past':False,
    'start_date': datetime(2020,1,1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1)
}


tables= ['table1','table2']
s3_folder='de'
current_timestamp=datetime.now()



#Element'S VARIABLES

dag = DAG('dag1',
          description = 'O',
          default_args=default_args,
          max_active_runs=1,
          schedule_interval= '@once',
          #schedule_interval='hourly'
          catchup = False )


for table_name in pricing_table_name:
    t1 = PythonOperator(
        task_id='{}_gzip_table'.format(table_name),
        python_callable= gzip_postgres_table,
        provide_context=True,
        op_kwargs={'table_name':table_name,'s3_folder':s3_folder,'current_timestamp':current_timestamp},
        dag = dag)

是否有一种方法可以先运行table1。完成它,然后再运行表2?我尝试使用表中的for table_name进行此操作:但无济于事。任何想法或建议都会有所帮助。

python airflow airflow-scheduler
2个回答
0
投票

您的for正在为表处理创建多个任务,默认情况下,这将使气流并行执行任务。

您可以将number of workers in the airflow config file设置为1,或仅创建1个任务并在任务内部运行循环,然后将其同步执行。


0
投票

我看过您的代码,似乎您正在使用循环语句创建多个DAG任务,该语句并行运行任务。

有某些方法可以满足您的要求。

  1. 使用序列执行器。

airflow.executors.sequential_executor.SequentialExecutor which will only run task instances sequentially.

https://airflow.apache.org/docs/stable/start.html#quick-start

  1. 创建一个根据您的需要工作的脚本。

创建一个脚本(Python)并将其用作PythonOperator,该脚本针对表的数量重复当前函数。

  1. 将气流执行器(并行度)限制为1。

您可以在其airflow.cfg配置文件中将气流工人限制为1。

步骤:

从气流根目录(AIRFLOW_HOME)打开airflow.cfg。

设置/更新parallelism = 1

重新启动气流。

这应该有效。

© www.soinside.com 2019 - 2024. All rights reserved.