并发地图读取导致 Golang 数据争用

问题描述 投票:0回答:1

我有一个服务器来处理事件,这个服务器有一个

mutex lock
和一个
events
表(地图结构)。当服务器收到一个新事件时,它会获取
lock
以防止数据竞争,将该事件存储在事件表中,并启动一个 goroutine 来监视该事件已完成。如果我使用
-race
标志运行程序,它将输出
data race

package main

import (
    "sync"
    "time"
)

type event struct {
    done chan bool
}

type server struct {
    mu     sync.Mutex
    events map[int]*event
}

func main() {
    s := server{}
    s.events = make(map[int]*event)

    for i := 0; i < 10; i++ {
        go func(i int) {
            s.mu.Lock()
            s.events[i] = &event{}
            s.events[i].done = make(chan bool)
            s.mu.Unlock()
            go func() {
                time.Sleep(1 * time.Millisecond)
                <-s.events[i].done
                // server do something.
            }()
        }(i)
    }

    time.Sleep(1 * time.Second)

    for i := 0; i < 10; i++ {
        // event happen.
        s.events[i].done <- true
    }
}

输出

==================
WARNING: DATA RACE
Read at 0x00c00010dd10 by goroutine 14:
  runtime.mapaccess1_fast64()
      c:/go/src/runtime/map_fast64.go:12 +0x0    
  main.main.func1.1()
      C:/SimpleAsyncBFT/race/main.go:29 +0x7c    

Previous write at 0x00c00010dd10 by goroutine 15:
  runtime.mapassign_fast64()
      c:/go/src/runtime/map_fast64.go:92 +0x0    
  main.main.func1()
      C:/SimpleAsyncBFT/race/main.go:24 +0xbe    

Goroutine 14 (running) created at:
  main.main.func1()
      C:/SimpleAsyncBFT/race/main.go:27 +0x1c6

Goroutine 15 (finished) created at:
  main.main()
      C:/SimpleAsyncBFT/race/main.go:22 +0xed

我知道在monitor goroutine中添加

lock
可以解决这个问题,但是会导致死锁!
done
通道仅用于通知服务器事件已完成。如果渠道不适合这种情况,如何实现?

go
1个回答
4
投票

根据注释,您的代码尝试同时读取和写入地图,并且根据 go 1.6 发行说明

如果一个 Goroutine 正在写入映射,则其他 Goroutine 不应同时读取或写入该映射

查看您的代码似乎没有必要这样做。您可以提前创建频道;创建后,您只需从

map
中读取,因此没有问题:

package main

import (
    "sync"
    "time"
)

type event struct {
    done chan bool
}

type server struct {
    mu     sync.Mutex
    events map[int]*event
}

func main() {
    s := server{}
    s.events = make(map[int]*event)

    for i := 0; i < 10; i++ {
        s.events[i] = &event{}
        s.events[i].done = make(chan bool)
    }

    for i := 0; i < 10; i++ {
        go func(i int) {
            time.Sleep(1 * time.Millisecond)
            <-s.events[i].done
            // server do something.
        }(i)
    }

    time.Sleep(1 * time.Second)

    for i := 0; i < 10; i++ {
        // event happen.
        s.events[i].done <- true
    }
}

或者不要在 go 例程中访问地图:

package main

import (
    "sync"
    "time"
)

type event struct {
    done chan bool
}

type server struct {
    mu     sync.Mutex
    events map[int]*event
}

func main() {
    s := server{}
    s.events = make(map[int]*event)

    for i := 0; i < 10; i++ {
        s.events[i] = &event{}
        c := make(chan bool)
        s.events[i].done = c

        go func(i int, c chan bool) {
            time.Sleep(1 * time.Millisecond)
            <-c
            // server do something.
        }(i, c)
    }

    time.Sleep(1 * time.Second)

    for i := 0; i < 10; i++ {
        // event happen.
        s.events[i].done <- true
    }
}

在评论中,您询问如何处理您不知道事件数量的情况。解决方案将取决于情况,但这是我用来处理类似情况的一种方法(这看起来很复杂,但我认为使用地图并在

Mutex
中包围每个访问更容易遵循)。

package main

import (
    "sync"
    "time"
)

type event struct {
    done chan bool
}

type server struct {
    events map[int]*event
}

func main() {
    s := server{}
    s.events = make(map[int]*event)

    // Routine to trigger channels
    triggerChan := make(chan chan bool) // Send new triggers to this...
    eventChan := make(chan struct{})    // Close this when the event happens and go routines should continue
    go func() {
        var triggers []chan bool
        eventReceived := false
        for {
            select {
            case t, ok := <-triggerChan:
                if !ok { // You want some way for the goRoutine to shut down - in this case we wait on the closure of triggerChan
                    return
                }
                if eventReceived {
                    t <- true // The event has already happened so go routine can proceed immediately
                } else {
                    triggers = append(triggers, t)
                }
            case <-eventChan:
                for _, c := range triggers {
                    c <- true
                }
                eventReceived = true
                eventChan = nil // Don't want select to be triggered again...
            }
        }
    }()

    // Start up the event handlers...
    var wg sync.WaitGroup
    wg.Add(10)
    for i := 0; i < 10; i++ {
        s.events[i] = &event{}
        c := make(chan bool)
        triggerChan <- c

        go func(i int, c chan bool) {
            time.Sleep(1 * time.Millisecond)
            <-c
            // server do something.
            wg.Done()
        }(i, c)
    }

    time.Sleep(1 * time.Second)

    // Event happened - release the go routines
    close(eventChan)
    wg.Wait()
    close(triggerChan)
}
© www.soinside.com 2019 - 2024. All rights reserved.