更简单的 Golang 并发模式

问题描述 投票:0回答:1

我一直在尝试创建一个函数,使多个外部 api 调用同时进行,并牢记以下内容

  • 返回遇到的第一个错误
  • 聚合 api 调用的结果

这是我的没有并发的函数

func GetMicrosoftTeamsChannelSuggestions(workspace *model.Workspace([]*model.MicrosoftTeamsChannel, error) {
    allChannels := []*model.MicrosoftTeamsChannel{}
    teamsGroups := GetMicrosoftTeamsGroupsFromWorkspace(workspace)

    for _, teamGroup := range teamsGroups {
        channels, err := GetMicrosoftTeamsChannels(*workspace.MicrosoftTeamsTenantId, teamGroup)
        if err != nil {
            return nil, err
        }
        allChannels = append(allChannels, channels...)
    }

    return allChannels, nil
}

我的解决方案感觉有点矫枉过正。一方面,我有点希望我们能够消除“疯狂”的 for 循环和带有 select 语句的通道多路复用。我相信有一个更简单的解决方案。

我阅读了有关并发模式的内容,并尝试使用一种并发模式来实现这一点,并生成了下面的代码

func GetMicrosoftTeamsChannelSuggestions(workspace *model.Workspace) ([]*model.MicrosoftTeamsChannel, error) {
    allChannels := []*model.MicrosoftTeamsChannel{}
    teamsGroups := GetMicrosoftTeamsGroupsFromWorkspace(workspace)

    ch := make(chan []*model.MicrosoftTeamsChannel, len(teamsGroups))
    errCh := make(chan error)

    defer func() {
        close(ch)
        close(errCh)
    }()

    var wg sync.WaitGroup
    wg.Add(len(teamsGroups))

    for _, teamGroup := range teamsGroups {
        go func(teamGroup string) {
            defer wg.Done()

            channels, err := GetMicrosoftTeamsChannels(*workspace.MicrosoftTeamsTenantId, teamGroup)
            if err != nil {
                errCh <- err
            } else {
                ch <- channels
            }
        }(teamGroup)
    }

    wg.Wait()

    for {
        select {
        case channels := <-ch:
            allChannels = append(allChannels, channels...)
        case err := <-errCh:
            return nil, err // Return the first encountered error
        default:
            return allChannels, nil
        }
    }
}
go concurrency goroutine
1个回答
0
投票

您的解决方案将在 for 循环中创建许多 goroutine,然后在所有 goroutine 终止后拉取结果。此外,等待结果的 for 循环在

default
中存在
select
情况,导致其忙循环。而且,即使其中一个调用失败,它们也会全部执行。

您可以改进此解决方案。首先,使用上下文来取消,并使用单个通道来传输结果和错误:


type Result struct {
    Result []*model.MicrosoftTeamsChannel
    Err error
}

ctx, cancel:=context.WithCancel(context.Background())
ch:=make(chan Result) // No need for buffered channel

设置 reader goroutine:

var err error
go func() {
   for result:=range ch {
      if ch.Err!=nil {
         cancel()
         if err!=nil { // Record the first error
             err=ch.Err
         }
      } else {
         allChannels = append(allChannels, result.Result...)
      }    
   }
}()

然后启动 goroutine:

var wg sync.WaitGroup
wg.Add(len(teamsGroups))
for _, teamGroup := range teamsGroups {
    go func(teamGroup string) {
        defer wg.Done()
        // Stop if canceled
        if ctx.Err()!=nil {
           return
        }
        channels, err := GetMicrosoftTeamsChannels(*workspace.MicrosoftTeamsTenantId, teamGroup)
         ch<-Result{Result: channels, Err:err}
        }(teamGroup)
 }

等待他们结束:

wg.Wait()

关闭通道以便读者可以终止:

close(ch)
© www.soinside.com 2019 - 2024. All rights reserved.