无阻塞地从协程通道读取

问题描述 投票:0回答:4

我有两个 goroutines:主要的

worker
和一个
helper
,它为了一些帮助而分离出来。
helper
可能会遇到错误,所以我使用一个渠道将错误从
helper
传达给
worker
.

func helper(c chan <- error) (){
    //do some work
    c <- err // send errors/nil on c
}

这里是

helper()
的称呼:

func worker() error {
    //do some work
    c := make(chan error, 1)
    go helper(c)
    err := <- c
    return err
}

问题:

  • 声明

    err := <- c
    是否阻塞
    worker
    ?我不这么认为,因为频道是缓冲的。

  • 如果它是阻塞的,我如何让它成为非阻塞的?我的要求是让

    worker
    和它的调用者继续完成其余的工作,而无需 wait 值出现在通道上。

谢谢。

go channel goroutine
4个回答
3
投票

您可以轻松验证

func helper(c chan<- error) {
    time.Sleep(5 * time.Second)
    c <- errors.New("") // send errors/nil on c
}

func worker() error {
    fmt.Println("do one")

    c := make(chan error, 1)
    go helper(c)

    err := <-c
    fmt.Println("do two")

    return err
}

func main() {
    worker()
}

Q: 语句是否错误 := <- c blocking worker? I don't think so, since the channel is buffered.

A:

err := <- c
会阻止工人。

Q:如果是阻塞的,如何让它变成非阻塞的?我的要求是让 worker 和它的调用者继续完成剩下的工作,而不是等待值出现在通道上。

A:如果你不想阻塞,只需删除

err := <-c
。如果你需要在最后犯错,只需将
err := <-c
移到最后即可。

你不能不阻塞地读取通道,如果你不阻塞地通过,就不能再执行这段代码,除非你的代码在循环中。

Loop:
    for {
        select {
        case <-c:
            break Loop
        default:
            //default will go through without blocking
        }
        // do something
    }

你有没有见过 errgroup 或 waitgroup?

它使用原子,取消上下文和同步。一旦实现这个。

https://github.com/golang/sync/blob/master/errgroup/errgroup.go

https://github.com/golang/go/blob/master/src/sync/waitgroup.go

或者你可以只使用它,go you func 然后在你想要的任何地方等待错误。


1
投票

在您的代码中,其余工作与帮助程序是否遇到错误无关。剩下的工作完成后,你可以简单地从频道接收。

func worker() error {
    //do some work
    c := make(chan error, 1)
    go helper(c)
    //do rest of the work
    return <-c
}

0
投票

以非阻塞方式从通道读取的功能方式:

func CollectChanOne[T any](ch <-chan T) (T, bool) {
    select {
    case val, stillOpen := <-ch:
        return val, stillOpen
    default:
        var zeroT T
        return zeroT, false
    }
}



func worker() error {
    //do some work
    c := make(chan error, 1)
    go helper(c)
    err, _ := CollectChanOne(c) // Wont block worker
    return err
}

示例:https://go.dev/play/p/Njwyt32B4oT

注意这个例子还有另一个方法 CollectChanRemaining() 读取通道中所有缓冲的元素。


-1
投票

我想你需要这段代码..

运行这段代码

package main

import (
    "log"
    "sync"
)

func helper(c chan<- error) {

    for {
        var err error = nil
        // do job

        if err != nil {
            c <- err // send errors/nil on c
            break
        }
    }

}

func worker(c chan error) error {
    log.Println("first log")

    go func() {
        helper(c)
    }()

    count := 1
    Loop:
        for {
            select {
            case err := <- c :
                return err
            default:
                log.Println(count, " log")
                count++
                isFinished := false
                // do your job
                if isFinished {
                    break Loop // remove this when you test

                }
            }
        }
    return nil
}

func main() {
    wg := sync.WaitGroup{}
    wg.Add(1)
    go func() {
        c := make(chan error, 1)
        worker(c)
        wg.Done()
    }()
    wg.Wait()
}
© www.soinside.com 2019 - 2024. All rights reserved.