我有一个 Airflow DAG 任务,我想在 K8s pod 上运行。这是一个例子
from mymodule import process_data # this is my module import
from decouple import AutoConfig # this is externally installed dependency
@dag(
"model_trainer",
start_date=datetime(2023, 1, 1),
catchup=False,
schedule=None,
)
def pipeline():
@task.kubernetes(image="python")
def fetch_data():
return process_data()
现在运行这段代码(
pipeline().test()
)给了我
NameError: name 'process_data' is not defined
这是可以理解的,因为我使用的是 python 图像并且没有 process_data。它也没有外部安装的依赖项。我的问题是,处理这种情况的最佳方法是什么?我是否创建自己的已安装依赖项的映像,然后在函数中导入我的模块?
是的,将导入移至任务中,并使用自定义图像。
第 1 步:创建添加代码的 Dockerfile
FROM python:3.10.0
RUN mkdir /mymodule
COPY mymodule mymodule
RUN pip install -r requirements.txt
ENV PYTHONPATH "${PYTHONPATH}:/mymodule" # Makes mymodule importable
第 2 步:构建 docker 镜像并将其推送到私有 docker 存储库(即 GKE / ECR / Docker Hub)
第 3 步:更新您的 DAG 并将导入位置移至任务中。
@dag(
"model_trainer",
start_date=datetime(2023, 1, 1),
catchup=False,
schedule=None,
)
def pipeline():
@task.kubernetes(image="your_image_with_mymodule")
def fetch_data():
from mymodule import process_data # MOVED INTO THE TASK
from decouple import AutoConfig # You can delete this
return process_data()