I have a simple DAG with the render_template_as_native_obj option enabled. I am trying to pass {{ dag_run }} to a PythonVirtualenvOperator, but when the task starts executing it fails with NameError: name 'timedelta' is not defined.
I am running this on Python 3.11 with airflow==2.7.3 and dill==0.3.7.
How can I deal with this error?
Here is my DAG:
import datetime
from pathlib import Path

import airflow
from airflow import DAG
from airflow.operators.python import PythonOperator, PythonVirtualenvOperator
import dill

dag = DAG(
    dag_id='strange_pickling_error_dag',
    schedule_interval='0 5 * * 1',
    start_date=datetime.datetime(2020, 1, 1),
    catchup=False,
    render_template_as_native_obj=True,
)

context = {"ts": "{{ ts }}", "dag_run": "{{ dag_run }}"}
op_args = [context, Path(__file__).parent.absolute()]


def make_foo(*args, **kwargs):
    print("---> making foo!")
    print("make foo(...): args")
    print(args)
    print("make foo(...): kwargs")
    print(kwargs)


make_foo_task = PythonVirtualenvOperator(
    task_id='make_foo',
    python_callable=make_foo,
    use_dill=True,
    system_site_packages=False,
    op_args=op_args,
    requirements=[f"dill=={dill.__version__}", f"apache-airflow=={airflow.__version__}"],
    dag=dag,
)
Here is the error:
[2023-11-06, 18:23:21 UTC] {process_utils.py:182} INFO - Executing cmd: /tmp/venvqse65m1b/bin/python /tmp/venvqse65m1b/script.py /tmp/venvqse65m1b/script.in /tmp/venvqse65m1b/script.out /tmp/venvqse65m1b/string_args.txt /tmp/venvqse65m1b/termination.log
[2023-11-06, 18:23:21 UTC] {process_utils.py:186} INFO - Output:
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - Traceback (most recent call last):
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - File "/tmp/venvqse65m1b/script.py", line 17, in <module>
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - arg_dict = dill.load(file)
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - ^^^^^^^^^^^^^^^
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - File "/tmp/venvqse65m1b/lib/python3.11/site-packages/dill/_dill.py", line 287, in load
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - return Unpickler(file, ignore=ignore, **kwds).load()
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - File "/tmp/venvqse65m1b/lib/python3.11/site-packages/dill/_dill.py", line 442, in load
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - obj = StockUnpickler.load(self)
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - ^^^^^^^^^^^^^^^^^^^^^^^^^
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/pendulum/tz/timezone.py", line 312, in __init__
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - self._utcoffset = timedelta(seconds=offset)
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - ^^^^^^^^^
[2023-11-06, 18:23:21 UTC] {process_utils.py:190} INFO - NameError: name 'timedelta' is not defined
[2023-11-06, 18:23:22 UTC] {taskinstance.py:1937} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 395, in execute
    return super().execute(context=serializable_context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 192, in execute
    return_value = self.execute_callable()
                   ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 609, in execute_callable
    result = self._execute_python_callable_in_subprocess(python_path, tmp_path)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/operators/python.py", line 446, in _execute_python_callable_in_subprocess
    execute_in_subprocess(
  File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/utils/process_utils.py", line 171, in execute_in_subprocess
    execute_in_subprocess_with_kwargs(cmd, cwd=cwd)
  File "/home/felix/Projects/alfabank/fs_etl/venv_py3.11/lib/python3.11/site-packages/airflow/utils/process_utils.py", line 194, in execute_in_subprocess_with_kwargs
    raise subprocess.CalledProcessError(exit_code, cmd)
subprocess.CalledProcessError: Command '['/tmp/venvqse65m1b/bin/python', '/tmp/venvqse65m1b/script.py', '/tmp/venvqse65m1b/script.in', '/tmp/venvqse65m1b/script.out', '/tmp/venvqse65m1b/string_args.txt', '/tmp/venvqse65m1b/termination.log']' returned non-zero exit status 1.
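After some investigation I found the following:
The timedelta problem appears with dill >= 0.3.6. With dill==0.3.5.1 it does not occur.
The problem can be fixed by using pendulum==3.0, but airflow does not support that version yet (there is already a PR, https://github.com/apache/airflow/pull/34744, but it has not been merged into the main branch yet).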
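One direction that might sidestep the unpickling step entirely is to avoid sending the whole dag_run object into the virtualenv and to request only scalar fields through templates instead. The sketch below is an assumption, not a confirmed fix: build_plain_context is a hypothetical helper, and the choice of dag_run.run_id and dag_run.logical_date is only an illustration of fields that render to plain strings. The stand-in values in the sanity check are made up for the example.

```python
import pickle


def build_plain_context():
    """Hypothetical helper: ask Jinja for scalar fields of dag_run only.

    Each value is a template string that should render to a plain str, so no
    pendulum or DagRun object would need to be pickled for the virtualenv.
    """
    return {
        "ts": "{{ ts }}",
        "run_id": "{{ dag_run.run_id }}",
        "logical_date": "{{ dag_run.logical_date.isoformat() }}",
    }


# This list would take the place of op_args in the DAG above (assumption).
context = build_plain_context()
op_args = [context]

# Sanity check with made-up rendered values: plain strings survive a pickle
# round trip without importing anything from the host environment.
rendered = {
    "ts": "2023-11-06T18:23:21+00:00",
    "run_id": "manual__2023-11-06T18:23:21+00:00",
    "logical_date": "2023-11-06T18:23:21+00:00",
}
assert pickle.loads(pickle.dumps(rendered)) == rendered
```

Inside make_foo the strings can be parsed back into dates if needed. Whether this is acceptable depends on how much of dag_run the callable really needs.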