如何正确实现并发goroutine(和/或限制它们)以产生一致的结果?

问题描述 投票:1回答:1

我正在使用这个:(符号是[]字符串以及filteredSymbols)

concurrency := 5
sem := make(chan bool, concurrency)

for i := range symbols {
    sem <- true
    go func(int) {
        defer func() { <-sem }()
        rows, err := stmt.Query(symbols[i])
        if <some condition is true> {
            filteredSymbols = append(filteredSymbols, symbols[i])
        }
    }(i)
}
for i := 0; i < cap(sem); i++ {
    sem <- true
}

限制同时运行的goroutine数量。我需要限制它,因为每个goroutine都与Postgres数据库交互,有时我确实有超过3000个符号来评估。该代码用于分析大型金融数据,股票和其他证券。我也使用相同的代码从db获取OHLC和预先计算的数据。这是一种现代方法吗?我问这个是因为WaitGroups已经存在,而我正在寻找一种方法来代替它们。

另外,我发现上面的方法有时会产生不同的结果。我有一个代码,有时得到的filteredSymbols数量是1409.如果不改变参数,它将产生1407个结果,然后有1408个结果。我甚至有一个代码,结果有很大的缺陷。

下面的代码非常不一致,所以我删除了并发。 (注意,在下面的代码中,我甚至不必限制并发goroutine,因为它们只使用内存资源)。删除并发修复了它

func getCommonSymbols(symbols1 []string, symbols2 []string) (symbols []string) {
    defer timeTrack(time.Now(), "Get common symbols")
    // concurrency := len(symbols1)
    // sem := make(chan bool, concurrency)

    // for _, s := range symbols1 {
    for _, sym := range symbols1 {
        // sym := s
        // sem <- true
        // go func(string) {
        // defer func() { <-sem }()
        for k := range symbols2 {
            if sym == symbols2[k] {
                symbols = append(symbols, sym)
                break
            }
        }
        // }(sym)
    }
    // for i := 0; i < cap(sem); i++ {
    //  sem <- true
    // }
    return
}
go concurrency
1个回答
0
投票

您有一个数据竞争,多个goroutines同时更新filteredSymbols。您可以进行的最小修改是在附加调用周围添加互斥锁,例如:

concurrency := 5
sem := make(chan bool, concurrency)
l := sync.Mutex{}
for i := range symbols {
    sem <- true
    go func(int) {
        defer func() { <-sem }()
        rows, err := stmt.Query(symbols[i])
        if <some condition is true> {
            l.Lock()
            filteredSymbols = append(filteredSymbols, symbols[i])
            l.Unlock()
        }
    }(i)
}
for i := 0; i < cap(sem); i++ {
    sem <- true
}

Race Detector也可以帮助你发现这一点。一种常见的替代方法是使用通道将工作转换为goroutine,并使用通道来获得结果,例如。

concurrency := 5
workCh := make(chan string, concurrency)
resCh := make(chan string, concurrency)
workersWg := sync.WaitGroup{}
// start the required number of workers, use the WaitGroup to see when they're done
for i := 0; i < concurrency; i++ {
   workersWg.Add(1)
   go func() {
     defer workersWg.Done()
     for symbol := range workCh {
          // do some work
          if cond {
              resCh <- symbol
          }
     }
   }()
}
go func() {
    // when all the workers are done, close the resultsCh
    workersWg.Wait()
    close(resCh)
}()
// submit all the work
for _, s := range symbols {
    workCh <- s
}
close(workCh)
// collect up the results 
for r := range resCh {
    filteredSymbols = append(filteredSymbols, r)
}
© www.soinside.com 2019 - 2024. All rights reserved.