我在goroutine中使用频道时遇到问题。
var test = make(chan string)
func main() {
go initChan()
for i := 0; i < 2; i++ {
go readChan()
}
var input string
fmt.Scanln(&input)
}
func initChan() {
for i := 0; i < 100; i++ {
test <- "Iteration num: " + strconv.Itoa(i)
time.Sleep(time.Second * 5)
}
}
func readChan() {
for {
message := <- test
log.Println(message)
}
}
输出:
2019/12/24 08:21:17 Iteration num: 0
2019/12/24 08:21:22 Iteration num: 1
2019/12/24 08:21:27 Iteration num: 2
2019/12/24 08:21:32 Iteration num: 3
2019/12/24 08:21:37 Iteration num: 4
2019/12/24 08:21:42 Iteration num: 5
................................
我需要读取线程,而无需等待测试变量的更新。现在每个readChan()都在等待initChan()更新测试变量。
是否有可能使readChan()线程一次工作而无需为每个线程等待initChan()?
创建了一个恶魔,它将所有消息从测试通道推送到所有其他侦听例程。
var test = make(chan string)
var mapChan = make(map[int]chan string)
var count = 3
func main() {
go initChan()
go deamon()
for i := 0; i < count; i++ {
mapChan[i] = make(chan string)
go readChan(i)
}
var input string
fmt.Scanln(&input)
}
func deamon() {
for {
message := <-test
for i := 0; i < count; i++ {
mapChan[i] <- message
}
}
}
func initChan() {
for i := 0; i < 100; i++ {
test <- "Iteration num: " + strconv.Itoa(i)
time.Sleep(time.Second * 1)
}
}
func readChan(i int) {
for {
select {
case message := <-mapChan[i]:
log.Println(message)
default:
// Do for not when written on channel
}
}
}
如果我正确理解了您的问题,此解决方案可能会有所帮助。我使用了一个大小为1的缓冲通道,以使作为发件人的goroutine永远不会被阻塞(在非缓冲通道的情况下)。您可以阅读有关频道的更多信息:Behaviour of channles
package main
import (
"log"
"strconv"
"sync"
"time"
)
// Buffered channel with size 1 guarantees delayed delivery of data
// As soon as the goroutine sends to the channel, the reciever goroutine dequeus it
// Then the reciver goroutines does the work, but the sender goroutine isn't blocked
// As the size is again 0 after the reciever recieved it but might haven't processed it yet
var test = make(chan string, 1)
func main() {
var wg sync.WaitGroup
wg.Add(2)
// Waits for other goroutines to complete before the main goroutine returns
defer wg.Wait()
go initChan(&wg)
go readChan(&wg)
}
func initChan(wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 100; i++ {
// Sends continuously
test <- "Iteration num: " + strconv.Itoa(i)
time.Sleep(time.Second * 5)
}
close(test)
}
func readChan(wg *sync.WaitGroup) {
defer wg.Done()
var message string
var ok bool
// Reciever deques the value as soon as it recieves it
// But might take time to proceed
for {
select {
case message, ok = <-test:
// If channel is closed
if ok == false {
return
}
log.Println(message)
default:
log.Println(message)
}
}
}