等到地图中的值在 Go 中可用

问题描述 投票:0回答:3

我有一个程序,我基本上有三种情况 - 为键设置值,如果它存在则获取值,或者等到给定键的值可用。我最初的想法 - 创建一个带有

map[string]interface{}
的新类型 - 存储“持久”值的位置。除此之外,为了等待一个值,我计划使用
map[string](chan struct{})
。当调用
Set()
方法时,我会写入该通道,等待它的任何人都会知道该值在那里。

我事先不知道钥匙 - 它们是随机的。我不确定如何正确实施

Wait()
方法。

type Map struct {
    sync.Mutex

    m    map[string]interface{}
    wait map[string]chan (struct{})
}


func (m *Map) Set(key string, value interface{}) {
    m.ensureWaitChan(key)

    m.Lock()
    defer m.Unlock()

    m.m[key] = value

    // Signal to all waiting.
    m.wait[key] <- struct{}{}
}


func (m *Map) Wait(key string) interface{} {
    m.ensureWaitChan(key)

    m.Lock()
    
    value, ok := m.m[key]
    if ok {
        m.Unlock()
        return value
    }

    m.Unlock()
    // <------ Unlocked state where something might happen.
    <-m.wait[key]

    value := m.m[key]

    return value    
}

// If the channel does not exist for those waiting - create it.
func (m *Map) ensureWaitChan(key string) {
    m.Lock()
    defer m.Unlock()

    _, ok := m.wait[key]
    if ok {
        return
    }

    m.wait[key] = make(chan struct{}, 100)
}

问题是 -

Wait()
中存在竞争条件 - 在我释放互斥体之后,在我开始监听传入值的通道之前。

处理这个问题的最佳方法是什么?对任何其他关于如何实现这一点的建议持开放态度,我相信一定有更好的方法来做到这一点。我会避免以固定的时间间隔或类似的方式轮询该值。

go mutex wait
3个回答
1
投票

您正在寻找的是同步地图和消息代理之间的混合体。我们可以通过利用通信和同步通道来做到这一点,这样订阅者就可以在消息发布后立即收到消息,如果它们还没有在缓存中的话。

type Map struct {
    sync.Mutex

    m    map[string]any
    subs map[string][]chan any
}

func (m *Map) Set(key string, value any) {
    m.Lock()
    defer m.Unlock()

    m.m[key] = value

    // Send the new value to all waiting subscribers of the key
    for _, sub := range m.subs[key] {
        sub <- value
    }
    delete(m.subs, key)
}

func (m *Map) Wait(key string) any {
    m.Lock()
    // Unlock cannot be deferred so we can unblock Set() while waiting

    value, ok := m.m[key]
    if ok {
        m.Unlock()
        return value
    }

    // if there is no value yet, subscribe to any new values for this key
    ch := make(chan any)
    m.subs[key] = append(m.subs[key], ch)
    m.Unlock()

    return <-ch
}

因为订阅者在等待时必须解锁地图互斥体,所以他们无法安全地访问添加到地图的新消息。我们通过自己的频道将新值直接发送给所有订阅者,这样我们就不需要在

Set
内添加更多同步,以确保在解锁地图本身之前所有订阅者都满意。提前解锁地图将允许订阅者直接阅读它,但同时也会允许插入新值,从而导致结果不一致。

运行版本,还包括带有类型参数的通用

Map
实现:https://go.dev/play/p/AN7VRSPdGmO


0
投票

这个怎么样?它使用条件变量。

package main

import (
    "fmt"
    "sync"
)

type Map struct {
    mu   sync.Mutex
    cond *sync.Cond
    m    map[string]interface{}
}

func (d *Map) Set(a string, b interface{}) {
    d.mu.Lock()
    defer d.mu.Unlock()
    d.m[a] = b
    d.cond.Broadcast()
}

func (d *Map) Get(a string) interface{} {
    d.mu.Lock()
    defer d.mu.Unlock()
    return d.m[a]
}

func (d *Map) Wait(a string) interface{} {
    d.mu.Lock()
    defer d.mu.Unlock()
    for {
        if b, ok := d.m[a]; ok {
            return b
        }
        d.cond.Wait()
    }
}

func main() {
    d := &Map{}
    d.m = make(map[string]interface{})
    d.cond = sync.NewCond(&d.mu)

    var wg sync.WaitGroup
    wg.Add(1)
    go func() {
        fmt.Println("waited and got c=", d.Wait("c"))
        wg.Done()
    }()

    d.Set("a", "apple")
    d.Set("b", "banana")
    fmt.Println("b=", d.Get("b"))
    fmt.Println("a=", d.Get("a"))
    fmt.Println("c=", d.Get("c"))
    d.Set("c", "cherry")
    fmt.Println("c=", d.Get("c"))

    wg.Wait()
}

如果你想支持一个

d.Cancel()
方法,强制一个
d.Wait()
返回
nil
,你可以扩展
Map
并修改
d.Wait()
如下:

type Map struct {
    mu   sync.Mutex
    cond *sync.Cond
    m    map[string]interface{}
    done bool
}

func (d *Map) Wait(a string) interface{} {
    d.mu.Lock()
    defer d.mu.Unlock()
    for !d.done {
        if b, ok := d.m[a]; ok {
            return b
        }
        d.cond.Wait()
    }
    return nil
}

func (d *Map) Cancel() {
    d.mu.Lock()
    defer d.mu.Unlock()
    d.done = true
    d.cond.Broadcast()
}

-1
投票

您可以关闭频道,而不是将

struct{}{}
发送到频道。之后所有来自通道的读取都是非阻塞的。 这也解决了一个问题,你的代码目前有:如果多个 goroutines 正在等待同一个值,只有一个会得到信号。

还要检查通道是否已经关闭,因为关闭通道两次会引起恐慌。您可以通过使用默认大小写的 select 语句从通道中非阻塞读取来做到这一点。

© www.soinside.com 2019 - 2024. All rights reserved.