我试图遵循 Go 的“不要通过共享内存来通信,而是通过通信来共享内存”的方式,使用通道来异步地传达要完成的任务,并发送回处理任务的结果。
为了简单起见,我已将通道类型更改为 int,而不是它们真正的结构。并用
time.Sleep()
代替了漫长的处理。
如何在发回所有任务结果后关闭
producedResults
,以便此代码不会卡在最后for
?
quantityOfTasks:= 100
quantityOfWorkers:= 60
remainingTasks := make(chan int)
producedResults := make(chan int)
// produce tasks
go func() {
for i := 0; i < quantityOfTasks; i++ {
remainingTasks <- 1
}
close(remainingTasks)
}()
// produce workers
for i := 0; i < quantityOfWorkers; i++ {
go func() {
for taskSize := range remainingTasks {
// simulate a long task
time.Sleep(time.Second * time.Duration(taskSize))
// return the result of the long task
producedResults <- taskSize
}
}()
}
// read the results of the tasks and agregate them
executedTasks := 0
for resultOfTheTask := range producedResults { //this loop will never finish because producedResults never gets closed
// consolidate the results of the tasks
executedTasks += resultOfTheTask
}
您希望在写入该通道的所有 goroutine 返回后关闭该通道。您可以为此使用 WaitGroup:
wg:=sync.WaitGroup{}
for i := 0; i < quantityOfWorkers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for taskSize := range remainingTasks {
//simulate a long task
time.Sleep(time.Second * time.Duration(taskSize))
//return the result of the long task
producedResults <- taskSize
}
}()
}
go func() {
wg.Wait()
close(producedResults)
}()
等待worker完成后关闭通道。
var wg sync.WaitGroup
wg.Add(quantityOfWorkers)
go func() {
wg.Wait()
close(producedResults)
}()
for i := 0; i < quantityOfWorkers; i++ {
go func() {
defer wg.Done()
...