如何正确使用@task.kubernetes()装饰器导入第三方依赖?

问题描述 投票:0回答:1

我有一个 Airflow DAG 任务,我想在 K8s pod 上运行。这是一个例子

from mymodule import process_data # this is my module import
from decouple import AutoConfig # this is externally installed dependency 

@dag(
    "model_trainer",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    schedule=None,
)
def pipeline():
    @task.kubernetes(image="python")
    def fetch_data():
        return process_data()

现在运行这段代码(

pipeline().test()
)给了我

NameError: name 'process_data' is not defined

这是可以理解的,因为我使用的是 python 图像并且没有 process_data。它也没有外部安装的依赖项。我的问题是,处理这种情况的最佳方法是什么?我是否创建自己的已安装依赖项的映像,然后在函数中导入我的模块?

python kubernetes airflow airflow-2.x
1个回答
0
投票

是的,将导入移至任务中,并使用自定义图像。

第 1 步:创建添加代码的 Dockerfile

FROM python:3.10.0
RUN mkdir /mymodule
COPY mymodule mymodule
RUN pip install -r requirements.txt
ENV PYTHONPATH "${PYTHONPATH}:/mymodule"  # Makes mymodule importable

第 2 步:构建 docker 镜像并将其推送到私有 docker 存储库(即 GKE / ECR / Docker Hub)

第 3 步:更新您的 DAG 并将导入位置移至任务中。


@dag(
    "model_trainer",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    schedule=None,
)
def pipeline():
    @task.kubernetes(image="your_image_with_mymodule")
    def fetch_data():
        from mymodule import process_data # MOVED INTO THE TASK
        from decouple import AutoConfig # You can delete this

        return process_data()
© www.soinside.com 2019 - 2024. All rights reserved.