我正在使用 AWS SQS 和 SNS fifo 变体来创建一种管理整个系统事件的简单方法。首先,我的系统确实很小而且并不复杂,但我认为如果将代码分成独立的部分并部署为单个二进制文件,那么管理代码本身会容易得多。不是这个问题的主题。
我在这里使用 Localstack 进行本地测试和开发,不确定他们的 SQS/SNS 交互是否不同导致了这个问题,如果是这样,我会将我的问题移至他们的 github 存储库。
所以我的系统中每个实体都有一个 FIFO 主题,我将以项目为例,项目是可部署的网站。所以我有一个名为项目的 FIFO 主题,在消息属性中我设置了事件类型
ProjectCreated
、ProjectRequested
...根据类型,它被路由到适当的 SQS FIFO 队列,例如分析、配置等.
我对消息进行建模的方式如下,
body
内容是jsoncreated-123
,provisioned-123
,deleted-123
。这确保了单个实体/操作组合在任何给定时间都不应该出现重复项这就是我理解的工作方式,基本上,实体消息将按实体本身进行分组和排序(因为我们使用的是 id),并且在上一个操作之前,我们不能为给定的操作提供重复的消息一个被删除。但事实并非如此。
为了展示这一点,我创建了 2 个订阅 FIFO
Project
主题的新队列,一个是 FIFO 队列,另一个是标准队列,我还创建了一个帮助器 bash 脚本来接收消息
#!/bin/bash
# ./receive_sqs_messages.sh [--fifo]
#
# This script connects to a LocalStack instance and continuously receives messages from an SQS queue.
# Accepts --fifo flag to determine whether to use standard queue or fifo queue
# It echoes each message's metadata and content along with a timestamp, and then deletes the message.
# Set default queue name
QUEUE_NAME="test-queue"
# Check for --fifo flag
for arg in "$@"; do
if [ "$arg" == "--fifo" ]; then
QUEUE_NAME="test-queue.fifo"
break
fi
done
# Get the Queue URL
QUEUE_URL=$(awslocal sqs get-queue-url --queue-name $QUEUE_NAME --output text)
echo "$(date): Listening for messages on $QUEUE_URL"
while true; do
# Receive a message from the SQS queue
RESPONSE=$(awslocal sqs receive-message --queue-url $QUEUE_URL --attribute-names All)
# Check if the message is not empty
if [ ! -z "$RESPONSE" ]; then
# Extract message details
MESSAGE_BODY=$(echo $RESPONSE | jq -r '.Messages[0].Body')
RECEIPT_HANDLE=$(echo $RESPONSE | jq -r '.Messages[0].ReceiptHandle')
# Echo the message and its metadata
echo "$(date): Received a message - $MESSAGE_BODY"
echo "Metadata: $RESPONSE"
# Delete the message from the queue
awslocal sqs delete-message --queue-url $QUEUE_URL --receipt-handle $RECEIPT_HANDLE
echo "$(date): Deleted message with ReceiptHandle $RECEIPT_HANDLE"
fi
# Wait for a short period before the next loop iteration
sleep 5
done
让我们看一下日志(请原谅日期的西里尔时间戳)
首先进入 FIFO 队列,它只接收单个事件
пон, 5. феб 2024. 09:04:47 CET: Listening for messages on http://sqs.eu-central-1.localhost.localstack.cloud:4566/000000000
000/test-queue.fifo
пон, 5. феб 2024. 09:05:53 CET: Received a message - {"entityId":"65b151b8-b515-4478-82da-b479f8b8c85c","eventType":"Projec
tRequested"}
Metadata: {
"Messages": [
{
"MessageId": "e5046b81-a8a1-46d1-990d-59b72c79938b",
"ReceiptHandle": "MThkYjBlYTctMzFjZS00NTk3LThmNTQtMjA4NzdiNzBiMDI2IGFybjphd3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDA
wMDA6dGVzdC1xdWV1ZS5maWZvIGU1MDQ2YjgxLWE4YTEtNDZkMS05OTBkLTU5YjcyYzc5OTM4YiAxNzA3MTIwMzUzLjcxMzM2ODI=",
"MD5OfBody": "798357615efbbc6fe3a24ac5a8d0c107",
"Body": "{\"entityId\":\"65b151b8-b515-4478-82da-b479f8b8c85c\",\"eventType\":\"ProjectRequested\"}",
"Attributes": {
"SenderId": "000000000000",
"SentTimestamp": "1707120351448",
"MessageGroupId": "65b151b8-b515-4478-82da-b479f8b8c85c",
"MessageDeduplicationId": "ProjectRequested-65b151b8-b515-4478-82da-b479f8b8c85c",
"SequenceNumber": "14664029547054235655",
"ApproximateReceiveCount": "1",
"ApproximateFirstReceiveTimestamp": "1707120353713"
}
}
]
}
пон, 5. феб 2024. 09:05:54 CET: Deleted message with ReceiptHandle MThkYjBlYTctMzFjZS00NTk3LThmNTQtMjA4NzdiNzBiMDI2IGFybjph
d3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDAwMDA6dGVzdC1xdWV1ZS5maWZvIGU1MDQ2YjgxLWE4YTEtNDZkMS05OTBkLTU5YjcyYzc5OTM4YiAxNzA3MTIwM
zUzLjcxMzM2ODI=
但是标准队列接收所有这些
пон, 5. феб 2024. 09:04:49 CET: Listening for messages on http://sqs.eu-central-1.localhost.localstack.cloud:4566/000000000
000/test-queue
пон, 5. феб 2024. 09:05:55 CET: Received a message - {"entityId":"65b151b8-b515-4478-82da-b479f8b8c85c","eventType":"Projec
tRequested"}
Metadata: {
"Messages": [
{
"MessageId": "0cfc249e-dde4-4057-b6c7-a57e168cc7af",
"ReceiptHandle": "NGRkZGY5OTUtODQxMC00NWY4LThhM2ItMjlmZjZjNDEzYmY0IGFybjphd3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDA
wMDA6dGVzdC1xdWV1ZSAwY2ZjMjQ5ZS1kZGU0LTQwNTctYjZjNy1hNTdlMTY4Y2M3YWYgMTcwNzEyMDM1NS44MDUyMjM3",
"MD5OfBody": "798357615efbbc6fe3a24ac5a8d0c107",
"Body": "{\"entityId\":\"65b151b8-b515-4478-82da-b479f8b8c85c\",\"eventType\":\"ProjectRequested\"}",
"Attributes": {
"SenderId": "000000000000",
"SentTimestamp": "1707120351443",
"ApproximateReceiveCount": "1",
"ApproximateFirstReceiveTimestamp": "1707120355805"
}
}
]
}
пон, 5. феб 2024. 09:05:56 CET: Deleted message with ReceiptHandle NGRkZGY5OTUtODQxMC00NWY4LThhM2ItMjlmZjZjNDEzYmY0IGFybjph
d3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDAwMDA6dGVzdC1xdWV1ZSAwY2ZjMjQ5ZS1kZGU0LTQwNTctYjZjNy1hNTdlMTY4Y2M3YWYgMTcwNzEyMDM1NS44M
DUyMjM3
пон, 5. феб 2024. 09:06:08 CET: Received a message - {"entityId":"65b151b8-b515-4478-82da-b479f8b8c85c","eventType":"Projec
tGenerated"}
Metadata: {
"Messages": [
{
"MessageId": "5f2ec5a6-909a-4307-9034-ef4176b33446",
"ReceiptHandle": "YTRlNDA5ZTAtODYyOS00MmExLThmZmYtNjY2NWIwY2I1OGJjIGFybjphd3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDA
wMDA6dGVzdC1xdWV1ZSA1ZjJlYzVhNi05MDlhLTQzMDctOTAzNC1lZjQxNzZiMzM0NDYgMTcwNzEyMDM2OC43OTA2MTc3",
"MD5OfBody": "229ce98e015502d0fce2b320e9f330f4",
"Body": "{\"entityId\":\"65b151b8-b515-4478-82da-b479f8b8c85c\",\"eventType\":\"ProjectGenerated\"}",
"Attributes": {
"SenderId": "000000000000",
"SentTimestamp": "1707120366073",
"ApproximateReceiveCount": "1",
"ApproximateFirstReceiveTimestamp": "1707120368790"
}
}
]
}
пон, 5. феб 2024. 09:06:09 CET: Deleted message with ReceiptHandle YTRlNDA5ZTAtODYyOS00MmExLThmZmYtNjY2NWIwY2I1OGJjIGFybjph
d3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDAwMDA6dGVzdC1xdWV1ZSA1ZjJlYzVhNi05MDlhLTQzMDctOTAzNC1lZjQxNzZiMzM0NDYgMTcwNzEyMDM2OC43O
TA2MTc3
пон, 5. феб 2024. 09:06:15 CET: Received a message - {"entityId":"65b151b8-b515-4478-82da-b479f8b8c85c","eventType":"Projec
tProvisioned"}
Metadata: {
"Messages": [
{
"MessageId": "4930bfb7-c33c-431e-9114-545549574d86",
"ReceiptHandle": "NDM5MTVkNTgtNTAyYS00ODEzLTk0YTYtYTRlNTIxZjJmMjU3IGFybjphd3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDA
wMDA6dGVzdC1xdWV1ZSA0OTMwYmZiNy1jMzNjLTQzMWUtOTExNC01NDU1NDk1NzRkODYgMTcwNzEyMDM3NS43NjI3NDM1",
"MD5OfBody": "8d69872b3328e917d7b2a1b59d411ac7",
"Body": "{\"entityId\":\"65b151b8-b515-4478-82da-b479f8b8c85c\",\"eventType\":\"ProjectProvisioned\"}",
"Attributes": {
"SenderId": "000000000000",
"SentTimestamp": "1707120366483",
"ApproximateReceiveCount": "1",
"ApproximateFirstReceiveTimestamp": "1707120375762"
}
}
]
}
пон, 5. феб 2024. 09:06:16 CET: Deleted message with ReceiptHandle NDM5MTVkNTgtNTAyYS00ODEzLTk0YTYtYTRlNTIxZjJmMjU3IGFybjphd3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDAwMDA6dGVzdC1xdWV1ZSA0OTMwYmZiNy1jMzNjLTQzMWUtOTExNC01NDU1NDk1NzRkODYgMTcwNzEyMDM3NS43NjI3NDM1
这会导致我的逻辑出现问题,因为如果同一个项目被请求两次(由于失败或任何其他原因而第二次),则队列不会处理该消息。
组 id 和 dedup id 的逻辑
type EntityMessagePayload struct {
Data map[string]interface{} `json:"data ,omitempty"`
EntityID string `json:"entityId"`
EventType MessageEventType `json:"eventType"`
}
func (p EntityMessagePayload) DefaultDedupID() string {
return fmt.Sprintf("%s-%s", p.EventType, p.EntityID)
}
func (p EntityMessagePayload) DefaultGroup() string {
return p.EntityID
}
设置SQS/SNS的CDK代码
this.projectTopic = new sns.Topic(this, "ProjectTopic", {
fifo: true,
displayName: "Project topic",
topicName: "project.fifo",
});
const subOpts = { rawMessageDelivery: true };
if (isLocal(props.deployEnv)) {
const testQueue = new sqs.Queue(this, "TestQueue", {
queueName: "test-queue",
fifo: false,
})
const testFifoQueue = new sqs.Queue(this, "TestFifoQueue", {
queueName: "test-queue.fifo",
fifo: true,
})
this.projectTopic.addSubscription(new subscriptions.SqsSubscription(testQueue, subOpts));
this.projectTopic.addSubscription(new subscriptions.SqsSubscription(testFifoQueue, subOpts));
}
这应该在最新版本的 LocalStack 中得到修复。消息组逻辑存在问题,最常在长轮询场景中触发。处理完所有消息然后成为另一个接收调用目标的消息组将停止工作。 作为参考,这里是应该解决此问题的 PR 链接: https://github.com/localstack/localstack/pull/10223
免责声明:我为 LocalStack 工作