在 Airflow 内的 Python 任务中处理异步包

问题描述 投票:0回答:1

我正在处理本地气流设置中的一个恼人的错误。

我使用 LocalExecutor 和 MySQL 数据库(如果有帮助的话)。

我的任务看起来像这样:


@task()
def pdf_to_text_task() -> None:
    file_storage_instance = get_file_storage_instance_from_task()

    pdf_fpath = get_pdf_fpath(file_storage_instance)
    pages = parse_pdf(pdf_fpath) # <--- this package breaks Airflow task

    pages = re.split(r"(?<!\|)---(?! \|)", pages[0])

    text = pdf_to_text(pages)
    filename = pdf_fpath.name.split(".")[0]
    save_file(json.dumps(text), f"{filename}.json")

当使用这个包时,我收到此错误:

[2024-04-24, 20:20:17 CEST] {task_context_logger.py:104} ERROR - Executor reports task instance <TaskInstance: main.pdf_to_text_task 6f30866d-72b3-449b-8e74-7d4a4b99aaa1 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?

根据

parse_pdf
后面的包的文档,它是一个异步第一个包。 (它只是由一个调用 API 的包组成)。

我非常沮丧,因为没有真正的根本原因可能导致这种情况。

我只是假设上述包导致了这个问题,因为它是唯一的变化。

什么可能导致此错误?

谢谢!

python airflow
1个回答
0
投票

如果我理解正确,你尝试将异步函数(协程)作为常规函数运行,这对你不起作用,你需要使用

asyncio.run

在异步事件循环中运行它
import asyncio

@task()
def pdf_to_text_task() -> None:
    ...
    pages = asyncio.run(parse_pdf(pdf_fpath))

© www.soinside.com 2019 - 2024. All rights reserved.