golang中生产者消费者死锁

问题描述 投票:0回答:1

这些是我的全局变量。我在我的主函数中初始化了taskCond。

var (
    doneFlag  bool
    taskMutex sync.Mutex
    taskCond  *sync.Cond
)

我的生产者代码看起来像这样 -

func producer(decoder *json.Decoder, taskQueue *LockFreeQueue) {

    for {
        // process tasks and create task variable
        taskMutex.Lock()
        taskQueue.Enqueue(task)
        taskCond.Signal()
        taskMutex.Unlock()

    }
}

我的消费者代码如下 -

func consumer(id int, taskQueue *LockFreeQueue, f feed.Feed, encoder *json.Encoder, wg *sync.WaitGroup) {
    for {
        taskMutex.Lock()
        for taskQueue.isEmpty() == true && !doneFlag {
            // Wait for a new task or termination signal
            taskCond.Wait()
        }
        if doneFlag {
            taskMutex.Unlock()
            wg.Done()
            return
        }
        // Dequeue a task
        task := taskQueue.Dequeue()

        if task.Command == "DONE" {
            doneFlag = true
            taskCond.Broadcast()
            taskMutex.Unlock()
            wg.Done()
            return
        }
        taskMutex.Unlock()
        // Process the task
    }

}

此代码死锁。我收到错误消息

fatal error: all goroutines are asleep - deadlock!

我不明白为什么会这样。消费者线程在生产者线程之前启动(我不启动新线程,它是主线程),所以最初它们都处于休眠状态。当 p 线程将任务放入队列时,它会向其中一个 c 线程发出信号,表明有可用的任务,然后 c 线程跳出 for 循环,并检查它是否被唤醒,因为一个 c 线程获得了特殊的“完成” “作业(此作业指示线程现在可以返回),情况并非如此,因此它会继续执行,使任务出队,检查它是否收到特殊作业,如果收到,则使完成标志为真,广播并返回。如果没有,它将照常处理作业。整个过程发生在 for 循环中,因为我们不知道有多少作业可用,因此线程在作业之间循环。

了解程序在哪里陷入僵局以及是否有人纠正我在上一段中的解释将非常有帮助。

multithreading go consumer producer
1个回答
0
投票

首先,请尽量不要使用全局变量:)

其次,您使用的 cond 有点不寻常。使用 Mutex 作为参数,如

sync.NewCond(mx)
并调用
cond.L.Lock()/Unlock()
,更适合阅读。

接下来,如果没有完整的示例(正如 Ulrich Eckhardt 已经告诉过的),可能不清楚哪些函数调用被阻塞。

但是,我尝试提取代码并制作了可以无阻塞工作的示例。我不使用互斥锁或条件更改部分(只需在

sync.NewCond
中使用互斥锁并调用
taskCond.L.Lock()/Unlock()
)并删除我们没有的所有代码。

看起来阻塞部分可以在那些

LockFreeQueue
feed.Feed
中。

游乐场

package blocking_test

import (
    "context"
    "sync"
    "testing"
    "time"
)

var (
    doneFlag  bool
    taskMutex sync.Mutex
    taskCond  = sync.NewCond(&taskMutex)
)

func producer(cancelFn func()) {
    time.Sleep(10 * time.Second)

    for {
        taskCond.L.Lock()
        cancelFn()
        taskCond.Signal()
        taskCond.L.Unlock()
        time.Sleep(100 * time.Millisecond)
    }
}

func consumer(ctx context.Context, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        taskCond.L.Lock()
        if doneFlag {
            taskCond.L.Unlock()
            return
        } else {
            taskCond.Wait()
        }

        select {
        case <-ctx.Done():
            doneFlag = true
            taskCond.Broadcast()
            taskCond.L.Unlock()
            return
        default:
        }

        taskCond.L.Unlock()
    }
}

func TestRunCommand(t *testing.T) {
    var wg sync.WaitGroup
    ctx, cancel := context.WithCancel(context.Background())
    go producer(cancel)

    wg.Add(1)
    go consumer(ctx, &wg)
    wg.Wait()
    t.Log("Done")
}
© www.soinside.com 2019 - 2024. All rights reserved.