我想使用Python将活动添加到Azure数据工厂中的管道。使用以下代码,我替换了实际的活动,但没有添加新的活动:
p_name = 'test'
act_name = 'Wait4'
Wait_activity = WaitActivity(name=act_name,wait_time_in_seconds=5)
p_obj = PipelineResource(activities=[Wait_activity])
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
运行代码后:
期望:
研究了源代码中的语句:
因此,当您更新管道时,activities
属性应该是管道中的活动列表,而不是单个活动。
例如:
wait_activity = WaitActivity(name="waittest", type="Wait", wait_time_in_seconds=100, )
ActivityDependency = [{"activity":"waittest","dependencyConditions":["Succeeded"]}]
wait_activity1 = WaitActivity(name="waittest1", type="Wait", wait_time_in_seconds=100,depends_on=ActivityDependency)
p_name = 'testforadf'
p_obj = PipelineResource(
activities=[wait_activity, wait_activity1])
p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, p_obj)
请注意两行:
activities=[wait_activity, wait_activity1])
此属性应包含您的所有活动。
ActivityDependency = [{"activity":"waittest","dependencyConditions":["Succeeded"]}]
这是您的活动之间的依赖条件。
我的输出:
任何问题,请让我知道。
好,请查看我的示例代码:
前提是我已经有以上两个等待活动
adftest = adf_client.pipelines.get(rg_name,df_name,p_name)
print(adftest)
for activity in adftest.activities :
print(activity.name)
print(activity.type)
然后输出是:
{'additional_properties': None, 'id': '/subscriptions/b83c1ed3-c5b6-44fb-b5ba-2b83a074c23f/resourceGroups/v-jugong-ChinaCXPTeam/providers/Microsoft.DataFactory/factories/jaygongadf/pipelines/testforadf', 'name': 'testforadf', 'type': 'Microsoft.DataFactory/factories/pipelines', 'etag': 'ed006cf3-0000-0800-0000-5da970600000', 'description': None, 'activities': [<azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FEDE0F0>, <azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FED6DA0>], 'parameters': None, 'variables': None, 'concurrency': None, 'annotations': None, 'folder': None}
waittest
Wait
waittest1
Wait
然后您可以在activities
属性上方看到对象。此外,您可以看到它们的类型:'activities': [<azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FEDE0F0>, <azure.mgmt.datafactory.models.wait_activity_py3.WaitActivity object at 0x000001C05FED6DA0>]
它们是WaitActivity类型,因此您可以使用:]查看它们的循环活动以获取其中的每个项目。
for activity in adftest.activities : print(activity.name) print(activity.type)
您可以查看WaitActivity类型包含哪些属性,例如源代码语句中的
name
,type
。
然后,如果您想再添加一个活动,例如,再添加一个WaitActivity:
wait_activity2 = WaitActivity(name="waittest2", type="Wait", wait_time_in_seconds=100, ) adftest.activities.append(wait_activity2) p = adf_client.pipelines.create_or_update(rg_name, df_name, p_name, adftest)
[请参见上面的代码,我创建了一个名为
wait_activity2
的新WaitActivity,然后将其附加到activities
数组中。然后像往常一样更新管道,您将找到新的活动: