这是基于依赖图的事务执行模拟。我使用
goroutines
尝试并行执行。依赖图记录了事务的执行之间是否存在数据依赖关系。没有依赖关系的事务可以并行执行。但是我遇到了一个问题,我发现goroutine中修改monitorResult
和duplicateG
不会成功,导致无限循环而不改变duplicateG
的大小。如何解决这个问题?
ackage peers
import (
"fmt"
"sync"
"time"
)
type Asset struct {
AppraisedValue int
ID int
version int
}
type TxReadWriteSet struct {
Reads KVRead
Writes KVWrite
}
// read set
type KVRead struct {
Key int
}
// write set
type KVWriteType int
const (
SET KVWriteType = iota
DEL
INC
)
// KVWrite
type KVWrite struct {
Key int
Type KVWriteType
Value int
}
type singleTransaction struct {
TransactionID int
TxReadWriteSet TxReadWriteSet
TransactionTime int32
}
type Graph struct {
edges map[int][]int
nodes map[int]bool
}
func SimulateWithGraph(transactions []singleTransaction, graph Graph) ([]Asset, []singleTransaction, []singleTransaction) {
var successfulTransactions []singleTransaction
var errorTransactions []singleTransaction
monitorResult := GetWorldState()
duplicateG := graph
// Create a wait group to wait for all goroutines to complete
var wg sync.WaitGroup
var mutex sync.Mutex
// Creates a channel for receiving successful and erroneous trades
successfulCh := make(chan singleTransaction)
errorCh := make(chan singleTransaction)
// Transactions are executed in topologically sorted order
for len(duplicateG.nodes) > 0 {
// Gets a node with an entry degree of 0
queue := getZeroInDegreeNodes(graph)
// Execute transactions for nodes with a degree of 0
txns := getTransaction(queue, transactions)
for _, txn := range txns {
wg.Add(1)
go func(txn singleTransaction) {
read := txn.TxReadWriteSet.Reads
startVersion := monitorResult[read.Key].version
// Check the write operation
mutex.Lock()
write := txn.TxReadWriteSet.Writes
if write.Type == DEL && monitorResult[write.Key].AppraisedValue < write.Value {
errorCh <- txn
return
}
switch write.Type {
case INC:
monitorResult[write.Key].AppraisedValue += write.Value
case DEL:
monitorResult[write.Key].AppraisedValue -= write.Value
case SET:
monitorResult[write.Key].AppraisedValue = write.Value
}
//Check that the version numbers are consistent before committing
if monitorResult[read.Key].version != startVersion {
mutex.Unlock()
errorCh <- txn
return
}
monitorResult[write.Key].version++
mutex.Unlock()
successfulCh <- txn
// Deletes the executed node
delete(duplicateG.nodes, txn.TransactionID)
delete(duplicateG.edges, txn.TransactionID)
wg.Done()
}(txn)
}
}
wg.Wait()
close(successfulCh)
close(errorCh)
// Collect successful and erroneous trades
for txn := range successfulCh {
successfulTransactions = append(successfulTransactions, txn)
}
for txn := range errorCh {
errorTransactions = append(errorTransactions, txn)
}
return monitorResult, successfulTransactions, errorTransactions
}
依赖图中入度为 0 的节点应该修改不同的资产值,所以我想知道是否应该添加互斥体以及如何做到这一点
不要从goroutine开始,先写同步代码。如果可行,我们可以尝试使其并发 - 这会是有益的。
你愿意
read := txn.TxReadWriteSet.Reads
startVersion := monitorResult[read.Key].version
和
if monitorResult[read.Key].version != startVersion {
// error
return
}
monitorResult[write.Key].version++
这意味着如果 goroutine 具有相同的读取键,它们就会失败 - 这可能应该在之前被过滤。
您提到“没有依赖关系的事务可以并行执行” - 我在哪里看到这一点?另外,事务有什么作用?如果它很便宜,并行化可能会减慢速度,如果只是受 CPU 限制而启动比 CPU 核心数更多的 goroutine,则不会给您的程序带来好处。
通常,您不应该将同步原语与业务逻辑如此频繁地混合 - 尝试保持所有内容同步,并查看稍后要并行化的点,保持大多数业务逻辑同步。