如何在不等待其他goroutine设置的情况下读取频道?

问题描述 投票:2回答:2

我在goroutine中使用频道时遇到问题。

var test = make(chan string)

func main() {
    go initChan()

    for i := 0; i < 2; i++ {
        go readChan()
    }

    var input string
    fmt.Scanln(&input)
}

func initChan() {
    for i := 0; i < 100; i++ {
        test <- "Iteration num: " + strconv.Itoa(i)
        time.Sleep(time.Second * 5)
    }
}

func readChan() {
    for {
        message := <- test
        log.Println(message)
    }
}

输出:

2019/12/24 08:21:17 Iteration num: 0
2019/12/24 08:21:22 Iteration num: 1
2019/12/24 08:21:27 Iteration num: 2
2019/12/24 08:21:32 Iteration num: 3
2019/12/24 08:21:37 Iteration num: 4
2019/12/24 08:21:42 Iteration num: 5
................................

我需要读取线程,而无需等待测试变量的更新。现在每个readChan()都在等待initChan()更新测试变量。

是否有可能使readChan()线程一次工作而无需为每个线程等待initChan()?

multithreading go goroutine
2个回答
2
投票

创建了一个恶魔,它将所有消息从测试通道推送到所有其他侦听例程。

var test = make(chan string)

var mapChan = make(map[int]chan string)
var count = 3

func main() {
    go initChan()
    go deamon()
    for i := 0; i < count; i++ {
        mapChan[i] = make(chan string)
        go readChan(i)
    }

    var input string
    fmt.Scanln(&input)
}

func deamon() {
    for {
        message := <-test
        for i := 0; i < count; i++ {
            mapChan[i] <- message
        }
    }
}

func initChan() {
    for i := 0; i < 100; i++ {
        test <- "Iteration num: " + strconv.Itoa(i)
        time.Sleep(time.Second * 1)
    }
}

func readChan(i int) {
    for {
        select {

        case message := <-mapChan[i]:
            log.Println(message)
        default:
            // Do for not when written on channel
        }
    }
}

0
投票

如果我正确理解了您的问题,此解决方案可能会有所帮助。我使用了一个大小为1的缓冲通道,以使作为发件人的goroutine永远不会被阻塞(在非缓冲通道的情况下)。您可以阅读有关频道的更多信息:Behaviour of channles

package main

import (
    "log"
    "strconv"
    "sync"
    "time"
)

// Buffered channel with size 1 guarantees delayed delivery of data
// As soon as the goroutine sends to the channel, the reciever goroutine dequeus it
// Then the reciver goroutines does the work, but the sender goroutine isn't blocked
// As the size is again 0 after the reciever recieved it but might haven't processed it yet
var test = make(chan string, 1)

func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    // Waits for other goroutines to complete before the main goroutine returns
    defer wg.Wait()
    go initChan(&wg)
    go readChan(&wg)
}

func initChan(wg *sync.WaitGroup) {
    defer wg.Done()
    for i := 0; i < 100; i++ {
        // Sends continuously
        test <- "Iteration num: " + strconv.Itoa(i)
        time.Sleep(time.Second * 5)
    }
    close(test)
}

func readChan(wg *sync.WaitGroup) {
    defer wg.Done()
    var message string
    var ok bool
    // Reciever deques the value as soon as it recieves it
    // But might take time to proceed
    for {
        select {
        case message, ok = <-test:
            // If channel is closed
            if ok == false {
                return
            }
            log.Println(message)
        default:
            log.Println(message)
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.