NOTE:这道题和其他并发题不一样,不仅writer是单一的,而且write操作也是strictly once.
当有多个并发读取者,而写入者只有一个且只写入一次时(例如并发环境中的 setter),在 Go 中应使用哪种同步方法?
sync.Mutex
适用于这种情况,但是,由于只有一个写入者,sync.RWMutex
甚至更好,因为它比常规互斥体快一点。
不过,仅仅为了设置一次值,就在应用程序的整个运行期间都使用互斥锁,感觉很浪费。
有没有更快的方法?
package main
import (
"sync"
)
// RWMutexErrorNotifier forwards error messages to an e-mail sender callback
// that may be installed while other goroutines are already sending.
//
// rwMutex guards emailSender: SetEmailSenderService writes it under the write
// lock and SendErrorMessage reads it under the read lock. The zero value is
// usable and drops every message until a sender has been installed.
type RWMutexErrorNotifier struct {
	rwMutex     sync.RWMutex
	emailSender func(string)
}

// SetEmailSenderService installs emailSender as the callback invoked by
// SendErrorMessage. Installing nil makes later messages be dropped.
func (a *RWMutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.rwMutex.Lock()
	defer a.rwMutex.Unlock()
	a.emailSender = emailSender
}

// SendErrorMessage hands errorMessage to the installed sender. It returns
// without doing anything when no sender is installed.
func (a *RWMutexErrorNotifier) SendErrorMessage(errorMessage string) {
	// Only the read of the field needs the lock. The callback itself runs
	// after the lock is released, so a slow send cannot stall the writer and
	// a callback that calls SetEmailSenderService cannot deadlock.
	a.rwMutex.RLock()
	send := a.emailSender
	a.rwMutex.RUnlock()

	if send == nil {
		return
	}
	send(errorMessage)
}
// main demonstrates the single-writer, write-once scenario: a large number of
// goroutines each try to send one message while the sender callback is
// installed exactly once from the main goroutine.
func main() {
	const errorsCount = 100_000

	notifier := new(RWMutexErrorNotifier)

	var wg sync.WaitGroup // used only for demo purposes
	wg.Add(errorsCount)
	for remaining := errorsCount; remaining > 0; remaining-- {
		go func() {
			defer wg.Done()
			notifier.SendErrorMessage("ALARM!")
		}()
	}

	// making a single write
	notifier.SetEmailSenderService(func(emailMessage string) {
		// sending email...
	})

	wg.Wait()
}
更新:感谢 @peter-cordes,请参阅答案下方的评论。
显然,读取和设置
atomic.Bool
比 sync.RWMutex
对于单个写入者的单次写入更快。它只比不加锁的方案(存在数据竞争,仅用于基准测试)慢一点。
基准测试结果(也提供基于
sync.Mutex
的解决方案,仅用于基准测试目的):
$ go test -run=XXX -bench=. -benchmem -benchtime=1000000x
goos: darwin
goarch: arm64
pkg: untitled1
BenchmarkRacyNoGoodConcurrently-12 1000000 3529 ns/op 384 B/op 12 allocs/op
BenchmarkAtomicBooleanConcurrently-12 1000000 3494 ns/op 384 B/op 12 allocs/op
BenchmarkRWMutexConcurrently-12 1000000 3909 ns/op 384 B/op 12 allocs/op
BenchmarkMutexConcurrently-12 1000000 4180 ns/op 384 B/op 12 allocs/op
BenchmarkRacyNoGoodSequentiallyAllCores-12 1000000 3.661 ns/op 0 B/op 0 allocs/op
BenchmarkAtomicBooleanSequentiallyAllCores-12 1000000 3.748 ns/op 0 B/op 0 allocs/op
BenchmarkRWMutexSequentiallyAllCores-12 1000000 1934 ns/op 0 B/op 0 allocs/op
BenchmarkMutexSequentiallyAllCores-12 1000000 1486 ns/op 0 B/op 0 allocs/op
BenchmarkRacyNoGoodSequentiallySingleCore-12 1000000 28.95 ns/op 0 B/op 0 allocs/op
BenchmarkAtomicBooleanSequentiallySingleCore-12 1000000 29.54 ns/op 0 B/op 0 allocs/op
BenchmarkRWMutexSequentiallySingleCore-12 1000000 188.6 ns/op 0 B/op 0 allocs/op
BenchmarkMutexSequentiallySingleCore-12 1000000 187.4 ns/op 0 B/op 0 allocs/op
PASS
ok untitled1 19.093s
package main
import (
"runtime"
"sync"
"sync/atomic"
)
// ErrorNotifier is the common surface of the notifier variants compared in
// this file, which lets Run drive any of them interchangeably.
type ErrorNotifier interface {
	// SetEmailSenderService installs the callback that receives messages.
	SetEmailSenderService(emailSender func(string))

	// SendErrorMessage passes errorMessage on to the installed callback.
	SendErrorMessage(errorMessage string)
}
// Mutex

// MutexErrorNotifier forwards error messages to an e-mail sender callback,
// guarding the callback field with a plain sync.Mutex.
//
// The zero value is usable and drops every message until a sender has been
// installed.
type MutexErrorNotifier struct {
	mutex       sync.Mutex
	emailSender func(string)
}

var _ ErrorNotifier = (*MutexErrorNotifier)(nil)

// SetEmailSenderService installs emailSender as the callback invoked by
// SendErrorMessage. Installing nil makes later messages be dropped.
func (a *MutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.mutex.Lock()
	defer a.mutex.Unlock()
	a.emailSender = emailSender
}

// SendErrorMessage hands errorMessage to the installed sender. It returns
// without doing anything when no sender is installed.
func (a *MutexErrorNotifier) SendErrorMessage(errorMessage string) {
	// Hold the mutex only while reading the field. Invoking the callback
	// under an exclusive lock would serialize every send and would deadlock
	// if the callback itself called back into this notifier.
	a.mutex.Lock()
	send := a.emailSender
	a.mutex.Unlock()

	if send != nil {
		send(errorMessage)
	}
}
// RWMutex

// RWMutexErrorNotifier forwards error messages to an e-mail sender callback,
// guarding the callback field with a sync.RWMutex so that concurrent senders
// share the read lock and only the setter takes the write lock.
//
// The zero value is usable and drops every message until a sender has been
// installed.
type RWMutexErrorNotifier struct {
	rwMutex     sync.RWMutex
	emailSender func(string)
}

var _ ErrorNotifier = (*RWMutexErrorNotifier)(nil)

// SetEmailSenderService installs emailSender as the callback invoked by
// SendErrorMessage. Installing nil makes later messages be dropped.
func (a *RWMutexErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.rwMutex.Lock()
	defer a.rwMutex.Unlock()
	a.emailSender = emailSender
}

// SendErrorMessage hands errorMessage to the installed sender. It returns
// without doing anything when no sender is installed.
func (a *RWMutexErrorNotifier) SendErrorMessage(errorMessage string) {
	// Only the read of the field needs the lock. The callback runs after the
	// read lock is released, so a slow send cannot stall the writer and a
	// callback that calls SetEmailSenderService cannot deadlock.
	a.rwMutex.RLock()
	send := a.emailSender
	a.rwMutex.RUnlock()

	if send != nil {
		send(errorMessage)
	}
}
// Atomic Boolean

// AtomicBooleanErrorNotifier publishes a write-once e-mail sender callback to
// concurrent senders through an atomic flag instead of a mutex.
//
// emailSender is a plain field. The setter writes it first and stores true in
// emailerIsSet afterwards; senders touch emailSender only after loading true.
// The type is meant for a single SetEmailSenderService call: further calls
// that overlap with SendErrorMessage are not synchronized.
type AtomicBooleanErrorNotifier struct {
	emailerIsSet atomic.Bool
	emailSender  func(string)
}

var _ ErrorNotifier = (*AtomicBooleanErrorNotifier)(nil)

// SetEmailSenderService installs emailSender and then marks it as available
// to SendErrorMessage.
func (a *AtomicBooleanErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.emailSender = emailSender
	// Store the flag last: a sender that loads true must also see the
	// callback written above.
	a.emailerIsSet.Store(true)
}

// SendErrorMessage hands errorMessage to the installed sender. It returns
// without doing anything when no sender has been installed, or when the
// installed sender is nil.
func (a *AtomicBooleanErrorNotifier) SendErrorMessage(errorMessage string) {
	if !a.emailerIsSet.Load() {
		return
	}
	// The flag only records that the setter ran, not that it was given a
	// usable callback. Guard against nil so the behaviour matches the
	// mutex-based notifiers instead of panicking.
	if send := a.emailSender; send != nil {
		send(errorMessage)
	}
}
// NOT A SOLUTION: racy no locking solution - just for benchmarking

// RacyNoGoodErrorNotifier reads and writes its callback field with no
// synchronization at all. It exists only as a baseline for the benchmarks
// and must not be used where the setter can run concurrently with senders.
type RacyNoGoodErrorNotifier struct {
	emailSender func(string)
}

var _ ErrorNotifier = (*RacyNoGoodErrorNotifier)(nil)

// SetEmailSenderService stores emailSender without any synchronization.
func (a *RacyNoGoodErrorNotifier) SetEmailSenderService(emailSender func(string)) {
	a.emailSender = emailSender
}

// SendErrorMessage calls the stored sender with errorMessage, or returns
// immediately when the field is still nil. The field is read unsynchronized.
func (a *RacyNoGoodErrorNotifier) SendErrorMessage(errorMessage string) {
	if a.emailSender == nil {
		return
	}
	a.emailSender(errorMessage)
}
// Demo run

// Values accepted as the runType argument of Run.
const (
	allConcurrent        = "all concurrent"
	sequentialSingleCore = "sequential single core"
	sequentialAllCores   = "sequential all cores"
)
// Run starts goroutines that call runner.SendErrorMessage("ALARM!"), then
// installs a no-op sender through runner.SetEmailSenderService exactly once
// and waits for all started goroutines to finish.
//
// runType selects how the calls are spread over goroutines:
//   - allConcurrent: n*runtime.NumCPU() goroutines, one call each.
//   - sequentialAllCores: runtime.NumCPU() goroutines, n calls each.
//   - sequentialSingleCore: one goroutine making n*runtime.NumCPU() calls.
//
// Any other runType panics with "unknown mode".
func Run(n int, runner ErrorNotifier, runType string) {
	cpus := runtime.NumCPU()
	total := n * cpus

	var wg sync.WaitGroup

	switch runType {
	case allConcurrent:
		wg.Add(total)
		for started := 0; started < total; started++ {
			go func() {
				defer wg.Done()
				runner.SendErrorMessage("ALARM!")
			}()
		}

	case sequentialAllCores:
		wg.Add(cpus)
		for worker := 0; worker < cpus; worker++ {
			go func() {
				defer wg.Done()
				for sent := 0; sent < n; sent++ {
					runner.SendErrorMessage("ALARM!")
				}
			}()
		}

	case sequentialSingleCore:
		wg.Add(1)
		go func() {
			defer wg.Done()
			for sent := 0; sent < total; sent++ {
				runner.SendErrorMessage("ALARM!")
			}
		}()

	default:
		panic("unknown mode")
	}

	// The single write happens while the goroutines above may still be
	// sending.
	runner.SetEmailSenderService(func(emailMessage string) {
		// sending email...
	})
	wg.Wait()
}
基准:
package main
import "testing"
func BenchmarkRacyNoGoodConcurrently(b *testing.B) {
Run(b.N, &RacyNoGoodErrorNotifier{}, allConcurrent)
}
func BenchmarkAtomicBooleanConcurrently(b *testing.B) {
Run(b.N, &AtomicBooleanErrorNotifier{}, allConcurrent)
}
func BenchmarkRWMutexConcurrently(b *testing.B) {
Run(b.N, &RWMutexErrorNotifier{}, allConcurrent)
}
func BenchmarkMutexConcurrently(b *testing.B) {
Run(b.N, &MutexErrorNotifier{}, allConcurrent)
}
func BenchmarkRacyNoGoodSequentiallyAllCores(b *testing.B) {
Run(b.N, &RacyNoGoodErrorNotifier{}, sequentialAllCores)
}
func BenchmarkAtomicBooleanSequentiallyAllCores(b *testing.B) {
Run(b.N, &AtomicBooleanErrorNotifier{}, sequentialAllCores)
}
func BenchmarkRWMutexSequentiallyAllCores(b *testing.B) {
Run(b.N, &RWMutexErrorNotifier{}, sequentialAllCores)
}
func BenchmarkMutexSequentiallyAllCores(b *testing.B) {
Run(b.N, &MutexErrorNotifier{}, sequentialAllCores)
}
func BenchmarkRacyNoGoodSequentiallySingleCore(b *testing.B) {
Run(b.N, &RacyNoGoodErrorNotifier{}, sequentialSingleCore)
}
func BenchmarkAtomicBooleanSequentiallySingleCore(b *testing.B) {
Run(b.N, &AtomicBooleanErrorNotifier{}, sequentialSingleCore)
}
func BenchmarkRWMutexSequentiallySingleCore(b *testing.B) {
Run(b.N, &RWMutexErrorNotifier{}, sequentialSingleCore)
}
func BenchmarkMutexSequentiallySingleCore(b *testing.B) {
Run(b.N, &MutexErrorNotifier{}, sequentialSingleCore)
}