为了更好地说明我的问题,这里是带有注释的代码示例:
(观察:我使用的是 golang v1.20.5)
// start to consume a queue
// deliveries is an unbuffered receiver go channel of the type <-chan Message
deliveries, err := qb.pubSubSubscriber.Consume(ctx, qName)
if err != nil {
return err
}
// infinite loop to consume the messages
for msg := range deliveries {
// and for every msg I execute a function
result := myFunc()
}
这里的想法是像一个由 n 个工作人员组成的池一样使用消息,如果其中一个工作人员有空,它们就会收到一条消息。
更清楚地说,下面的示例不是有效的解决方案:
// for the workerPool is this situation i would use the
// tunny worker pool go package
workerPool := newWorkerPoolWithNWorkers()
for msg := range deliveries {
go func(){
result:=workerPool(myFunc)
}()
}
这是无效的,因为在我看来,这段代码的作用是一次获取每条消息,并让workerPool一次与n个工人一起完成其工作,但问题是,如何为每个“获取一条新消息”无限循环中的“免费”工人?
假设我们有一个包含 100 条消息的队列,想要的解决方案是首先获取 3 条消息,但是当处理其中一条获取的消息时,代码会收到另一条新消息并无限循环。
我正在尝试做类似的事情
wg := new(sync.WaitGroup)
counter := 0
for msg := range deliveries {
wg.Wait()
go func(){
counter ++
if counter == n { // n could be any integer number wanted to limit the pool size
//this way a new message would be at wg.Wait() if all n goroutines are busy
wg.Add(1)
}
result:= myFunc()
count--
wg.Done()// one of the N "workers" is free, so we can ask for one more message
}()
}
但是看起来太复杂了,我觉得行不通。
如果有人可以帮助我,我将非常感激!
我觉得你有点想多了。要使用工作池使用来自通道的消息,您可以:
for i:=0;i<nWorkers;i++ {
go func() {
for msg:=range deliveries {
workWithMsg(msg)
}
}()
}
换句话说,您创建了
n
协程,所有协程都从同一通道侦听。运行时负责调度哪个 goroutine 接收它。当 deliveries
通道关闭时,所有工作线程都会终止。