AWS Lambda functions with the Serverless Framework: SQS queue is empty and DynamoDB table is not updated


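I'm currently working on a project in which I've set up AWS Lambda functions with the Serverless Framework to process jobs from an SQS queue and create entries in a DynamoDB table. The deployment appears to succeed, and there is nothing suspicious in the logs. However, I've run into a problem: the SQS queue stays empty, and the DynamoDB table shows no items.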

This is what I'm trying to do. I'm done with L4 but I'm stuck in L5.

serverless.yml

service: challenge1
frameworkVersion: '3'

provider:
  name: aws
  runtime: python3.8
  lambdaHashingVersion: '20201221'
  iamRoleStatements:
    - Effect: "Allow"
      Action: "dynamodb:*"
      Resource: "*"
    - Effect: "Allow"
      Action: "apigateway:*"
      Resource: "*"
    - Effect: "Allow"
      Action: "s3:*"
      Resource: "*"
    - Effect: "Allow"
      Action: "sqs:*"
      Resource: "*"
  environment:
    DYNAMODB_CARDS_TABLE_NAME: challenge1 
    S3_BUCKETNAME: serverlesschallenge-darla
    QUEUE_URL: https://sqs.us-east-1.amazonaws.com/874957933250/serverlesschallenge-darla

functions:
  prepareSQSjobS3:
    handler: handler.prepare_sqs_job
    events:
      - s3:
          bucket: serverlesschallenge-darla
          event: s3:ObjectCreated:Put
          existing: true
          rules:
            - suffix: .csv
  prepareSQSjobSQS:
    handler: handler.process_sqs_job
    events:
      - sqs:
          arn: "arn:aws:sqs:us-east-1:874957933250:serverlesschallenge-darla"

package:
  exclude:
    - venv/**
    - node_modules/**

resources:
  Resources:
    LoyaltyCardDynamodbTable:
      Type: 'AWS::DynamoDB::Table'
      Properties:
        AttributeDefinitions:
          - AttributeName: card_number
            AttributeType: S
          - AttributeName: email
            AttributeType: S
        KeySchema:
          - AttributeName: card_number
            KeyType: HASH
        BillingMode: PAY_PER_REQUEST
        TableName: ${self:provider.environment.DYNAMODB_CARDS_TABLE_NAME}
        GlobalSecondaryIndexes:
            - IndexName: emailIndex
              KeySchema:
                - AttributeName: email
                  KeyType: HASH
              Projection:
                ProjectionType: ALL

plugins:
  - serverless-python-requirements

handler.py

import json
import string
import random
import os
import boto3
import urllib.parse
import csv
import sys
from io import StringIO

from dynamodb_gateway import DynamodbGateway

s3 = boto3.client('s3')
sqs = boto3.client('sqs')
queue_url = os.getenv('QUEUE_URL')

# AWS Lambda trigger: runs when a new file is put in S3 and reads it line by line
def prepare_sqs_job(event, context):
    try:
        print(f"Received S3 event: {json.dumps(event)}")

        bucket_name = os.getenv("S3_BUCKETNAME")

        # Get the object details from the S3 event
        s3_record = event['Records'][0]['s3']
        bucket = s3_record['bucket']['name']
        file_key = urllib.parse.unquote_plus(s3_record['object']['key'], encoding='utf-8')

        # Download the file from S3
        response = s3.get_object(Bucket=bucket, Key=file_key)
        file_content = response['Body'].read().decode('utf-8')
        print(f"Object uploaded: s3://{bucket}/{file_key}")

        # Process CSV file and send each row as a message to SQS
        rows = [row for i, row in enumerate(csv.reader(StringIO(file_content))) if i > 0]

        message_attrs = {'AttributeName': {'StringValue': 'AttributeValue', 'DataType': 'String'}}
        for row in rows:
            print(row)
            sqs.send_message(
                QueueUrl=queue_url,
                MessageBody=row[0],
                MessageAttributes=message_attrs,
            )

        message = 'Messages accepted!'
        print(message)
        response = {"statusCode": 200, "body": json.dumps({"status": "success", "message": message})}

    except Exception as e:
        print(f'Error: {str(e)}')
        response = {"statusCode": 500, "body": json.dumps({"status": "error", "message": str(e)})}

    return response

def process_sqs_job(event, context):
    try:
        print(f"Received SQS event: {json.dumps(event)}")

        table_name = os.getenv("DYNAMODB_CARDS_TABLE_NAME")

        for record in event['Records']:
            # Parse JSON content from SQS message
            message_body = json.loads(record['body'])

            if isinstance(message_body, dict):
                # Extract necessary information from the message
                card_number = message_body.get('card_number')
                first_name = message_body.get('first_name')
                last_name = message_body.get('last_name')
                email = message_body.get('email')
                points = message_body.get('points')

                # Check if the email already exists in the DynamoDB table
                if email_exists(table_name, email):
                    print(f"Email {email} already used. Skipping...")
                    continue

                # Create a loyalty card in DynamoDB
                loyalty_card = {
                    "card_number": card_number,
                    "first_name": first_name,
                    "last_name": last_name,
                    "email": email,
                    "points": points
                }

                DynamodbGateway.upsert(
                    table_name=table_name,
                    mapping_data=[loyalty_card],
                    primary_keys=["card_number"]
                )

                print(f"Loyalty card created: {loyalty_card}")

        message = 'Messages processed successfully!'
        print(message)
        response = {"statusCode": 200, "body": json.dumps({"status": "success", "message": message})}

    except Exception as e:
        print(f'Error: {str(e)}')
        response = {"statusCode": 500, "body": json.dumps({"status": "error", "message": str(e)})}

    return response


def email_exists(table_name, email):
    # Check if the email already exists in the DynamoDB table using GSI
    result = DynamodbGateway.query_index_by_partition_key(
        index_name="emailIndex",
        table_name=table_name,
        partition_key_name="email",
        partition_key_query_value=email
    )

    return bool(result)

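Any insights or suggestions for resolving this would be greatly appreciated :) I'm not sure why the SQS queue isn't receiving anything, or why the DynamoDB table is still empty even though the Lambda functions appear to execute successfully. Thanks!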

amazon-web-services amazon-s3 aws-lambda amazon-dynamodb amazon-sqs
1 Answer

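How do you know whether the API call that pushes the file's rows to SQS actually succeeded? You need to add extra error handling and logging to your Lambda to make sure everything is running the way you expect.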

Change:

sqs.send_message(
    QueueUrl=queue_url,
    MessageBody=row[0],
    MessageAttributes=message_attrs,
)

to:

try:
    res = sqs.send_message(
        QueueUrl=queue_url,
        MessageBody=row[0],
        MessageAttributes=message_attrs,
    )
    print(res)
except Exception as e:
    print(e)
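
Something else worth checking, offered as a sketch rather than a confirmed diagnosis: in the question, `MessageBody=row[0]` sends only the first CSV column, while `process_sqs_job` calls `json.loads(record['body'])` and only acts when the result is a dict. A body that is not a JSON object would be skipped or rejected there, which would leave the table empty. The helper below is hypothetical (the name `send_rows_as_json` and its parameters are my own, not from the question). It assumes the CSV header row holds the field names the consumer expects, such as `card_number` and `email`. It sends each row as a JSON object and returns the `MessageId` values so they can be logged.

```python
import csv
import json
from io import StringIO


def send_rows_as_json(sqs_client, queue_url, file_content):
    """Send each CSV data row to SQS as a JSON object keyed by the header row.

    Hypothetical helper: sqs_client is expected to behave like
    boto3.client('sqs'). Returns the MessageId of every message sent.
    Exceptions from send_message are deliberately not caught, so a failed
    send makes the Lambda invocation fail visibly instead of being hidden.
    """
    message_ids = []
    # DictReader treats the first line as field names, so the header is
    # skipped and each data row becomes a dict of column name -> value.
    for row in csv.DictReader(StringIO(file_content)):
        res = sqs_client.send_message(
            QueueUrl=queue_url,
            MessageBody=json.dumps(row),
        )
        print(f"Sent message {res.get('MessageId')} for row {row}")
        message_ids.append(res.get("MessageId"))
    return message_ids
```

Inside `prepare_sqs_job` this could be called as `send_rows_as_json(sqs, queue_url, file_content)`. Note that all values arrive as strings, so a numeric field such as `points` would need converting on the consumer side if the table should store a number.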