我试图通过编写一小段代码来理解 Go 中的
chan chan
结构,如下所示,我希望 3 个工作子例程处理 10 个作业。每个工作子例程都有自己的通道,在其中接收要处理的“作业”。主 Go 例程通过从通道池中获取通道(因此有 chan chan
构造)来将作业分配到工作人员的通道。
但是这段代码会导致死锁情况!我尝试了此代码的一些变体,但遇到了相同的错误。
是因为工作子例程永远等待从其通道读取作业吗?或者是由于其他原因(也许通道过早关闭等)?我对整个结构的理解显然遗漏了一些东西。
有人可以帮我理解这个问题以及如何解决它吗?
代码位于playground,并根据要求复制如下。
package main
import (
"fmt"
"sync"
)
type Job struct {
ID int
}
func worker(id int, jobs <-chan Job, wg *sync.WaitGroup) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job.ID)
}
fmt.Printf("Worker %d done\n", id)
}
func main() {
numWorkers := 3
maxJobs := 10
var wg sync.WaitGroup
// Create the pool of worker channels
pool := make(chan chan Job, numWorkers)
for i := 0; i < numWorkers; i++ {
workerChan := make(chan Job) // Create a new channel for each worker
pool <- workerChan // Add the worker channel to the pool
go worker(i, workerChan, &wg)
}
defer close(pool)
// Create jobs and distribute them to workers
for i := 0; i < maxJobs; i++ {
job := Job{ID: i}
wg.Add(1)
workerChan := <-pool
workerChan <- job
}
// Wait for all workers to complete
wg.Wait()
fmt.Println("All jobs are processed")
}
首先:你不需要这里的频道。要将工作分配给多个工作人员,您只需让所有工作人员从单个共享通道读取即可。当您向通道发送一件工作时,如果有可用的工作人员,其中一个将接收它,否则,通道发送操作将阻塞,直到有一个可用的工作人员为止。
如果您想为每个工作人员使用单独的通道,您仍然不需要通道的通道,您只需要其中的一部分即可。每个工人都会从专用渠道收听,您将自己管理工作分配:
numWorkers := 3
maxJobs := 10
var wg sync.WaitGroup
pool := make([]chan Job, numWorkers)
for i := 0; i < numWorkers; i++ {
pool[i] = make(chan Job)
wg.Add(1)
go worker(i, pool[i], &wg)
}
for i := 0; i < maxJobs; i++ {
job := Job{ID: i}
pool[i%len(pool)] <- job
}
for _,c:=range pool {
close(c)
}
wg.Wait()
您的代码有两个问题:
workerChannel := <-pool
就会阻塞,因为没有其他频道了。如果你坚持使用频道的频道,你必须把它放回去: workerChan := <-pool
workerChan <- job
pool<-workerChan
这样,你就可以将通道的通道用作循环队列
pool
不会有任何效果。您必须关闭pool
中的通道。