我有一个程序,我基本上有三种情况 - 为键设置值,如果它存在则获取值,或者等到给定键的值可用。我最初的想法 - 创建一个带有
map[string]interface{}
的新类型 - 存储“持久”值的位置。除此之外,为了等待一个值,我计划使用 map[string](chan struct{})
。当调用 Set()
方法时,我会写入该通道,等待它的任何人都会知道该值在那里。
我事先不知道钥匙 - 它们是随机的。我不确定如何正确实施
Wait()
方法。
type Map struct {
sync.Mutex
m map[string]interface{}
wait map[string]chan (struct{})
}
func (m *Map) Set(key string, value interface{}) {
m.ensureWaitChan(key)
m.Lock()
defer m.Unlock()
m.m[key] = value
// Signal to all waiting.
m.wait[key] <- struct{}{}
}
func (m *Map) Wait(key string) interface{} {
m.ensureWaitChan(key)
m.Lock()
value, ok := m.m[key]
if ok {
m.Unlock()
return value
}
m.Unlock()
// <------ Unlocked state where something might happen.
<-m.wait[key]
value := m.m[key]
return value
}
// If the channel does not exist for those waiting - create it.
func (m *Map) ensureWaitChan(key string) {
m.Lock()
defer m.Unlock()
_, ok := m.wait[key]
if ok {
return
}
m.wait[key] = make(chan struct{}, 100)
}
问题是 -
Wait()
中存在竞争条件 - 在我释放互斥体之后,在我开始监听传入值的通道之前。
处理这个问题的最佳方法是什么?对任何其他关于如何实现这一点的建议持开放态度,我相信一定有更好的方法来做到这一点。我会避免以固定的时间间隔或类似的方式轮询该值。
您正在寻找的是同步地图和消息代理之间的混合体。我们可以通过利用通信和同步通道来做到这一点,这样订阅者就可以在消息发布后立即收到消息,如果它们还没有在缓存中的话。
type Map struct {
sync.Mutex
m map[string]any
subs map[string][]chan any
}
func (m *Map) Set(key string, value any) {
m.Lock()
defer m.Unlock()
m.m[key] = value
// Send the new value to all waiting subscribers of the key
for _, sub := range m.subs[key] {
sub <- value
}
delete(m.subs, key)
}
func (m *Map) Wait(key string) any {
m.Lock()
// Unlock cannot be deferred so we can unblock Set() while waiting
value, ok := m.m[key]
if ok {
m.Unlock()
return value
}
// if there is no value yet, subscribe to any new values for this key
ch := make(chan any)
m.subs[key] = append(m.subs[key], ch)
m.Unlock()
return <-ch
}
因为订阅者在等待时必须解锁地图互斥体,所以他们无法安全地访问添加到地图的新消息。我们通过自己的频道将新值直接发送给所有订阅者,这样我们就不需要在
Set
内添加更多同步,以确保在解锁地图本身之前所有订阅者都满意。提前解锁地图将允许订阅者直接阅读它,但同时也会允许插入新值,从而导致结果不一致。
运行版本,还包括带有类型参数的通用
Map
实现:https://go.dev/play/p/AN7VRSPdGmO
这个怎么样?它使用条件变量。
package main
import (
"fmt"
"sync"
)
type Map struct {
mu sync.Mutex
cond *sync.Cond
m map[string]interface{}
}
func (d *Map) Set(a string, b interface{}) {
d.mu.Lock()
defer d.mu.Unlock()
d.m[a] = b
d.cond.Broadcast()
}
func (d *Map) Get(a string) interface{} {
d.mu.Lock()
defer d.mu.Unlock()
return d.m[a]
}
func (d *Map) Wait(a string) interface{} {
d.mu.Lock()
defer d.mu.Unlock()
for {
if b, ok := d.m[a]; ok {
return b
}
d.cond.Wait()
}
}
func main() {
d := &Map{}
d.m = make(map[string]interface{})
d.cond = sync.NewCond(&d.mu)
var wg sync.WaitGroup
wg.Add(1)
go func() {
fmt.Println("waited and got c=", d.Wait("c"))
wg.Done()
}()
d.Set("a", "apple")
d.Set("b", "banana")
fmt.Println("b=", d.Get("b"))
fmt.Println("a=", d.Get("a"))
fmt.Println("c=", d.Get("c"))
d.Set("c", "cherry")
fmt.Println("c=", d.Get("c"))
wg.Wait()
}
如果你想支持一个
d.Cancel()
方法,强制一个d.Wait()
返回nil
,你可以扩展Map
并修改d.Wait()
如下:
type Map struct {
mu sync.Mutex
cond *sync.Cond
m map[string]interface{}
done bool
}
func (d *Map) Wait(a string) interface{} {
d.mu.Lock()
defer d.mu.Unlock()
for !d.done {
if b, ok := d.m[a]; ok {
return b
}
d.cond.Wait()
}
return nil
}
func (d *Map) Cancel() {
d.mu.Lock()
defer d.mu.Unlock()
d.done = true
d.cond.Broadcast()
}
您可以关闭频道,而不是将
struct{}{}
发送到频道。之后所有来自通道的读取都是非阻塞的。
这也解决了一个问题,你的代码目前有:如果多个 goroutines 正在等待同一个值,只有一个会得到信号。
还要检查通道是否已经关闭,因为关闭通道两次会引起恐慌。您可以通过使用默认大小写的 select 语句从通道中非阻塞读取来做到这一点。