TL; DR:请转到最后一部分,告诉我如何解决此问题。
今天早上我已经从Python开始使用Go。我想用bit并发性和不同的命令行参数多次从Go调用封闭源可执行文件。我得到的代码工作得很好,但是我想得到您的意见,以便进行改进。由于我处于早期学习阶段,因此我还将解释我的工作流程。
为了简单起见,在此假设此“外部封闭源程序”是zenity
,这是一种Linux命令行工具,可以从命令行显示图形消息框。
所以,在Go中,我会这样:
package main
import "os/exec"
func main() {
cmd := exec.Command("zenity", "--info", "--text='Hello World'")
cmd.Run()
}
这应该工作正确。注意,.Run()
在功能上等同于.Start()
,后跟.Wait()
。很好,但是如果我只想执行一次该程序,那么整个编程工作将不值得。因此,我们只需多次执行。
现在,我已经开始工作了,我想使用自定义命令行参数多次调用我的程序(为简单起见,这里只是i
。
package main
import (
"os/exec"
"strconv"
)
func main() {
NumEl := 8 // Number of times the external program is called
for i:=0; i<NumEl; i++ {
cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
cmd.Run()
}
}
确定,我们做到了!但是我仍然看不到Go相对于Python的优势。。。这段代码实际上是以串行方式执行的。我有一个多核CPU,我想利用它。因此,让我们与goroutines并发一些。
让我们重写代码以使调用和重用更加容易,并添加著名的go
关键字:
package main
import (
"os/exec"
"strconv"
)
func main() {
NumEl := 8
for i:=0; i<NumEl; i++ {
go callProg(i) // <--- There!
}
}
func callProg(i int) {
cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
cmd.Run()
}
没事!问题是什么?所有goroutine都立即执行。我真的不知道为什么不执行zenity,而是执行AFAIK,Go程序在甚至无法初始化zenity外部程序之前就退出了。通过使用time.Sleep
确认了这一点:等待几秒钟足以让zenity的8个实例启动。我不知道这是否可以视为错误。
更糟的是,我实际上想调用的真实程序需要一段时间才能执行。如果我在4核CPU上并行执行该程序的8个实例,这将浪费一些时间进行大量上下文切换…………我不知道普通Go goroutine的行为如何,但是exec.Command
will启动zenity在8个不同线程中进行8次。更糟糕的是,我希望执行此程序超过100,000次。在goroutines中一次执行所有这些操作根本不会有效。不过,我还是想利用4核CPU!
在线资源倾向于建议将sync.WaitGroup
用于此类工作。这种方法的问题在于,您基本上正在使用一批goroutine:如果我创建了由4个成员组成的WaitGroup,则Go程序将等待all 4个外部程序完成,然后再调用新的4个程序。这效率不高:再次浪费CPU。
其他一些资源建议使用缓冲通道来完成工作:
package main
import (
"os/exec"
"strconv"
)
func main() {
NumEl := 8 // Number of times the external program is called
NumCore := 4 // Number of available cores
c := make(chan bool, NumCore - 1)
for i:=0; i<NumEl; i++ {
go callProg(i, c)
c <- true // At the NumCoreth iteration, c is blocking
}
}
func callProg(i int, c chan bool) {
defer func () {<- c}()
cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
cmd.Run()
}
这看起来很丑。渠道并非旨在达到此目的:我正在利用副作用。我喜欢defer
的概念,但是我讨厌必须声明一个函数(甚至是lambda)才能从我创建的虚拟通道中弹出一个值。哦,当然,使用虚拟频道本身很丑。
现在我们快完成了。我只需要考虑另一个副作用:Go程序在关闭所有zenity弹出窗口之前先关闭。这是因为在循环完成时(在第8次迭代中),没有什么会阻止程序完成。这次,sync.WaitGroup
将很有用。
package main
import (
"os/exec"
"strconv"
"sync"
)
func main() {
NumEl := 8 // Number of times the external program is called
NumCore := 4 // Number of available cores
c := make(chan bool, NumCore - 1)
wg := new(sync.WaitGroup)
wg.Add(NumEl) // Set the number of goroutines to (0 + NumEl)
for i:=0; i<NumEl; i++ {
go callProg(i, c, wg)
c <- true // At the NumCoreth iteration, c is blocking
}
wg.Wait() // Wait for all the children to die
close(c)
}
func callProg(i int, c chan bool, wg *sync.WaitGroup) {
defer func () {
<- c
wg.Done() // Decrease the number of alive goroutines
}()
cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
cmd.Run()
}
完成。
我不是说线程; Go如何在内部管理goroutine无关紧要。我的意思是限制一次启动的goroutine的数量:exec.Command
每次调用时都会创建一个新线程,所以我应该控制它的调用时间。
我无法使自己相信这样的虚拟渠道是可行的方式。
我会生成4个工人goroutine,它们从一个公共通道读取任务。比其他程序更快的Goroutine(因为它们的排定时间不同或碰巧得到简单的任务)将从该通道接收的任务比其他程序更多。除此之外,我将使用sync.WaitGroup等待所有工作人员完成。剩下的只是任务的创建。您可以在此处查看该方法的示例实现:
package main
import (
"os/exec"
"strconv"
"sync"
)
func main() {
tasks := make(chan *exec.Cmd, 64)
// spawn four worker goroutines
var wg sync.WaitGroup
for i := 0; i < 4; i++ {
wg.Add(1)
go func() {
for cmd := range tasks {
cmd.Run()
}
wg.Done()
}()
}
// generate some tasks
for i := 0; i < 10; i++ {
tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
}
close(tasks)
// wait for the workers to finish
wg.Wait()
}
可能还有其他可能的方法,但是我认为这是一个非常简单易懂的解决方案。
一种简单的节流方法(执行f()
次N次,但同时执行最大maxConcurrency
次,只是一个方案:
package main
import (
"sync"
)
const maxConcurrency = 4 // for example
var throttle = make(chan int, maxConcurrency)
func main() {
const N = 100 // for example
var wg sync.WaitGroup
for i := 0; i < N; i++ {
throttle <- 1 // whatever number
wg.Add(1)
go f(i, &wg, throttle)
}
wg.Wait()
}
func f(i int, wg *sync.WaitGroup, throttle chan int) {
defer wg.Done()
// whatever processing
println(i)
<-throttle
}
我可能不会将throttle
频道称为“虚拟”。恕我直言,这是一种优雅的方式(这当然不是我的发明),如何限制并发性。
BTW:请注意,您忽略了cmd.Run()
返回的错误。
尝试一下:https://github.com/korovkin/limiter
limiter := NewConcurrencyLimiter(10)
limiter.Execute(func() {
zenity(...)
})
limiter.Wait()