我们在 Kubernetes (KubernetesExecutor) 上使用 Airflow v2.2.3, 我们的环境需要 DAG 预客户,并且每个客户可以位于不同的时区。
每个 DAG 应在午夜安排在自己的时区, 我发现它可以使用 Airflow 的时区感知 DAG
来实现因此为每个 DAG 配置时区感知
start_date
,使每个 DAG 在自己的时区午夜执行:
start_date_utc = (datetime.now() - timedelta(days=2)).replace(
hour=0, minute=0, second=0, microsecond=0)
timezone = pendulum.timezone(get_customer_timezone(customer))
START_DATE = start_date_utc.replace(tzinfo=timezone)
default_args = {
"owner": "owner",
"depends_on_past": False,
"start_date": START_DATE,
}
dag = DAG(
dag_id,
schedule_interval="0 0 * * *",
default_args=default_args,
tags=[cusotmer_name]
)
date = '{{ execution_date | ds }}'
operator_args = {
"customer_date": date,
}
我的问题是jinja模板和dag_run
execution_date
(dag_run.逻辑_日期)仍然是UTC,并且没有根据DAG时区进行调整。
在不同时区运行 DAG 时,这会导致意外行为,但时区偏移早于 UTC 的 DAG
execution_date
是错误的(2 天前,而不是 1 天)
我需要一些关于如何根据 DAG 时区更改
execution_date
的建议
谢谢
execution_date
的值采用 UTC。
要转换为不同的时区,您可以执行以下操作:
{{ execution_date.in_timezone('Europe/Amsterdam') }}
如果您在 DAG 中设置时区,您可以从
dag.timezone
访问它并使用它 Jinja。
{{ dag_run.execution_date.astimezone(dag.timezone) }}
您已经注意到
execution_date
已被弃用,因此您应该将 logical_date
用作:
{{ dag_run.logical_date.astimezone(dag.timezone) }}
import pendulum
local_tz = pendulum.timezone("Europe/Amsterdam")
local_tz.convert(logical_date)