我目前正在开发一个项目,在该项目中,我使用无服务器框架设置了 AWS Lambda 函数,以处理来自 SQS 队列的作业并在 DynamoDB 表中创建条目。部署似乎很成功,日志中没有可疑行为。但是,我遇到了一个问题:SQS 队列仍为空,并且 DynamoDB 表没有显示任何项目。
无服务器.yml
service: challenge1
frameworkVersion: '3'
provider:
name: aws
runtime: python3.8
lambdaHashingVersion: '20201221'
iamRoleStatements:
- Effect: "Allow"
Action: "dynamodb:*"
Resource: "*"
- Effect: "Allow"
Action: "apigateway:*"
Resource: "*"
- Effect: "Allow"
Action: "s3:*"
Resource: "*"
- Effect: "Allow"
Action: "sqs:*"
Resource: "*"
environment:
DYNAMODB_CARDS_TABLE_NAME: challenge1
S3_BUCKETNAME: serverlesschallenge-darla
QUEUE_URL: https://sqs.us-east-1.amazonaws.com/874957933250/serverlesschallenge-darla
functions:
prepareSQSjobS3:
handler: handler.prepare_sqs_job
events:
- s3:
bucket: serverlesschallenge-darla
event: s3:ObjectCreated:Put
existing: true
rules:
- suffix: .csv
prepareSQSjobSQS:
handler: handler.process_sqs_job
events:
- sqs:
arn: "arn:aws:sqs:us-east-1:874957933250:serverlesschallenge-darla"
package:
exclude:
- venv/**
- node_modules/**
resources:
Resources:
LoyaltyCardDynamodbTable:
Type: 'AWS::DynamoDB::Table'
Properties:
AttributeDefinitions:
- AttributeName: card_number
AttributeType: S
- AttributeName: email
AttributeType: S
KeySchema:
- AttributeName: card_number
KeyType: HASH
BillingMode: PAY_PER_REQUEST
TableName: ${self:provider.environment.DYNAMODB_CARDS_TABLE_NAME}
GlobalSecondaryIndexes:
- IndexName: emailIndex
KeySchema:
- AttributeName: email
KeyType: HASH
Projection:
ProjectionType: ALL
plugins:
- serverless-python-requirements
处理程序.py
import json
import string
import random
import os
import boto3
import urllib.parse
import csv
import sys
from io import StringIO
from dynamodb_gateway import DynamodbGateway
s3 = boto3.client('s3')
sqs = boto3.client('sqs')
queue_url = os.getenv('QUEUE_URL')
#aws lambda trigger when theres new s3 file. reads line by line
def prepare_sqs_job(event, context):
try:
print(f"Received S3 event: {json.dumps(event)}")
bucket_name = os.getenv("S3_BUCKETNAME")
# Get the object details from the S3 event
s3_record = event['Records'][0]['s3']
bucket = s3_record['bucket']['name']
file_key = urllib.parse.unquote_plus(s3_record['object']['key'], encoding='utf-8')
# Download the file from S3
response = s3.get_object(Bucket=bucket, Key=file_key)
file_content = response['Body'].read().decode('utf-8')
print(f"Object uploaded: s3://{bucket}/{file_key}")
# Process CSV file and send each row as a message to SQS
rows = [row for i, row in enumerate(csv.reader(StringIO(file_content))) if i > 0]
message_attrs = {'AttributeName': {'StringValue': 'AttributeValue', 'DataType': 'String'}}
for row in rows:
print(row)
sqs.send_message(
QueueUrl=queue_url,
MessageBody=row[0],
MessageAttributes=message_attrs,
)
message = 'Messages accepted!'
print(message)
response = {"statusCode": 200, "body": json.dumps({"status": "success", "message": message})}
except Exception as e:
print(f'Error: {str(e)}')
response = {"statusCode": 500, "body": json.dumps({"status": "error", "message": str(e)})}
return response
def process_sqs_job(event, context):
try:
print(f"Received SQS event: {json.dumps(event)}")
table_name = os.getenv("DYNAMODB_CARDS_TABLE_NAME")
for record in event['Records']:
# Parse JSON content from SQS message
message_body = json.loads(record['body'])
if isinstance(message_body, dict):
# Extract necessary information from the message
card_number = message_body.get('card_number')
first_name = message_body.get('first_name')
last_name = message_body.get('last_name')
email = message_body.get('email')
points = message_body.get('points')
# Check if the email already exists in the DynamoDB table
if email_exists(table_name, email):
print(f"Email {email} already used. Skipping...")
continue
# Create a loyalty card in DynamoDB
loyalty_card = {
"card_number": card_number,
"first_name": first_name,
"last_name": last_name,
"email": email,
"points": points
}
DynamodbGateway.upsert(
table_name=table_name,
mapping_data=[loyalty_card],
primary_keys=["card_number"]
)
print(f"Loyalty card created: {loyalty_card}")
message = 'Messages processed successfully!'
print(message)
response = {"statusCode": 200, "body": json.dumps({"status": "success", "message": message})}
except Exception as e:
print(f'Error: {str(e)}')
response = {"statusCode": 500, "body": json.dumps({"status": "error", "message": str(e)})}
return response
def email_exists(table_name, email):
# Check if the email already exists in the DynamoDB table using GSI
result = DynamodbGateway.query_index_by_partition_key(
index_name="emailIndex",
table_name=table_name,
partition_key_name="email",
partition_key_query_value=email
)
return bool(result)
任何有关解决此问题的见解或建议将不胜感激:) 我不确定为什么 SQS 队列没有收到任何内容,并且尽管 Lambda 函数执行看似成功,但 DynamoDB 表仍然为空。谢谢!
你怎么知道你的 API 调用将文件推送到 sqs 是否成功。您需要在 Lambda 中添加额外的错误处理和日志记录,以确保一切按照您的预期运行。
改变:
sqs.send_message(
QueueUrl=queue_url,
MessageBody=row[0],
MessageAttributes=message_attrs,
)
至
try:
res = sqs.send_message(
QueueUrl=queue_url,
MessageBody=row[0],
MessageAttributes=message_attrs,
)
print(res)
except Exception as e:
print(e)