TL; DR: all goroutines are asleep, deadlock!
的典型情况,但无法弄清楚
我正在解析Wiktionary XML转储以构建单词数据库。我将每篇文章的文本解析推迟到一个goroutine,希望它可以加快处理速度。它是7GB,并且在串行方式下在我的计算机中处理不到2分钟,但是如果我可以利用所有内核,为什么不这样做。
我通常不是线程技术的新手,出现all goroutines are asleep, deadlock!
错误。怎么了
这可能根本不执行,因为它使用了无缓冲的通道,所以所有goroutine最终都有效地串行执行,但是我的想法是学习和理解线程并使用不同的替代方法来测试它需要花费多长时间:
runtime.NumCPU()
的所有goroutines我的伪代码摘要:
while tag := xml.getNextTag() {
wg.Add(1)
go parseTagText(chan, wg, tag.text)
// consume a channel message if available
select {
case msg := <-chan:
// do something with msg
default:
}
}
// reading tags finished, wait for running goroutines, consume what's left on the channel
for msg := range chan {
// do something with msg
}
// Sometimes this point is never reached, I get a deadlock
wg.Wait()
----
func parseTagText(chan, wg, tag.text) {
defer wg.Done()
// parse tag.text
chan <- whatever // just inform that the text has been parsed
}
在Go Playground的完整示例中,您:
创建一个通道(第39行,results := make(chan langs)
)和一个等待组(第40行,var wait sync.WaitGroup
)。到目前为止一切顺利。
循环:在循环中,有时会剥离任务:
if ...various conditions... {
wait.Add(1)
go parseTerm(results, &wait, text)
}
在循环中,有时会从通道进行非阻塞读取(如您的问题所示)。这里也没问题。但是...
循环结束时,使用:
for res := range results {
...
}
without在所有作者完成后,始终只在一个位置调用close(results)
。此循环使用从通道读取的blocking。只要某些编写器goroutine仍在运行,阻塞读取就可以在不停止整个系统的情况下进行阻塞,但是当最后一个编写器完成写入并退出时,将没有剩余的编写器goroutine。剩下的任何[[other例行程序都可能救您,但没有一个。
var wait
(在编写器的正确位置添加1,然后在编写器的正确位置调用Done()
),解决方案是再添加一个goroutine,这将是抢救您的方法: go func() {
wait.Wait()
close(results)
}()
您应该在进入goroutine。然后,此goroutine将调用for res := range results
循环之前,先剥离此辅助程序goroutine。 (如果您更早地将其分拆,则可能很快看到wait
变量递减为零,就在通过拆分另一个parseTerm
将其再次计数之前。)此匿名函数将阻塞
wait
变量的Wait()
函数,直到最后一个编写程序goroutine调用了最后的wait.Done()
,这将取消阻塞this
close(results)
,这将安排for
goroutine中的main
循环完成,从而解除对该goroutine的阻塞。当此goroutine(救援人员)返回并终止时,不再有救援人员,但我们不再需要任何救援人员。(此主代码随后会不必要地调用wait.Wait()
:由于for
不会终止,直到new
例程中的wait.Wait()
已被解除阻止,所以我们知道下一个wait.Wait()
将立即返回。因此,我们可以挂断第二个电话,尽管不打扰。)// This is our producer
func foo(i int, ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
ch <- i
fmt.Println(i, "done")
}
// This is our consumer - it uses a different WG to signal it's done
func consumeData(ch chan int, wg *sync.WaitGroup) {
defer wg.Done()
for x := range ch {
fmt.Println(x)
}
fmt.Println("ALL DONE")
}
func main() {
ch := make(chan int)
wg := sync.WaitGroup{}
// create the producers
for i := 0; i < 10; i++ {
wg.Add(1)
go foo(i, ch, &wg)
}
// create the consumer on a different goroutine, and sync using another WG
consumeWg := sync.WaitGroup{}
consumeWg.Add(1)
go consumeData(ch,&consumeWg)
wg.Wait() // <<<< means that the producers are done
close(ch) // << Signal the consumer to exit
consumeWg.Wait() // << Wait for the consumer to exit
}