我是新手,无法找到这个问题的答案。我正在做的是在生产者中读取CSV文件,做一些可能需要时间的事情,然后通过频道将输出发送给消费者。有一系列生产者 - 消费者,任何生产者最终都可能比消费者慢。
生产者(1 goroutine) - > chan0 - > consumer-producer-1(> 1 goroutines) - > chan1 - > consumer-producer-2(> 1 goroutines) - > chan2 - > consumer(> 1 goroutines)
这里最多可以有15位消费者。
现在我面临的问题是,如果生产者完成,如何决定消费者方面,我们可以停止处理。
我需要实现的是:
我使用了以下方法。
processRemaining = false
for processRemaining == false{
select {
case stuff, ok := <-input_messages:
do_stuff(stuff)
if ok == false { // if channel has been closed
processRemaining = true
}
if result != nil {
//send to channel output_messages
}
case sig := <-input_signals: // if signaled to stopped.
fmt.Println("received signal", sig)
processRemaining = true
default:
fmt.Println("no activity")
}
}
if processRemaining {
for stuff := range input_messages {
do_stuff(stuff)
if result != nil {
//send to channel output_messages
}
}
// send "output_routine" number of "done" to a channel "output_signals".
}
但即使在这种方法中,我也无法想出任何方式与闭合的“input_messages”频道相同的行为,如果没有可用的话,比如10秒。
我用这种方法无视任何问题。处理此问题的可能方式(或并发模式)是什么?保证:
使用sync.WaitGroup
来跟踪正在运行的goroutines的数量。每个goroutine在不再从频道获取数据后退出。一旦WaitGroup
完成,就可以完成清理工作。
像这样的东西:
import (
"sync"
"time"
)
type Data interface{} // just an example
type Consumer interface {
Consume(Data) Data
CleanUp()
Count() int
Timeout() time.Duration
}
func StartConsumers(consumer Consumer, inCh <-chan Data, outCh chan<- Data) {
wg := sync.WaitGroup{}
for i := 0; i < consumer.Count(); i++ {
wg.Add(1)
go func() {
consumeLoop:
for {
select {
case v, ok := <-inCh: // 'ok' says if the channel is still open
if !ok {
break consumeLoop
}
outCh <- consumer.Consume(v)
case <-time.After(consumer.Timeout()):
break consumeLoop
}
}
wg.Done()
}()
}
wg.Wait()
consumer.CleanUp()
close(outCh)
}
在管道的每个阶段,您可以使用与上面类似的过程来启动消费者。