我正在尝试轮询 SQS 队列,并在将消息正文解组到 json 事件负载后仅获取满足特定条件的消息。我发现在一个测试中调用
GetMessages
然后调用 pollQueue
时,输出类似于
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Length of messages 1Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Length of messages 1Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Message number 1 sent
Length of messages 1Message number 1 sent
当该方法在测试运行过程中多次调用时,它就会挂起。以上可以请您帮忙吗?
func GetMessages[T any](ctx context.Context, awsconfig aws.Config, queueName string, maxNumberOfMessages int32,
filter func(t T) bool) ([]T, error) {
var wg sync.WaitGroup
ch := make(chan *types.Message, maxNumberOfMessages)
wg.Add(1)
go func() {
defer wg.Done()
pollQueue(ctx, ch, awsconfig, queueName, maxNumberOfMessages)
wg.Wait()
}()
var payload T
var messages []T
for message := range ch {
err := json.Unmarshal([]byte(*message.Body), &payload)
if err != nil {
logging.WithError(err).Errorf("Failed to parse JSON for message %s", *message.MessageId)
}
if filter(payload) {
messages = append(messages, payload)
if len(messages) == int(maxNumberOfMessages) {
break
}
}
}
fmt.Printf("Length of messages %d", len(messages))
return messages, nil
}
func pollQueue(ctx context.Context, ch chan<- *types.Message, awsconfig aws.Config, queueName string, maxNumberOfMessages int32) {
sqsClient := sqs.NewFromConfig(awsconfig)
queueURLOut, err := sqsClient.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
_ = fmt.Errorf("could not get queue, error is : %w", err)
}
for {
msgs, err := sqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
QueueUrl: queueURLOut.QueueUrl,
MaxNumberOfMessages: maxNumberOfMessages,
WaitTimeSeconds: 1,
})
if err != nil {
_ = fmt.Errorf("failed to fetch messages, error is: %w", err)
}
for i, message := range msgs.Messages {
ch <- &message
fmt.Printf("Message number %d sent\n", i+1)
}
}
}