为什么 goroutine 挂起?

问题描述 投票:0回答:0

我正在尝试轮询 SQS 队列,并在将消息正文解组到 json 事件负载后仅获取满足特定条件的消息。我发现在一个测试中调用

GetMessages
然后调用
pollQueue
时,输出类似于

Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Length of messages 1Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Length of messages 1Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Length of messages 1Message number 1 sent

当该方法在测试运行过程中多次调用时,它就会挂起。以上可以请您帮忙吗?

func GetMessages[T any](ctx context.Context, awsconfig aws.Config, queueName string, maxNumberOfMessages int32,
    filter func(t T) bool) ([]T, error) {
    var wg sync.WaitGroup

    ch := make(chan *types.Message, maxNumberOfMessages)

    wg.Add(1)
    go func() {
        defer wg.Done()
        pollQueue(ctx, ch, awsconfig, queueName, maxNumberOfMessages)
        wg.Wait()
    }()
    

    var payload T
    var messages []T

    for message := range ch {
        err := json.Unmarshal([]byte(*message.Body), &payload)
        if err != nil {
            logging.WithError(err).Errorf("Failed to parse JSON for message %s", *message.MessageId)
        }
        if filter(payload) {
            messages = append(messages, payload)
            if len(messages) == int(maxNumberOfMessages) {
                break
            }
        }
    }

    fmt.Printf("Length of messages %d", len(messages))
    return messages, nil
}

func pollQueue(ctx context.Context, ch chan<- *types.Message, awsconfig aws.Config, queueName string, maxNumberOfMessages int32) {
    sqsClient := sqs.NewFromConfig(awsconfig)
    queueURLOut, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
        QueueName: aws.String(queueName),
    })
    if err != nil {
        _ = fmt.Errorf("could not get queue, error is : %w", err)
    }

    for {
        msgs, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
            QueueUrl:            queueURLOut.QueueUrl,
            MaxNumberOfMessages: maxNumberOfMessages,
            WaitTimeSeconds:     1,
        })
        if err != nil {
            _ = fmt.Errorf("failed to fetch messages, error is: %w", err)
        }

        for i, message := range msgs.Messages {
            ch <- &message
            fmt.Printf("Message number %d sent\n", i+1)
        }
    }
}
go amazon-sqs channel goroutine
© www.soinside.com 2019 - 2024. All rights reserved.