我具有以下功能来使用频道中的任务
func consumeTasks() {
pool := NewPool(8)
pool.Run()
go ackCompletedTasks(pool)
for message := range mq.Consume("TASK") { // this channel becomes dead after few minutes
task := &model.Task{}
err := json.Unmarshal(message.Body, task)
if err != nil {
util.HandlerError(err, "Unable to decode JSON")
}
pool.Tasks <- task
messages[task.Id()] = message
}
}
[基本上,我正在从Rabbitmq读取一些值,将它们转换为任务,对其进行处理并确认消息。问题是一段时间后,如果通道空闲,它将变为无效(即,即使新消息稍后出现,它们也不会被处理)。因此,我想从当时存在的通道中消耗尽可能多的消息,然后退出该函数,以便在一段时间后可以重新触发该方法。但是for循环永远不会结束,因为它在通道上被阻塞,并且永远不会释放。我该如何解决?
在循环内使用select
语句进行非阻塞接收。 select
情况可用于“检测”没有准备好要接收的消息,因此您可以返回。
default
从ch := range mq.Consume("TASK")
for {
select {
case msg, ok := <-ch:
if !ok {
// Channel closed
return
}
task := &model.Task{}
err := json.Unmarshal(message.Body, task)
if err != nil {
util.HandlerError(err, "Unable to decode JSON")
}
pool.Tasks <- task
messages[task.Id()] = message
default:
// No new ready messages
return
}
}
引用
如果可以进行一项或多项通信,则可以通过统一的伪随机选择来选择可以进行的单个通信。否则,如果存在默认情况,则选择该情况。