GoLang,确保通道读取不会超过选择中的任何其他情况

问题描述 投票:0回答:1

我目前有这两个功能

pointsQueue   = make(chan *mongo.UpdateOneModel, 1000)

func UpdatePoints(username string, size int64) {
    pointsDifference := -1 * size
    update := bson.D{{"$inc", bson.D{{"pointsLeft", pointsDifference }}}}
    updateOp := mongo.NewUpdateOneModel()
    updateOp.SetFilter(bson.M{"user": username})
    updateOp.SetUpdate(update)
    pointsQueue <- updateOp
}

func updatePointsWorker() {
    var ctx = context.Background()
    ticker := time.NewTicker(dbBatchTimeout)
    var bulkRequests []mongo.WriteModel

    for {
        select {
        case req := <-pointsQueue:
            bulkRequests = append(bulkRequests, req)
        case <-ticker.C:
            if len(bulkRequests) > 0 {
                _, err := usersCollection.BulkWrite(ctx, bulkRequests)
                if err != nil {
                   fmt.Println(err.Error())
                }
                bulkRequests = nil
            }
    }
}

UpdatePoints
可以在短时间内(秒)被调用数千次,不允许选择股票代码案例并清空bulkRequests数组,我如何确保即使
UpdatePoints
被调用,这种情况也被调用不断被召唤?最重要的是,没有请求会在通道中“丢失”,因为它总是被新的写入所取代

我尝试缓冲通道,但它不起作用,我尝试在

req := <-pointsQueue:
情况下设置一个阈值,如果已存储一定数量的请求,则调用bulkwrite,但这样我会丢失请求坐在通道的底部,因为它们之前总是有新的写入

go concurrency channel goroutine
1个回答
0
投票

运行时应保证公平性,因此股票通道不应匮乏。然而,即使如此,您的算法仍取决于速度,因为来自

pointsQueue
通道的过多读取可能会将切片增长到不可接受的水平。因此,您应该使用计时器和切片长度的上限,以确保切片正确清空。

empty:=func() {
  _, err := usersCollection.BulkWrite(ctx, bulkRequests)
  if err != nil {
     fmt.Println(err.Error())
  }
  bulkRequests = nil
}

for {
        select {
        case req := <-pointsQueue:
            bulkRequests = append(bulkRequests, req)
            if len(bulkRequests) > limit {
               empty()
               ticker.Reset(dbBatchTimeout)
            }
        case <-ticker.C:
            if len(bulkRequests) > 0 {
                empty()
            }
    }
© www.soinside.com 2019 - 2024. All rights reserved.