我有一个服务器来处理事件,这个服务器有一个
mutex lock
和一个events
表(地图结构)。当服务器收到一个新事件时,它会获取 lock
以防止数据竞争,将该事件存储在事件表中,并启动一个 goroutine 来监视该事件已完成。如果我使用 -race
标志运行程序,它将输出 data race
。
package main
import (
"sync"
"time"
)
type event struct {
done chan bool
}
type server struct {
mu sync.Mutex
events map[int]*event
}
func main() {
s := server{}
s.events = make(map[int]*event)
for i := 0; i < 10; i++ {
go func(i int) {
s.mu.Lock()
s.events[i] = &event{}
s.events[i].done = make(chan bool)
s.mu.Unlock()
go func() {
time.Sleep(1 * time.Millisecond)
<-s.events[i].done
// server do something.
}()
}(i)
}
time.Sleep(1 * time.Second)
for i := 0; i < 10; i++ {
// event happen.
s.events[i].done <- true
}
}
输出
==================
WARNING: DATA RACE
Read at 0x00c00010dd10 by goroutine 14:
runtime.mapaccess1_fast64()
c:/go/src/runtime/map_fast64.go:12 +0x0
main.main.func1.1()
C:/SimpleAsyncBFT/race/main.go:29 +0x7c
Previous write at 0x00c00010dd10 by goroutine 15:
runtime.mapassign_fast64()
c:/go/src/runtime/map_fast64.go:92 +0x0
main.main.func1()
C:/SimpleAsyncBFT/race/main.go:24 +0xbe
Goroutine 14 (running) created at:
main.main.func1()
C:/SimpleAsyncBFT/race/main.go:27 +0x1c6
Goroutine 15 (finished) created at:
main.main()
C:/SimpleAsyncBFT/race/main.go:22 +0xed
我知道在monitor goroutine中添加
lock
可以解决这个问题,但是会导致死锁! done
通道仅用于通知服务器事件已完成。如果渠道不适合这种情况,如何实现?
根据注释,您的代码尝试同时读取和写入地图,并且根据 go 1.6 发行说明:
如果一个 Goroutine 正在写入映射,则其他 Goroutine 不应同时读取或写入该映射
查看您的代码似乎没有必要这样做。您可以提前创建频道;创建后,您只需从
map
中读取,因此没有问题:
package main
import (
"sync"
"time"
)
type event struct {
done chan bool
}
type server struct {
mu sync.Mutex
events map[int]*event
}
func main() {
s := server{}
s.events = make(map[int]*event)
for i := 0; i < 10; i++ {
s.events[i] = &event{}
s.events[i].done = make(chan bool)
}
for i := 0; i < 10; i++ {
go func(i int) {
time.Sleep(1 * time.Millisecond)
<-s.events[i].done
// server do something.
}(i)
}
time.Sleep(1 * time.Second)
for i := 0; i < 10; i++ {
// event happen.
s.events[i].done <- true
}
}
或者不要在 go 例程中访问地图:
package main
import (
"sync"
"time"
)
type event struct {
done chan bool
}
type server struct {
mu sync.Mutex
events map[int]*event
}
func main() {
s := server{}
s.events = make(map[int]*event)
for i := 0; i < 10; i++ {
s.events[i] = &event{}
c := make(chan bool)
s.events[i].done = c
go func(i int, c chan bool) {
time.Sleep(1 * time.Millisecond)
<-c
// server do something.
}(i, c)
}
time.Sleep(1 * time.Second)
for i := 0; i < 10; i++ {
// event happen.
s.events[i].done <- true
}
}
在评论中,您询问如何处理您不知道事件数量的情况。解决方案将取决于情况,但这是我用来处理类似情况的一种方法(这看起来很复杂,但我认为使用地图并在
Mutex
中包围每个访问更容易遵循)。
package main
import (
"sync"
"time"
)
type event struct {
done chan bool
}
type server struct {
events map[int]*event
}
func main() {
s := server{}
s.events = make(map[int]*event)
// Routine to trigger channels
triggerChan := make(chan chan bool) // Send new triggers to this...
eventChan := make(chan struct{}) // Close this when the event happens and go routines should continue
go func() {
var triggers []chan bool
eventReceived := false
for {
select {
case t, ok := <-triggerChan:
if !ok { // You want some way for the goRoutine to shut down - in this case we wait on the closure of triggerChan
return
}
if eventReceived {
t <- true // The event has already happened so go routine can proceed immediately
} else {
triggers = append(triggers, t)
}
case <-eventChan:
for _, c := range triggers {
c <- true
}
eventReceived = true
eventChan = nil // Don't want select to be triggered again...
}
}
}()
// Start up the event handlers...
var wg sync.WaitGroup
wg.Add(10)
for i := 0; i < 10; i++ {
s.events[i] = &event{}
c := make(chan bool)
triggerChan <- c
go func(i int, c chan bool) {
time.Sleep(1 * time.Millisecond)
<-c
// server do something.
wg.Done()
}(i, c)
}
time.Sleep(1 * time.Second)
// Event happened - release the go routines
close(eventChan)
wg.Wait()
close(triggerChan)
}