我相对较新,并且在一些并发代码方面遇到了一些问题:
由于某种原因,else 块执行了两次,程序出现恐慌,并显示以下消息:
panic: close of closed channel
,因为 extractedCh
已在第一次关闭。
func (ex Extractor) Extract(pageCh chan *types.ScrapedData, extractedCh chan *types.ScrapedData) {
log.Info().Msg("Extracting data...")
var wg sync.WaitGroup
for {
page, more := <-pageCh
if more {
wg.Add(1)
go func() {
defer wg.Done()
worker, err := ex.getWorker(page)
if err != nil {
log.Error().Err(err).Msg("Error creating worker")
} else {
worker.Extract(extractedCh)
}
}()
} else {
log.Info().Msg("WebPage channel closed: waiting for waitgroup")
wg.Wait()
log.Info().Msg("WaitGroup finished closed -- closing extractedCh")
close(extractedCh)
}
}
}
Extract 函数仅在一个 go 例程中运行 - 所以我不确定为什么 else 块会执行两次并在
extractedCh
已经关闭时尝试关闭它。
解决方法是什么:我是否需要重组我的代码,或者只是在尝试关闭通道之前检查通道是否已关闭?
else 块会第二次执行,因为当
pageCh
关闭时,else 块不会跳出循环。像这样重写代码:
func (ex Extractor) Extract(pageCh chan *types.ScrapedData, extractedCh chan *types.ScrapedData) {
log.Info().Msg("Extracting data...")
var wg sync.WaitGroup
for page := range pageCh {
wg.Add(1)
page := page // delete this line if compiling for Go 1.22 or later
go func() {
defer wg.Done()
worker, err := ex.getWorker(page)
if err != nil {
log.Error().Err(err).Msg("Error creating worker")
} else {
worker.Extract(extractedCh)
}
}()
}
log.Info().Msg("WebPage channel closed: waiting for waitgroup")
wg.Wait()
log.Info().Msg("WaitGroup finished closed -- closing extractedCh")
close(extractedCh)
}
当
pageCh
关闭时,for 循环退出。