我目前有这两个功能
pointsQueue = make(chan *mongo.UpdateOneModel, 1000)
func UpdatePoints(username string, size int64) {
pointsDifference := -1 * size
update := bson.D{{"$inc", bson.D{{"pointsLeft", pointsDifference }}}}
updateOp := mongo.NewUpdateOneModel()
updateOp.SetFilter(bson.M{"user": username})
updateOp.SetUpdate(update)
pointsQueue <- updateOp
}
func updatePointsWorker() {
var ctx = context.Background()
ticker := time.NewTicker(dbBatchTimeout)
var bulkRequests []mongo.WriteModel
for {
select {
case req := <-pointsQueue:
bulkRequests = append(bulkRequests, req)
case <-ticker.C:
if len(bulkRequests) > 0 {
_, err := usersCollection.BulkWrite(ctx, bulkRequests)
if err != nil {
fmt.Println(err.Error())
}
bulkRequests = nil
}
}
}
UpdatePoints
可以在短时间内(秒)被调用数千次,不允许选择股票代码案例并清空bulkRequests数组,我如何确保即使UpdatePoints
被调用,这种情况也被调用不断被召唤?最重要的是,没有请求会在通道中“丢失”,因为它总是被新的写入所取代
我尝试缓冲通道,但它不起作用,我尝试在
req := <-pointsQueue:
情况下设置一个阈值,如果已存储一定数量的请求,则调用bulkwrite,但这样我会丢失请求坐在通道的底部,因为它们之前总是有新的写入
运行时应保证公平性,因此股票通道不应匮乏。然而,即使如此,您的算法仍取决于速度,因为来自
pointsQueue
通道的过多读取可能会将切片增长到不可接受的水平。因此,您应该使用计时器和切片长度的上限,以确保切片正确清空。
empty:=func() {
_, err := usersCollection.BulkWrite(ctx, bulkRequests)
if err != nil {
fmt.Println(err.Error())
}
bulkRequests = nil
}
for {
select {
case req := <-pointsQueue:
bulkRequests = append(bulkRequests, req)
if len(bulkRequests) > limit {
empty()
ticker.Reset(dbBatchTimeout)
}
case <-ticker.C:
if len(bulkRequests) > 0 {
empty()
}
}