我正在处理本地气流设置中的一个恼人的错误。
我使用 LocalExecutor 和 MySQL 数据库(如果有帮助的话)。
我的任务看起来像这样:
@task()
def pdf_to_text_task() -> None:
file_storage_instance = get_file_storage_instance_from_task()
pdf_fpath = get_pdf_fpath(file_storage_instance)
pages = parse_pdf(pdf_fpath) # <--- this package breaks Airflow task
pages = re.split(r"(?<!\|)---(?! \|)", pages[0])
text = pdf_to_text(pages)
filename = pdf_fpath.name.split(".")[0]
save_file(json.dumps(text), f"{filename}.json")
当使用这个包时,我收到此错误:
[2024-04-24, 20:20:17 CEST] {task_context_logger.py:104} ERROR - Executor reports task instance <TaskInstance: main.pdf_to_text_task 6f30866d-72b3-449b-8e74-7d4a4b99aaa1 [queued]> finished (failed) although the task says it's queued. (Info: None) Was the task killed externally?
根据
parse_pdf
后面的包的文档,它是一个异步第一个包。 (它只是由一个调用 API 的包组成)。
我非常沮丧,因为没有真正的根本原因可能导致这种情况。
我只是假设上述包导致了这个问题,因为它是唯一的变化。
什么可能导致此错误?
谢谢!
如果我理解正确,你尝试将异步函数(协程)作为常规函数运行,这对你不起作用,你需要使用
asyncio.run
在异步事件循环中运行它
import asyncio
@task()
def pdf_to_text_task() -> None:
...
pages = asyncio.run(parse_pdf(pdf_fpath))