如何从 dagster 中的 op 向 asset 传递参数

问题描述 投票:0回答:1

我是 dagster 的新手,在安排作业时很难针对不同的参数值运行资产。

我使用 dagster 创建了一个管道。

尝试实现上游资产的结果

multiple_num()
并使用op将参数值传递给资产。

简化的例子是

@asset
def get_a():
    return a
    
@asset 
def get_b():
    return b
    
@asset
def multiple_num(get_a, get_b):
    return c
    
@op
def get_values():
    values = ['a','s','e']
    for value in values:
        yield RunRequest(
            run_key=None,
            asset_selection = [AssetKey(multiple_num)]
        )
        cleanup_directory(value)

def cleanup_directory(): -> str
    return status
    
@job
def run_values():
   get_values()

在安排作业时必须调用资产以获得不同的参数值?

python-3.x pipeline orchestration dagster
1个回答
0
投票

如果所有值都应该实现,我会建议使用静态分区的以下解决方案。

@asset
def get_a():
    return a


@asset
def get_b():
    return b


@asset(
    partitions_def=StaticPartitionsDefinition(["a", "s", "e"])
)
def multiple_num(context: AssetExecutionContext, get_a, get_b):
    partition_str = context.asset_partition_key_for_output()
    return c


@op
def get_values():
    yield RunRequest(
        asset_selection=[AssetKey(multiple_num)]
    )
    # clean-up


@job
def run_values():
    get_values()
© www.soinside.com 2019 - 2024. All rights reserved.