我创建了 SNS 通知主题,以便每次在 aws codepipeline 中部署新代码时接收代码管道成功或失败的通知。我通过订阅 aws sns 主题收到的消息采用 json 格式,不易阅读。我怎样才能让它更具可读性?
我见过很少的文章说要使用 lambda 来实现这一点。有人可以指导我如何使用 python 解析这些消息
据我了解,您想要订阅 SNS 主题并以美化格式接收通知。 简单来说,您希望改变通过 SNS 收到的事件。 AWS Lambda 在这些情况下可以派上用场。 假设您知道如何使用 SNS 主题作为 lambda 的触发器,这就是您的 lambda 处理程序的外观,
def handler(event, context):
for record in event['Records']:
parse_notification_here
有关如何使用 SNS 作为 AWS Lambda 触发器的详细信息 - https://docs.aws.amazon.com/lambda/latest/dg/with-sns.html
只是给你一个想法,因为我不完全确定你的情况。
您可以根据代码管道是否成功发布SNS主题的消息,以使其更易于阅读。
sns = boto3.client('sns')
response = sns.publish(
TopicArn='arn:aws:sns:region:account-id:topic-name',
Message='failure' # or succeeded
)
如何使其更具可读性?如您所知,您从订阅的 SNS 主题收到的消息基本上都是 JSON 格式,根据 docs,它通常如下所示:
{
"Records": [
{
"EventVersion": "1.0",
"EventSubscriptionArn": "arn:aws:sns:us-east-1:123456789012:sns-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
"EventSource": "aws:sns",
"Sns": {
"SignatureVersion": "1",
"Timestamp": "2019-01-02T12:45:07.000Z",
"Signature": "tcc6faL2yUC6dgZdmrwh1Y4cGa/ebXEkAi6RibDsvpi+tE/1+82j...65r==",
"SigningCertUrl": "https://sns.us-east-1.amazonaws.com/SimpleNotificationService-ac565b8b1a6c5d002d285f9598aa1d9b.pem",
"MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e",
"Message": "Hello from SNS!",
"MessageAttributes": {
"Test": {
"Type": "String",
"Value": "TestString"
},
"TestBinary": {
"Type": "Binary",
"Value": "TestBinary"
}
},
"Type": "Notification",
"UnsubscribeUrl": "https://sns.us-east-1.amazonaws.com/?Action=Unsubscribe&SubscriptionArn=arn:aws:sns:us-east-1:123456789012:test-lambda:21be56ed-a058-49f5-8c98-aedd2564c486",
"TopicArn":"arn:aws:sns:us-east-1:123456789012:sns-lambda",
"Subject": "TestInvoke"
}
}
]
}
那么,如何使用 AWS Lambda 函数(python)解析这些消息?这是给您的示例代码。
import boto3
def lambda_handler(event, context):
# extract the SNS message from the Lambda event
# get event version
event_version = event['Records'][0]['EventVersion']
print(f"event_version: {event_version}")
# get sns message
sns_message = event['Records'][0]['Sns']['Message']
print(f"sns_message: {sns_message}")
# get sns timestamp
sns_timestamp = event['Records'][0]['Sns']['Timestamp']
print(f"sns_timestamp: {sns_stamp}")