在气流中设置 sns_publish_operator

问题描述 投票:0回答:1

有人用过 sns_publish_operator 吗?

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/operators/sns/index.html

我对气流还很陌生,在正确设置架构方面遇到了一些问题。

我已经设置了一个简单的 DAG 和数据质量检查任务。基本上,如果数据集未通过数据质量检查,我想发送 SNS 通知。如果它通过了数据质量检查,我希望它不要发送电子邮件。

这个领域的在线帮助似乎没有我想象的那么多。任何资源或一般提示将不胜感激。

amazon-web-services amazon-sns directed-acyclic-graphs airflow
1个回答
3
投票

这个问题有点老了,但也许这仍然对某人有帮助。

首先解决 SnsPublishOperator:您需要设置与 AWS 的 Airflow 连接。有多种方法可以做到这一点。最简单的可能是使用 Web UI。转到“管理”->“连接”->[+](添加新记录)。然后将其设置为“Amazon Webservices Connection”Conn 类型。登录名和密码是 AWS 密钥和密钥秘密。最后,您还必须在“附加”部分中提供您的 SNS 主题所在的区域:

{"region_name": "us-east-1"}

现在,您可以在代码中使用 Operator,同时提供新连接的 conn_id:

my_sns_task = SnsPublishOperator(
    task_id='task_name',
    target_arn='your_sns_topic_arn',
    message='your_message',
    aws_conn_id=conn_id
)

理论上,该运算符也有一个“subject”参数,但当我尝试设置它时,我收到来自组件的错误。

至于将该运算符合并到 DAG 中,一种可能的方法是让评估数据的任务失败(如果检查未通过),并在失败时触发 sns 任务:

my_sns_task = SnsPublishOperator(
    task_id='task_name',
    target_arn='your_sns_topic_arn',
    message='your_message',
    aws_conn_id=conn_id,
    trigger_rule='one_failed'
)

my_sns_task.set_upstream(datacheck_task)
© www.soinside.com 2019 - 2024. All rights reserved.