如何确保通道读取不会超过选择中的任何其他情况

问题描述 投票:0回答:1

我有这两个功能:

pointsQueue = make(chan *mongo.UpdateOneModel, 1000)

func UpdatePoints(username string, size int64) {
    pointsDifference := -1 * size
    update := bson.D{{"$inc", bson.D{{"pointsLeft", pointsDifference }}}}
    updateOp := mongo.NewUpdateOneModel()
    updateOp.SetFilter(bson.M{"user": username})
    updateOp.SetUpdate(update)
    pointsQueue <- updateOp
}

func updatePointsWorker() {
    var ctx = context.Background()
    ticker := time.NewTicker(dbBatchTimeout)
    var bulkRequests []mongo.WriteModel

    for {
        select {
        case req := <-pointsQueue:
            bulkRequests = append(bulkRequests, req)
        case <-ticker.C:
            if len(bulkRequests) > 0 {
                _, err := usersCollection.BulkWrite(ctx, bulkRequests)
                if err != nil {
                   fmt.Println(err.Error())
                }
                bulkRequests = nil
            }
    }
}

UpdatePoints
可以在短时间内(秒)被调用数千次,不允许选择股票代码并清空bulkRequests数组。即使
UpdatePoints
不断被调用,我怎样才能确保这个案例被调用?最重要的是,没有请求会在通道中“丢失”,因为它总是被新的写入所取代

我尝试过缓冲通道,但没有成功。我尝试在

req := <-pointsQueue:
情况下设置一个阈值,如果已存储一定数量的请求,则调用bulkwrite,但通过这种方式,我会丢失位于通道底部的请求,因为它们前面总是有新的请求写道。

go concurrency channel goroutine
1个回答
0
投票

运行时应保证公平性,因此股票通道不应匮乏。然而,即使如此,您的算法仍取决于速度,因为来自

pointsQueue
通道的过多读取可能会将切片增长到不可接受的水平。因此,您应该使用计时器和切片长度的上限,以确保切片正确清空。

empty:=func() {
  _, err := usersCollection.BulkWrite(ctx, bulkRequests)
  if err != nil {
     fmt.Println(err.Error())
  }
  bulkRequests = nil
}

for {
        select {
        case req := <-pointsQueue:
            bulkRequests = append(bulkRequests, req)
            if len(bulkRequests) > limit {
               empty()
               ticker.Reset(dbBatchTimeout)
            }
        case <-ticker.C:
            if len(bulkRequests) > 0 {
                empty()
            }
    }
© www.soinside.com 2019 - 2024. All rights reserved.