我一直在尝试创建一个函数,使多个外部 api 调用同时进行,并牢记以下内容
这是我的没有并发的函数
func GetMicrosoftTeamsChannelSuggestions(workspace *model.Workspace([]*model.MicrosoftTeamsChannel, error) {
allChannels := []*model.MicrosoftTeamsChannel{}
teamsGroups := GetMicrosoftTeamsGroupsFromWorkspace(workspace)
for _, teamGroup := range teamsGroups {
channels, err := GetMicrosoftTeamsChannels(*workspace.MicrosoftTeamsTenantId, teamGroup)
if err != nil {
return nil, err
}
allChannels = append(allChannels, channels...)
}
return allChannels, nil
}
我的解决方案感觉有点矫枉过正。一方面,我有点希望我们能够消除“疯狂”的 for 循环和带有 select 语句的通道多路复用。我相信有一个更简单的解决方案。
我阅读了有关并发模式的内容,并尝试使用一种并发模式来实现这一点,并生成了下面的代码
func GetMicrosoftTeamsChannelSuggestions(workspace *model.Workspace) ([]*model.MicrosoftTeamsChannel, error) {
allChannels := []*model.MicrosoftTeamsChannel{}
teamsGroups := GetMicrosoftTeamsGroupsFromWorkspace(workspace)
ch := make(chan []*model.MicrosoftTeamsChannel, len(teamsGroups))
errCh := make(chan error)
defer func() {
close(ch)
close(errCh)
}()
var wg sync.WaitGroup
wg.Add(len(teamsGroups))
for _, teamGroup := range teamsGroups {
go func(teamGroup string) {
defer wg.Done()
channels, err := GetMicrosoftTeamsChannels(*workspace.MicrosoftTeamsTenantId, teamGroup)
if err != nil {
errCh <- err
} else {
ch <- channels
}
}(teamGroup)
}
wg.Wait()
for {
select {
case channels := <-ch:
allChannels = append(allChannels, channels...)
case err := <-errCh:
return nil, err // Return the first encountered error
default:
return allChannels, nil
}
}
}
您的解决方案将在 for 循环中创建许多 goroutine,然后在所有 goroutine 终止后拉取结果。此外,等待结果的 for 循环在
default
中存在 select
情况,导致其忙循环。而且,即使其中一个调用失败,它们也会全部执行。
您可以改进此解决方案。首先,使用上下文来取消,并使用单个通道来传输结果和错误:
type Result struct {
Result []*model.MicrosoftTeamsChannel
Err error
}
ctx, cancel:=context.WithCancel(context.Background())
ch:=make(chan Result) // No need for buffered channel
设置 reader goroutine:
var err error
go func() {
for result:=range ch {
if ch.Err!=nil {
cancel()
if err!=nil { // Record the first error
err=ch.Err
}
} else {
allChannels = append(allChannels, result.Result...)
}
}
}()
然后启动 goroutine:
var wg sync.WaitGroup
wg.Add(len(teamsGroups))
for _, teamGroup := range teamsGroups {
go func(teamGroup string) {
defer wg.Done()
// Stop if canceled
if ctx.Err()!=nil {
return
}
channels, err := GetMicrosoftTeamsChannels(*workspace.MicrosoftTeamsTenantId, teamGroup)
ch<-Result{Result: channels, Err:err}
}(teamGroup)
}
等待他们结束:
wg.Wait()
关闭通道以便读者可以终止:
close(ch)