如何在不需要锁的情况下将一个协程变成多个协程?

问题描述 投票:0回答:1

我们有一个 HostPinger 函数,它打算作为一个 goroutine(来自

Main
)启动,并负责监视使用客户端列表初始化的主机列表的健康状况。代码如下:

package server

var getHostHealthChan = make(chan chan bool)
var addHostChan = make(chan string)
var removeHostChan = make(chan string)

func GetHostsHealth() []bool {
    hostHealthList := make(chan bool)
    getHostHealthChan <- hostHealthList
    healthList := make([]bool, 0, 10)
    for health := range hostHealthList {
        healthList = append(healthList, health)
    }
    return healthList
}

func AddHost(host string) {
    addHostChan <- host
}

func RemoveHost(host string) {
    removeHostChan <- host
}

func HostPinger(hosts []string, clients []Client) {
    for {
        select {
        case req := <-getHostHealthChan:
            for i, host := range hosts {
                req <- clients[i].GetHostHealth(host)
            }
        case host := <-addHostChan:
            hosts = append(hosts, host)
            // some logic
        case host := <-removeHostChan:
            // some other logic to remove host
        }
    }
}

我们当前实现的问题是添加/删除/检查任何主机会阻塞整个 goroutine,因此我们最终会得到非常糟糕的并发性。因为

GetHostsHealth
结果是一个相当频繁的调用,我们看到了一些性能问题。有什么好的方法可以将这段代码重构为支持更高并发的形式?

首先想到的是为每个主机启动一个 goroutine 并使用映射来维护主机名和主机 goroutine 通道之间的映射,如下所示:

var hostChanMap = make(map[string]chan Op) // this map needs to be protected by lock whenever host add/remove occurs

func HostPinger(hosts []string, clients []Client) {
    for host := range hosts {
        currChan := make(chan Op)
        go HostPingerForHost(currChannel chan Op) {
            // host pinger code
        }(currChan)
        hostChanMap[host] = currChan 
    }
}

这种方法的问题是添加和删除主机部分仍然需要使用锁来同步。

有没有什么好的解决方案可以让不同主机的健康检查请求互不干扰?最好在我们不需要同步访问某种地图的地方?

multithreading go concurrency goroutine channels
1个回答
0
投票

根据您发布的代码,

GetHostHealth
HostPinger
发送请求,它遍历所有主机并为每个主机调用
client.GetHostHealth(host)
,然后通过通道返回结果。这里没有并发,一切都是依次发生的。这并不比按顺序为每个主机调用
client.GetHostHealth(host)
更好。代码也有些损坏,因为
GetHostHealth
得到一个从未使用过的
host
参数,而是返回一个包含所有布尔值的主机健康切片。
HostPinger
是保留一个主机列表,但是调用者不知道这个列表。

你可以做的是同时调用

client.GetHostHealth(host)
。为此,您需要为每个主机创建一个单独的 goroutine。

func GetHostsHealth(hosts []string, clients []Client) []bool {
     healthList := make([]bool, len(hosts))
     wg:sync.WaitGroup{}
     for i:=range hosts {
         i:=i
         wg.Add(1)
         go func() {
              defer wg.Done()
              healthList[i]=clients[i].GetHostHealth(hosts[i])
         }()
     }
     wg.Wait()
     return healthList
}

不需要单独的 goroutine 通过通道获取请求。

您需要保留主机及其对应客户端的列表。这似乎是您已经在做的事情。只需确保当您调用此函数时,其他一些 goroutine 不会尝试向其中添加/删除项目。

如果维护动态主机列表是问题的核心,您可以修改您必须合并我的建议的内容:

type HostInfo struct {
   host string
   client Client
}


func HostPinger() {
   hosts:=make(map[string]HostInfo) 
   for {
      select {
         case hostInfo:=<-addHost: // chan HostInfo
            hosts[hostInfo.host]=hostInfo
         case host:=<-removeHost: // chan string
            delete(hosts,host)
         case ret:=<-getHostHealth: // chan chan []bool
            hostList:=make([]string,0,len(hosts))
            clientList:=make([]Client,0,len(hosts))
            for k,v:=range hosts {
               hostList=append(hostList,k)
               clientList=append(clientList,v.cient)
            }
            go func() {
                ret <-GetHostsHealth(hostList,clientList)
             }()
      }     
   }
}

这个

HostPinger
在单独的 goroutine 中运行,异步处理主机添加/删除请求以及 ping 请求。

© www.soinside.com 2019 - 2024. All rights reserved.