在固定时间后退出被阻止的频道

问题描述 投票:0回答:1

我具有以下功能来使用频道中的任务

func consumeTasks() {
    pool := NewPool(8)
    pool.Run()
    go ackCompletedTasks(pool)
    for message := range mq.Consume("TASK") { // this channel becomes dead after few minutes
        task := &model.Task{}
        err := json.Unmarshal(message.Body, task)
        if err != nil {
            util.HandlerError(err, "Unable to decode JSON")
        }
        pool.Tasks <- task
        messages[task.Id()] = message
    }
}

[基本上,我正在从Rabbitmq读取一些值,将它们转换为任务,对其进行处理并确认消息。问题是一段时间后,如果通道空闲,它将变为无效(即,即使新消息稍后出现,它们也不会被处理)。因此,我想从当时存在的通道中消耗尽可能多的消息,然后退出该函数,以便在一段时间后可以重新触发该方法。但是for循环永远不会结束,因为它在通道上被阻塞,并且永远不会释放。我该如何解决?

go rabbitmq channel
1个回答
0
投票

在循环内使用select语句进行非阻塞接收。 select情况可用于“检测”没有准备好要接收的消息,因此您可以返回。

default

ch := range mq.Consume("TASK") for { select { case msg, ok := <-ch: if !ok { // Channel closed return } task := &model.Task{} err := json.Unmarshal(message.Body, task) if err != nil { util.HandlerError(err, "Unable to decode JSON") } pool.Tasks <- task messages[task.Id()] = message default: // No new ready messages return } } 引用

如果可以进行一项或多项通信,则可以通过统一的伪随机选择来选择可以进行的单个通信。否则,如果存在默认情况,则选择该情况。

© www.soinside.com 2019 - 2024. All rights reserved.