Golang中如何使用CAS实现无锁切片追加?

问题描述 投票:0回答:1

我尝试使用 sync.Map + CompareAndSwap 来实现无锁并发切片追加,但失败了。

我注意到记录“n”重复出现,但我不明白为什么。

去游乐场

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < syncCount; i++ {
        n := i
        syncSum += n
        action := func() {
            asyncCount += 1
            asyncSum += n
            record[n] += 1
        }

        // actions.Add(key, action)

        wg.Add(1)
        go func() {
            actions.Add(key, action)
            wg.Done()
        }()
    }
    wg.Wait()
    actions.Exec(key)

    fmt.Println("count is concurrency safe?", syncCount == asyncCount)
    fmt.Println("  sum is concurrency safe?", syncSum == asyncSum)
}

type Actions struct {
    store sync.Map // key:value = key:*[]func()
}

func (group *Actions) Add(key string, action func()) {
    for {
        value, loaded := group.store.Load(key)
        if !loaded {
            actions := []func(){action}
            value, loaded = group.store.LoadOrStore(key, &actions)
            if !loaded {
                return
            }
        }

        oldActions := value.(*[]func())
        freshActions := append(*oldActions, action)
        if group.store.CompareAndSwap(key, oldActions, &freshActions) {
            return
        }
    }
}

我期望的结果是

count is concurrency safe? true
  sum is concurrency safe? true

但实际结果是

count is concurrency safe? true
  sum is concurrency safe? false
go concurrency lock-free
1个回答
0
投票

在 Go 中进行无锁更新的传统方法是让单个同步路径拥有写入,而异步路径通过通道向其发送消息。

使用

sync.Map
不是无锁的,因为它在内部使用互斥体。

package main

import (
    "fmt"
    "sync"
)

func main() {
    syncCount := 100 // Placeholder as not included in example

    // This goroutine will append every 'action' sent over the channel to the
    // slice. Because it's the only goroutine doing so, no lock is needed.
    //
    // Closing 'ch' will exit the loop, and 'done' is used to wait until all
    // results have been appended before continuing.
    ch := make(chan func())
    done := make(chan struct{})
    actions := []func(){}
    go func() {
        for v := range ch {
            actions = append(actions, v)
        }
        done <- struct{}{}
    }()

    var wg sync.WaitGroup
    for i := 0; i < syncCount; i++ {
        wg.Add(1)
        n := i

        // Assuming this needs to be a goroutine because you have other complex
        // things needed to generate an 'action'
        go func() {
            defer wg.Done()
            ch <- func() {
                fmt.Println("I'm the action for:", n)
            }
        }()
    }
    wg.Done()

    // Close 'ch' and wait for goro to exit
    close(ch)
    <-done
}

© www.soinside.com 2019 - 2024. All rights reserved.