我有这两个功能:
pointsQueue = make(chan *mongo.UpdateOneModel, 1000)
func UpdatePoints(username string, size int64) {
pointsDifference := -1 * size
update := bson.D{{"$inc", bson.D{{"pointsLeft", pointsDifference }}}}
updateOp := mongo.NewUpdateOneModel()
updateOp.SetFilter(bson.M{"user": username})
updateOp.SetUpdate(update)
pointsQueue <- updateOp
}
func updatePointsWorker() {
var ctx = context.Background()
ticker := time.NewTicker(dbBatchTimeout)
var bulkRequests []mongo.WriteModel
for {
select {
case req := <-pointsQueue:
bulkRequests = append(bulkRequests, req)
case <-ticker.C:
if len(bulkRequests) > 0 {
_, err := usersCollection.BulkWrite(ctx, bulkRequests)
if err != nil {
fmt.Println(err.Error())
}
bulkRequests = nil
}
}
}
UpdatePoints
可以在短时间内(秒)被调用数千次,不允许选择股票代码并清空bulkRequests数组。即使 UpdatePoints
不断被调用,我怎样才能确保这个案例被调用?最重要的是,没有请求会在通道中“丢失”,因为它总是被新的写入所取代
我尝试过缓冲通道,但没有成功。我尝试在
req := <-pointsQueue:
情况下设置一个阈值,如果已存储一定数量的请求,则调用bulkwrite,但通过这种方式,我会丢失位于通道底部的请求,因为它们前面总是有新的请求写道。
运行时应保证公平性,因此股票通道不应匮乏。然而,即使如此,您的算法仍取决于速度,因为来自
pointsQueue
通道的过多读取可能会将切片增长到不可接受的水平。因此,您应该使用计时器和切片长度的上限,以确保切片正确清空。
empty:=func() {
_, err := usersCollection.BulkWrite(ctx, bulkRequests)
if err != nil {
fmt.Println(err.Error())
}
bulkRequests = nil
}
for {
select {
case req := <-pointsQueue:
bulkRequests = append(bulkRequests, req)
if len(bulkRequests) > limit {
empty()
ticker.Reset(dbBatchTimeout)
}
case <-ticker.C:
if len(bulkRequests) > 0 {
empty()
}
}