如何以“池化”方式消耗无缓冲的go通道

问题描述 投票:0回答:1

为了更好地说明我的问题,这里是带有注释的代码示例:

(观察:我使用的是 golang v1.20.5)

// start to consume a queue
// deliveries is an unbuffered receiver go channel of the type <-chan Message
deliveries, err := qb.pubSubSubscriber.Consume(ctx, qName)
if err != nil {
    return err
}

// infinite loop to consume the messages
for msg := range deliveries {
    // and for every msg I execute a function
    result := myFunc()
}

这里的想法是像一个由 n 个工作人员组成的池一样使用消息,如果其中一个工作人员有空,它们就会收到一条消息。

更清楚地说,下面的示例不是有效的解决方案:

// for the workerPool is this situation i would use the
// tunny worker pool go package
workerPool := newWorkerPoolWithNWorkers()
for msg := range deliveries {
    go func(){
         result:=workerPool(myFunc)
    }()   
}

这是无效的,因为在我看来,这段代码的作用是一次获取每条消息,并让workerPool一次与n个工人一起完成其工作,但问题是,如何为每个“获取一条新消息”无限循环中的“免费”工人?

假设我们有一个包含 100 条消息的队列,想要的解决方案是首先获取 3 条消息,但是当处理其中一条获取的消息时,代码会收到另一条新消息并无限循环。

我正在尝试做类似的事情

wg := new(sync.WaitGroup)
counter := 0 
for msg := range deliveries {
    wg.Wait()
    go func(){
         counter ++
         if counter == n { // n could be any integer number wanted to limit the pool size
             //this way a new message would be at wg.Wait() if all n goroutines are busy
             wg.Add(1)
         }
         result:= myFunc()
         count--
         wg.Done()// one of the N "workers" is free, so we can ask for one more message
    }()   
}

但是看起来太复杂了,我觉得行不通。

如果有人可以帮助我,我将非常感激!

go queue channel pool
1个回答
0
投票

我觉得你有点想多了。要使用工作池使用来自通道的消息,您可以:

for i:=0;i<nWorkers;i++ {
   go func() {
      for msg:=range deliveries {
         workWithMsg(msg)
      }
   }()
}

换句话说,您创建了

n
协程,所有协程都从同一通道侦听。运行时负责调度哪个 goroutine 接收它。当
deliveries
通道关闭时,所有工作线程都会终止。

© www.soinside.com 2019 - 2024. All rights reserved.