我是 dagster 的新手,在安排作业时很难针对不同的参数值运行资产。
我使用 dagster 创建了一个管道。
尝试实现上游资产的结果
multiple_num()
并使用op将参数值传递给资产。
简化的例子是
@asset
def get_a():
return a
@asset
def get_b():
return b
@asset
def multiple_num(get_a, get_b):
return c
@op
def get_values():
values = ['a','s','e']
for value in values:
yield RunRequest(
run_key=None,
asset_selection = [AssetKey(multiple_num)]
)
cleanup_directory(value)
def cleanup_directory(): -> str
return status
@job
def run_values():
get_values()
在安排作业时必须调用资产以获得不同的参数值?
如果所有值都应该实现,我会建议使用静态分区的以下解决方案。
@asset
def get_a():
return a
@asset
def get_b():
return b
@asset(
partitions_def=StaticPartitionsDefinition(["a", "s", "e"])
)
def multiple_num(context: AssetExecutionContext, get_a, get_b):
partition_str = context.asset_partition_key_for_output()
return c
@op
def get_values():
yield RunRequest(
asset_selection=[AssetKey(multiple_num)]
)
# clean-up
@job
def run_values():
get_values()