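I am trying to write a function that executes multiple jobs, and I want control to pass beyond wg.Wait() only once all of the jobs have finished. I have described the various problems I ran into in the comments in the code.
How can I make this work?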
func (q *ChanExecutor) Perform(ctx context.Context, name string, taskData *interface{}) chan *job.JobResult {
	var waitgroup sync.WaitGroup
	waitgroup.Add(1)
	go func(wg *sync.WaitGroup) {
		for j := range q.jobCh { // This is the channel which gives jobs
			wg.Add(1)
			go func(qq *ChanExecutor, jVal job.Job) { // we are just passing these values to closure. Is this necessary?
				jobResultChannel := jVal.Do(ctx) // Here we are executing the job as result which sends another channel of results
				donech := jVal.DoneCh() // Job returns another channel which tells if that job is done
				for true {
					select {
					case res := <-jobResultChannel:
						q.result <- res // From the result we are passing that result to another channel
					case syncJobDone := <-donech:
						if syncJobDone {
							donech = nil // here if the donech receives true it should come out of the select and for loop and the goroutine. How to do that?
							// Another thing here, if the donech returns true before jobResultChannel then it should still go to jobResultChannel's case block
							// The jVal.Do(ctx) executes the job and returns channel but in my case before starting the forloop both channels has values and donech has true value
							wg.Done()
							break
						}
					}
				}
			}(q, *j)
		}
	}(&waitgroup)
	go func(wg *sync.WaitGroup, qq *ChanExecutor) {
		time.Sleep(200 * time.Millisecond) // Here is another blunder. If I don't sleep here, randomly, it goes after wg.Wait()
		// even though all the jobs are not done.
		wg.Done() // Removing the one which was added immediately after creating wg instance.
		wg.Wait()
		fmt.Println("Wait finish")
		qq.Done()
	}(&waitgroup, q)
	fmt.Printf("returning result channel not result")
	return q.result
}
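First, you should remove the sleep and the wg.Done from the second goroutine. Without the sleep it fails intermittently because the first goroutine sometimes has not yet had a chance to add to wg before the second goroutine removes the initial count, so wg.Wait() sees a counter of zero and returns early. Note that the initial waitgroup.Add(1) still needs a matching wg.Done(): a natural place is the end of the first goroutine, once the range over q.jobCh has finished. Otherwise wg.Wait() never returns.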
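The counting rule described above can be illustrated with a minimal, self-contained sketch. It is not the asker's code: `runAll` and the integer jobs are hypothetical stand-ins for `Perform` and `q.jobCh`. The dispatcher's count is added before the goroutine starts, and it is released only after the range loop ends, so the counter cannot reach zero while jobs are still being handed out.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll (hypothetical helper) starts one worker per value received from jobs
// and blocks until the dispatcher and every worker have finished.
func runAll(jobs <-chan int) int {
	var wg sync.WaitGroup
	var mu sync.Mutex
	total := 0

	wg.Add(1) // accounts for the dispatcher; added before the goroutine starts
	go func() {
		defer wg.Done() // released only after the range loop has ended
		for j := range jobs {
			wg.Add(1) // safe: the dispatcher's own count keeps the counter above zero
			go func(n int) {
				defer wg.Done()
				mu.Lock()
				total += n
				mu.Unlock()
			}(j)
		}
	}()

	wg.Wait() // no sleep needed: every Add happens before the counter can hit zero
	return total
}

func main() {
	jobs := make(chan int)
	go func() {
		for i := 1; i <= 5; i++ {
			jobs <- i
		}
		close(jobs) // ends the dispatcher's range loop
	}()
	fmt.Println(runAll(jobs)) // prints 15
}
```

This sketch assumes the job channel is eventually closed; if it never is, the dispatcher never calls Done and Wait blocks forever.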
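Second, you are trying to terminate the goroutine, so simply do that. A bare break inside a select only leaves the select, not the enclosing for loop, whereas return exits the select, the loop, and the goroutine at once: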
if syncJobDone {
	wg.Done()
	return
}
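The question also asks what to do when donech fires while results are still waiting in the result channel. One possible approach, sketched below under stated assumptions, is to drain the result channel without blocking before returning. The names `forward` and `runDemo` are hypothetical, the channels carry plain strings instead of `*job.JobResult`, and the sketch assumes that every result has already been sent by the time the done signal arrives.

```go
package main

import (
	"fmt"
	"sync"
)

// forward (hypothetical stand-in for the inner goroutine) copies values from
// results to out until done delivers true, then drains what is left and returns.
func forward(wg *sync.WaitGroup, results <-chan string, done <-chan bool, out chan<- string) {
	defer wg.Done() // runs on every return path
	for {
		select {
		case res, ok := <-results:
			if !ok {
				results = nil // a nil channel is never ready, so this case is disabled
				continue
			}
			out <- res
		case finished := <-done:
			if !finished {
				continue
			}
			// Drain results that are already buffered, without blocking.
			for {
				select {
				case res, ok := <-results:
					if !ok {
						return
					}
					out <- res
				default:
					return // leaves the select, both loops, and the goroutine
				}
			}
		}
	}
}

// runDemo reproduces the situation from the question: both channels already
// hold values before the loop starts.
func runDemo() []string {
	results := make(chan string, 2)
	done := make(chan bool, 1)
	out := make(chan string, 2)
	results <- "a"
	results <- "b"
	done <- true

	var wg sync.WaitGroup
	wg.Add(1)
	go forward(&wg, results, done, out)
	wg.Wait()
	close(out)

	var got []string
	for res := range out {
		got = append(got, res)
	}
	return got
}

func main() {
	fmt.Println(runDemo()) // prints [a b]
}
```

If results can still arrive after the done signal, a non-blocking drain is not enough; in that case the producer would need to close the result channel so the consumer can range over it until it is empty.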