我有一种情况,需要扇出发送到同一频道的接收器:
func MessagesFromSQS(ctx context.Context, sqsClient sqsiface.SQSAPI) chan *sqs.Message {
messages := make(chan *sqs.Message)
go func() {
defer close(messages)
wg := sync.WaitGroup{}
for i := 0; i < parallelSQSReaders; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-ctx.Done():
return
default:
// ...
for _, message := range result.Messages {
messages <- message
}
}
}
}()
}
wg.Wait()
}()
return messages
}
对我来说,这很有意义。但是,竞赛检测器抱怨说,不同的goroutines以及发送和关闭通道。我意识到负责发送的goroutine应该与关闭的goroutine相同,但是正确的方法是什么?
[当您知道将不再有任何写入时,即当所有辅助例程都完成时,请关闭通道。
所以:
wg.Wait()
close(messages)