如何从代码将参数传递给“作业作为任务”?

问题描述 投票:0回答:2

我想使用新的“作业即任务”功能(如在这个SO答案中提到的),但我在将值传递到该作业时遇到问题。

场景

  • 我有一个工作流程作业,其中包含 2 个任务。
  • 工作流程计划每 x 分钟运行一次。
  • Task_A(类型“Notebook”):从表中读取数据,并根据内容决定是否执行Task_B中的工作流程。
  • Task_B(类型“运行作业”):这引用了另一个工作流,该工作流本身包含多个任务,所有任务类型均为“笔记本”。该工作流程需要几个参数。为了简洁起见,我们假设一个参数
    entity_ids
    。如果手动启动,此工作流程自行运行完全正常。

对于这个场景,我想添加一些逻辑来决定将执行多少工作流程:

如果 Task_A 在其表中找到特定信息,它应该启动 Task_B 中的工作流程,并根据该信息为其提供几个参数(在本示例中:

entity_ids
的列表)。如果未找到该信息,工作流程应正常结束并等待下一个间隔。

我的问题: 如何将(多个)值传递到 Task_B 中引用的作业中?

我曾尝试在 Task_A 中使用

dbutils.jobs.taskValues.set("entity_id", "[1, 2]")
进行设置,并在 Task_B 中工作流程的第一个笔记本中使用
dbutils.jobs.taskValues.get("Task_A", "entity_ids", debugValue="[]" )
进行读取,但这会在嵌套作业中引发错误:
Task key does not exist in run: Task_A.

我的猜测是,Task_B 中的嵌套工作流不知道父工作流,并且可能在不同的上下文中运行,因此找不到

taskKey == "Task_A"

为了验证我的假设,我尝试设置一个(仅测试)笔记本,仅使用

entity_ids
函数读取
get()

  • 如果我将其直接作为任务添加到与 Task_A 相同的工作流程中,它就可以工作。
  • 如果我将其放入自己的工作流程中,然后将其引用为“运行作业”任务,则会失败并出现上述错误。

在这两种情况下,它始终是完全相同的笔记本。

apache-spark databricks scheduled-tasks azure-databricks databricks-workflows
2个回答
1
投票

您无法在运行作业类型的后续任务中获取在一项任务中设置的任务值。要获取这些值,您需要使用动态值将它们作为参数传递,并使用笔记本中的以下代码访问它们,该代码位于另一个工作流程作业中。

dbutils.widgets.get("entity_id")

enter image description here

单击浏览动态值了解更多信息。

{{tasks.[task_name].values.[value_name]}}
是获取特定任务和给定键的任务值的动态表达式。

就你而言,是

{{tasks.Task_A.values.entity_id}}

输出:

enter image description here

这里,B_tasks是另一项工作。


0
投票

我尝试了您的方法,并且可以确认任务变量在“运行作业”任务中使用时的行为如您所说。

请检查以下替代方案是否适合您?

  • 在您的

    Task_B
    (输入“运行作业”)中,创建一个名为
    entity_id

    的小部件
  • 在实际工作流程中,在

    Task_A
    之后,添加一个
    if then else
    块并检查任务值。请参阅文档了解更多信息。

    • 为了测试,我尝试使用 {{tasks.Tst_TaskVal_Set.values.entity_id}} != "" ,然后仅在它为空时才触发运行作业。您可以单击“动态值”以获取格式和其他类似值。
  • 真相流后面可以是

    Run Job
    任务,您可以将任务值
    {{tasks.Task_A.values.entity_ids}}
    作为参数传递给小部件。我可以确认这有效。

替代建议:

但是考虑到你的参数数量,看看你是否可以设置一个标志作为任务值,将entity_ids写入DBFS或云中的文件,并在任务值标志上使用条件,然后读取Task_B中的文件 - 也有可能是任务值/小部件可以接受的字符数的限制。

© www.soinside.com 2019 - 2024. All rights reserved.