如何在Airflow中的AppFlow算子中使用salesforce market云源?

问题描述 投票:0回答:1

我正在使用 Airflow AppFlowOperator 将数据从 salesforce 提取到 AWS S3 存储桶。要使用 salesforce 作为源提取数据,我只需使用:

task_campaign_dump = AppflowRunOperator(
    task_id="campaign_dump",
    source="salesforce",
    dag=dag, 
    flow_name=flow_name)

它工作正常。但是,当我尝试运行以“SalesForce Market Cloud”作为源的流程时,上述代码不起作用。我认为我应该更改源名称,但 Airflow 文档说只有 salesforce 和 zendesk 被接受作为源参数的值。

有没有办法在 Airflow 中使用 salesforce market 云源运行 AppFlow 流程?

python amazon-web-services salesforce mwaa amazon-appflow
1个回答
0
投票

Apache Airflow 中的

AppflowRunOperator
旨在与 Amazon AppFlow 配合使用,其功能受到底层 Amazon AppFlow API 功能的限制。如果 Airflow 文档声明仅接受 Salesforce 和 Zendesk 作为
source
参数的值,则运营商本身可能在这方面受到限制。

但是,您可以考虑一些替代方法:

1.自定义运算符

您可以创建一个使用 AWS 开发工具包 (Boto3) 直接与 Amazon AppFlow 交互的自定义运算符,从而绕过

AppflowRunOperator
的限制。

这是自定义运算符的简化示例:

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import boto3

class CustomAppflowRunOperator(BaseOperator):

    @apply_defaults
    def __init__(self, flow_name, *args, **kwargs):
        super(CustomAppflowRunOperator, self).__init__(*args, **kwargs)
        self.flow_name = flow_name

    def execute(self, context):
        client = boto3.client('appflow')
        response = client.create_flow(
            flowName=self.flow_name,
            # other parameters specific to Salesforce Market Cloud
        )
        # Handle the response, check for errors, etc.

2. PythonOperator 中的 Python 脚本

另一种方法可能是使用

PythonOperator
运行 Python 脚本,该脚本使用 Boto3 与 Amazon AppFlow 交互。

from airflow.operators.python_operator import PythonOperator
import boto3

def run_appflow_flow():
    client = boto3.client('appflow')
    response = client.create_flow(
        flowName='your_flow_name',
        # other parameters specific to Salesforce Market Cloud
    )
    # Handle the response, check for errors, etc.

task_run_flow = PythonOperator(
    task_id='run_appflow_flow',
    python_callable=run_appflow_flow,
    dag=dag,
)

3. BashOperator 中的 AWS 开发工具包

您还可以在

BashOperator
中使用 AWS CLI 来触发流程,尽管这比使用 AWS 开发工具包灵活性较差且更难管理。

from airflow.operators.bash_operator import BashOperator

task_run_flow = BashOperator(
    task_id='run_appflow_flow',
    bash_command='aws appflow create-flow --flow-name your_flow_name ...',
    dag=dag,
)

通过使用这些替代方法之一,您应该能够解决

AppflowRunOperator
的限制并以 Salesforce Market Cloud 作为源运行流程。

© www.soinside.com 2019 - 2024. All rights reserved.