连接到 FIFO Topic 的 FIFO 队列不会接收后续消息

问题描述 投票:0回答:1

我正在使用 AWS SQS 和 SNS fifo 变体来创建一种管理整个系统事件的简单方法。首先,我的系统确实很小而且并不复杂,但我认为如果将代码分成独立的部分并部署为单个二进制文件,那么管理代码本身会容易得多。不是这个问题的主题。

我在这里使用 Localstack 进行本地测试和开发,不确定他们的 SQS/SNS 交互是否不同导致了这个问题,如果是这样,我会将我的问题移至他们的 github 存储库。

所以我的系统中每个实体都有一个 FIFO 主题,我将以项目为例,项目是可部署的网站。所以我有一个名为项目的 FIFO 主题,在消息属性中我设置了事件类型

ProjectCreated
ProjectRequested
...根据类型,它被路由到适当的 SQS FIFO 队列,例如分析、配置等.

我对消息进行建模的方式如下,

  • body
    内容是json
  • 消息组 id 是实体 id,因此有关单个实体的所有消息都正确排序
  • message dedup id 是实体 id + 操作,例如。
    created-123
    provisioned-123
    deleted-123
    。这确保了单个实体/操作组合在任何给定时间都不应该出现重复项

这就是我理解的工作方式,基本上,实体消息将按实体本身进行分组和排序(因为我们使用的是 id),并且在上一个操作之前,我们不能为给定的操作提供重复的消息一个被删除。但事实并非如此。

为了展示这一点,我创建了 2 个订阅 FIFO

Project
主题的新队列,一个是 FIFO 队列,另一个是标准队列,我还创建了一个帮助器 bash 脚本来接收消息

#!/bin/bash
# ./receive_sqs_messages.sh [--fifo]
#
# This script connects to a LocalStack instance and continuously receives messages from an SQS queue.
# Accepts --fifo flag to determine whether to use standard queue or fifo queue
# It echoes each message's metadata and content along with a timestamp, and then deletes the message.

# Set default queue name
QUEUE_NAME="test-queue"

# Check for --fifo flag
for arg in "$@"; do
    if [ "$arg" == "--fifo" ]; then
        QUEUE_NAME="test-queue.fifo"
        break
    fi
done

# Get the Queue URL
QUEUE_URL=$(awslocal sqs get-queue-url --queue-name $QUEUE_NAME --output text)

echo "$(date): Listening for messages on $QUEUE_URL"

while true; do
    # Receive a message from the SQS queue
    RESPONSE=$(awslocal sqs receive-message --queue-url $QUEUE_URL --attribute-names All)

    # Check if the message is not empty
    if [ ! -z "$RESPONSE" ]; then
        # Extract message details
        MESSAGE_BODY=$(echo $RESPONSE | jq -r '.Messages[0].Body')
        RECEIPT_HANDLE=$(echo $RESPONSE | jq -r '.Messages[0].ReceiptHandle')

        # Echo the message and its metadata
        echo "$(date): Received a message - $MESSAGE_BODY"
        echo "Metadata: $RESPONSE"

        # Delete the message from the queue
        awslocal sqs delete-message --queue-url $QUEUE_URL --receipt-handle $RECEIPT_HANDLE
        echo "$(date): Deleted message with ReceiptHandle $RECEIPT_HANDLE"
    fi

    # Wait for a short period before the next loop iteration
    sleep 5
done

让我们看一下日志(请原谅日期的西里尔时间戳)

首先进入 FIFO 队列,它只接收单个事件

пон,  5. феб 2024.  09:04:47 CET: Listening for messages on http://sqs.eu-central-1.localhost.localstack.cloud:4566/000000000
000/test-queue.fifo
пон,  5. феб 2024.  09:05:53 CET: Received a message - {"entityId":"65b151b8-b515-4478-82da-b479f8b8c85c","eventType":"Projec
tRequested"}
Metadata: {
    "Messages": [
        {
            "MessageId": "e5046b81-a8a1-46d1-990d-59b72c79938b",
            "ReceiptHandle": "MThkYjBlYTctMzFjZS00NTk3LThmNTQtMjA4NzdiNzBiMDI2IGFybjphd3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDA
wMDA6dGVzdC1xdWV1ZS5maWZvIGU1MDQ2YjgxLWE4YTEtNDZkMS05OTBkLTU5YjcyYzc5OTM4YiAxNzA3MTIwMzUzLjcxMzM2ODI=",
            "MD5OfBody": "798357615efbbc6fe3a24ac5a8d0c107",
            "Body": "{\"entityId\":\"65b151b8-b515-4478-82da-b479f8b8c85c\",\"eventType\":\"ProjectRequested\"}",
            "Attributes": {
                "SenderId": "000000000000",
                "SentTimestamp": "1707120351448",
                "MessageGroupId": "65b151b8-b515-4478-82da-b479f8b8c85c",
                "MessageDeduplicationId": "ProjectRequested-65b151b8-b515-4478-82da-b479f8b8c85c",
                "SequenceNumber": "14664029547054235655",
                "ApproximateReceiveCount": "1",
                "ApproximateFirstReceiveTimestamp": "1707120353713"
            }
        }
    ]
}
пон,  5. феб 2024.  09:05:54 CET: Deleted message with ReceiptHandle MThkYjBlYTctMzFjZS00NTk3LThmNTQtMjA4NzdiNzBiMDI2IGFybjph
d3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDAwMDA6dGVzdC1xdWV1ZS5maWZvIGU1MDQ2YjgxLWE4YTEtNDZkMS05OTBkLTU5YjcyYzc5OTM4YiAxNzA3MTIwM
zUzLjcxMzM2ODI=

但是标准队列接收所有这些

пон,  5. феб 2024.  09:04:49 CET: Listening for messages on http://sqs.eu-central-1.localhost.localstack.cloud:4566/000000000
000/test-queue
пон,  5. феб 2024.  09:05:55 CET: Received a message - {"entityId":"65b151b8-b515-4478-82da-b479f8b8c85c","eventType":"Projec
tRequested"}
Metadata: {
    "Messages": [
        {
            "MessageId": "0cfc249e-dde4-4057-b6c7-a57e168cc7af",
            "ReceiptHandle": "NGRkZGY5OTUtODQxMC00NWY4LThhM2ItMjlmZjZjNDEzYmY0IGFybjphd3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDA
wMDA6dGVzdC1xdWV1ZSAwY2ZjMjQ5ZS1kZGU0LTQwNTctYjZjNy1hNTdlMTY4Y2M3YWYgMTcwNzEyMDM1NS44MDUyMjM3",
            "MD5OfBody": "798357615efbbc6fe3a24ac5a8d0c107",
            "Body": "{\"entityId\":\"65b151b8-b515-4478-82da-b479f8b8c85c\",\"eventType\":\"ProjectRequested\"}",
            "Attributes": {
                "SenderId": "000000000000",
                "SentTimestamp": "1707120351443",
                "ApproximateReceiveCount": "1",
                "ApproximateFirstReceiveTimestamp": "1707120355805"
            }
        }
    ]
}
пон,  5. феб 2024.  09:05:56 CET: Deleted message with ReceiptHandle NGRkZGY5OTUtODQxMC00NWY4LThhM2ItMjlmZjZjNDEzYmY0IGFybjph
d3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDAwMDA6dGVzdC1xdWV1ZSAwY2ZjMjQ5ZS1kZGU0LTQwNTctYjZjNy1hNTdlMTY4Y2M3YWYgMTcwNzEyMDM1NS44M
DUyMjM3
пон,  5. феб 2024.  09:06:08 CET: Received a message - {"entityId":"65b151b8-b515-4478-82da-b479f8b8c85c","eventType":"Projec
tGenerated"}
Metadata: {
    "Messages": [
        {
            "MessageId": "5f2ec5a6-909a-4307-9034-ef4176b33446",
            "ReceiptHandle": "YTRlNDA5ZTAtODYyOS00MmExLThmZmYtNjY2NWIwY2I1OGJjIGFybjphd3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDA
wMDA6dGVzdC1xdWV1ZSA1ZjJlYzVhNi05MDlhLTQzMDctOTAzNC1lZjQxNzZiMzM0NDYgMTcwNzEyMDM2OC43OTA2MTc3",
            "MD5OfBody": "229ce98e015502d0fce2b320e9f330f4",
            "Body": "{\"entityId\":\"65b151b8-b515-4478-82da-b479f8b8c85c\",\"eventType\":\"ProjectGenerated\"}",
            "Attributes": {
                "SenderId": "000000000000",
                "SentTimestamp": "1707120366073",
                "ApproximateReceiveCount": "1",
                "ApproximateFirstReceiveTimestamp": "1707120368790"
            }
        }
    ]
}
пон,  5. феб 2024.  09:06:09 CET: Deleted message with ReceiptHandle YTRlNDA5ZTAtODYyOS00MmExLThmZmYtNjY2NWIwY2I1OGJjIGFybjph
d3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDAwMDA6dGVzdC1xdWV1ZSA1ZjJlYzVhNi05MDlhLTQzMDctOTAzNC1lZjQxNzZiMzM0NDYgMTcwNzEyMDM2OC43O
TA2MTc3
пон,  5. феб 2024.  09:06:15 CET: Received a message - {"entityId":"65b151b8-b515-4478-82da-b479f8b8c85c","eventType":"Projec
tProvisioned"}
Metadata: {
    "Messages": [
        {
            "MessageId": "4930bfb7-c33c-431e-9114-545549574d86",
            "ReceiptHandle": "NDM5MTVkNTgtNTAyYS00ODEzLTk0YTYtYTRlNTIxZjJmMjU3IGFybjphd3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDA
wMDA6dGVzdC1xdWV1ZSA0OTMwYmZiNy1jMzNjLTQzMWUtOTExNC01NDU1NDk1NzRkODYgMTcwNzEyMDM3NS43NjI3NDM1",
            "MD5OfBody": "8d69872b3328e917d7b2a1b59d411ac7",
            "Body": "{\"entityId\":\"65b151b8-b515-4478-82da-b479f8b8c85c\",\"eventType\":\"ProjectProvisioned\"}",
            "Attributes": {
                "SenderId": "000000000000",
                "SentTimestamp": "1707120366483",
                "ApproximateReceiveCount": "1",
                "ApproximateFirstReceiveTimestamp": "1707120375762"
            }
        }
    ]
}
пон,  5. феб 2024.  09:06:16 CET: Deleted message with ReceiptHandle NDM5MTVkNTgtNTAyYS00ODEzLTk0YTYtYTRlNTIxZjJmMjU3IGFybjphd3M6c3FzOmV1LWNlbnRyYWwtMTowMDAwMDAwMDAwMDA6dGVzdC1xdWV1ZSA0OTMwYmZiNy1jMzNjLTQzMWUtOTExNC01NDU1NDk1NzRkODYgMTcwNzEyMDM3NS43NjI3NDM1

这会导致我的逻辑出现问题,因为如果同一个项目被请求两次(由于失败或任何其他原因而第二次),则队列不会处理该消息。

组 id 和 dedup id 的逻辑

type EntityMessagePayload struct {
    Data      map[string]interface{} `json:"data ,omitempty"`
    EntityID  string                 `json:"entityId"`
    EventType MessageEventType       `json:"eventType"`
}

func (p EntityMessagePayload) DefaultDedupID() string {
    return fmt.Sprintf("%s-%s", p.EventType, p.EntityID)
}

func (p EntityMessagePayload) DefaultGroup() string {
    return p.EntityID
}

设置SQS/SNS的CDK代码

        this.projectTopic = new sns.Topic(this, "ProjectTopic", {
            fifo: true,
            displayName: "Project topic",
            topicName: "project.fifo",
        });

        const subOpts = { rawMessageDelivery: true };


        if (isLocal(props.deployEnv)) {
            const testQueue = new sqs.Queue(this, "TestQueue", {
                queueName: "test-queue",
                fifo: false,
            })

            const testFifoQueue = new sqs.Queue(this, "TestFifoQueue", {
                queueName: "test-queue.fifo",
                fifo: true,
            })

            this.projectTopic.addSubscription(new subscriptions.SqsSubscription(testQueue, subOpts));
            this.projectTopic.addSubscription(new subscriptions.SqsSubscription(testFifoQueue, subOpts));
        }
amazon-web-services amazon-sqs amazon-sns localstack
1个回答
0
投票

这应该在最新版本的 LocalStack 中得到修复。消息组逻辑存在问题,最常在长轮询场景中触发。处理完所有消息然后成为另一个接收调用目标的消息组将停止工作。 作为参考,这里是应该解决此问题的 PR 链接: https://github.com/localstack/localstack/pull/10223

免责声明:我为 LocalStack 工作

© www.soinside.com 2019 - 2024. All rights reserved.