我正在使用 Airflow AppFlowOperator 将数据从 salesforce 提取到 AWS S3 存储桶。要使用 salesforce 作为源提取数据,我只需使用:
task_campaign_dump = AppflowRunOperator(
task_id="campaign_dump",
source="salesforce",
dag=dag,
flow_name=flow_name)
它工作正常。但是,当我尝试运行以“SalesForce Market Cloud”作为源的流程时,上述代码不起作用。我认为我应该更改源名称,但 Airflow 文档说只有 salesforce 和 zendesk 被接受作为源参数的值。
有没有办法在 Airflow 中使用 salesforce market 云源运行 AppFlow 流程?
Apache Airflow 中的
AppflowRunOperator
旨在与 Amazon AppFlow 配合使用,其功能受到底层 Amazon AppFlow API 功能的限制。如果 Airflow 文档声明仅接受 Salesforce 和 Zendesk 作为 source
参数的值,则运营商本身可能在这方面受到限制。
但是,您可以考虑一些替代方法:
您可以创建一个使用 AWS 开发工具包 (Boto3) 直接与 Amazon AppFlow 交互的自定义运算符,从而绕过
AppflowRunOperator
的限制。
这是自定义运算符的简化示例:
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import boto3
class CustomAppflowRunOperator(BaseOperator):
@apply_defaults
def __init__(self, flow_name, *args, **kwargs):
super(CustomAppflowRunOperator, self).__init__(*args, **kwargs)
self.flow_name = flow_name
def execute(self, context):
client = boto3.client('appflow')
response = client.create_flow(
flowName=self.flow_name,
# other parameters specific to Salesforce Market Cloud
)
# Handle the response, check for errors, etc.
另一种方法可能是使用
PythonOperator
运行 Python 脚本,该脚本使用 Boto3 与 Amazon AppFlow 交互。
from airflow.operators.python_operator import PythonOperator
import boto3
def run_appflow_flow():
client = boto3.client('appflow')
response = client.create_flow(
flowName='your_flow_name',
# other parameters specific to Salesforce Market Cloud
)
# Handle the response, check for errors, etc.
task_run_flow = PythonOperator(
task_id='run_appflow_flow',
python_callable=run_appflow_flow,
dag=dag,
)
您还可以在
BashOperator
中使用 AWS CLI 来触发流程,尽管这比使用 AWS 开发工具包灵活性较差且更难管理。
from airflow.operators.bash_operator import BashOperator
task_run_flow = BashOperator(
task_id='run_appflow_flow',
bash_command='aws appflow create-flow --flow-name your_flow_name ...',
dag=dag,
)
通过使用这些替代方法之一,您应该能够解决
AppflowRunOperator
的限制并以 Salesforce Market Cloud 作为源运行流程。