我只是写了一个简单的Go流水线,目标是获取urls和打印状态,在fetchUrl时,我需要关闭通道来通知main,不会有数据传来,所以释放main go例程。
在fetchUrl的时候,我需要关闭通道来通知main,不会有数据传来,所以释放main go例程。但是,我不能真的在循环后关闭fetchurl函数的通道,因为它会太快。我不想在应用中加入等待组,因为目前我的目标是了解通道。
在fetchurl函数中,称为two的通道只是为了确保一次只有两个作业。
package main
import (
"fmt"
"net/http"
"os"
)
func gen(val []string) <-chan string {
out := make(chan string, len(val))
for _, val := range val {
out <- val
}
close(out)
return out
}
func fetchUrl(in <-chan string) <-chan string {
out := make(chan string)
two := make(chan struct{}, 2)
fmt.Println("blocked")
for url := range in {
two <- struct{}{}
go fetchWorker(url, two, out)
}
return out
}
func fetchWorker(url string, two chan struct{}, out chan string) {
res, err := http.Get("https://" + url)
if err != nil {
panic(err)
}
<-two
out <- fmt.Sprintf("[%d] %s\n", res.StatusCode, url)
}
func main() {
for val := range fetchUrl(gen(os.Args[1:])) {
fmt.Println(val)
}
}
你需要关闭 out
通道中的每一个结果都被写入后。 最简单的方法是当所有的worker goroutine都退出了,最简单的方法是依次用 sync.WaitGroup
. (在Go中,通道和goroutine是非常密切相关的概念,所以goroutine管理是使用通道的一部分。)
在现有的代码中,你可以把它与你的 fetchUrl
函数。
var wg sync.WaitGroup
for url := range in {
two <- struct{}{}
wg.Add(1)
go func() {
defer wg.Done()
fetchWorker(url, two, out)
}()
}
wg.Wait()
close(out)
另一个结构上的问题是,你的代码写出来的时候会有两个 gen
和 fetchUrl
创建通道,运行所有应该写入通道的代码,并在这些写入者完成后才返回通道;因为在函数返回之前,没有任何东西可以从通道中读取,这将导致死锁。 你可以通过在顶层创建所有的通道并将它们传递到生成器函数中来解决这个问题。
如果你希望正好有两个工人从同一个URL队列中读取,一个标准的模式是只启动两个goroutine从同一个通道中读写。 例如,你可以重写 fetchWorker
作为
func fetchWorker(urls <-chan string, out chan<- string) {
for url := range urls {
res, err := http.Get("https://" + url)
if err != nil {
panic(err)
}
out <- fmt.Sprintf("[%d] %s\n", res.StatusCode, url)
}
}
在顶层,创建渠道,创建工人,养活输入,消耗输出。
func main() {
urls := make(chan string)
out := make(chan string)
// Launch a goroutine to feed data into urls, then
// close(urls), then stop
go gen(os.Args[1:], urls)
// Launch worker goroutines
workerCount := 2
var wg sync.WaitGroup
for i := 0; i < workerCount; i++ {
wg.Add(1)
go func() {
defer wg.Done()
fetchWorker(urls, out)
}()
}
// Launch a dedicated goroutine to close the channel
go func() {
wg.Wait()
close(out)
}()
// Read the results
for result := range(out) {
fmt.Println(result)
}
}