Goroutine synchronization failing


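I am trying to implement a rate limiter in Go. The algorithm I am using is a sliding window.

Approach

I am using a map to store the number of requests received in a particular second. When a new request arrives, the limiter counts the requests made during the previous duration and, if that count exceeds the limit, the request is not served.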
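
The counting step described above could be sketched roughly as follows. This is a minimal illustration, not the code from the question: `countInWindow` and the sample bucket data are hypothetical, and the keys are assumed to be monotonically increasing seconds (for example Unix seconds).

```go
package main

import "fmt"

// countInWindow sums the per-second buckets that fall inside the last
// `window` seconds, i.e. buckets with now-window < second <= now.
func countInWindow(buckets map[int64]int, now, window int64) int {
	total := 0
	for second, n := range buckets {
		if second <= now && now-second < window {
			total += n
		}
	}
	return total
}

func main() {
	// Hypothetical data: requests seen at seconds 100, 105 and 111.
	buckets := map[int64]int{100: 3, 105: 2, 111: 4}
	// With a 10-second window ending at 111, second 100 is too old.
	fmt.Println(countInWindow(buckets, 111, 10)) // 6
}
```

A request would then be served only if this count is still below the configured limit.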

My main interface is:

type RateLimiterSync interface {

    AllowRequest(userID string) (bool, error)
    Kill() error
}

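Here AllowRequest is the main method I will call. It verifies whether a request is allowed, based on the configured number of requests permitted in the given time window.

The second method, Kill(), stops a goroutine that keeps removing entries from the map that are no longer relevant to the given time window.

In addition, I store the rate limiter configuration per key in another map: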

type Config struct {
    //Duration of timeline
    Duration int
    //no of request allowed
    Count int
}

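Here Duration is the time window over which requests are counted, and Count is the number of requests allowed in that window. For example, if Duration is 10 and Count is 10, it allows 10 requests to be served within 10 seconds.

Then there is a type, SlidingWindowSync, which provides the implementation of RateLimiterSync.

Here is the code for the implementation: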
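
To make the Duration 10 / Count 10 example concrete, here is a small single-threaded simulation. It is a sketch under assumptions: `simulate` is a hypothetical helper, time is modeled as elapsed seconds starting at 0 rather than wall-clock time, and requests are assumed to arrive every 500 ms.

```go
package main

import "fmt"

// Config mirrors the struct from the question.
type Config struct {
	Duration int // window length in seconds
	Count    int // requests allowed per window
}

// simulate replays n requests spaced stepMillis apart (starting at t=0) and
// returns, per request, whether a per-second-bucket sliding window admits it.
func simulate(cfg Config, n int, stepMillis int) []bool {
	buckets := make(map[int]int) // elapsed second -> admitted requests
	result := make([]bool, n)
	for i := 0; i < n; i++ {
		now := i * stepMillis / 1000
		inWindow := 0
		for second, c := range buckets {
			if now-second < cfg.Duration {
				inWindow += c
			}
		}
		if inWindow < cfg.Count {
			buckets[now]++
			result[i] = true
		}
	}
	return result
}

func main() {
	admitted := 0
	for _, ok := range simulate(Config{Duration: 10, Count: 10}, 40, 500) {
		if ok {
			admitted++
		}
	}
	fmt.Println(admitted) // 20
}
```

Under these assumptions the first 10 requests (seconds 0 to 4) are admitted, the next 10 are rejected, and admission resumes at second 10 once the oldest bucket leaves the window.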

type SlidingWindowSync struct {
    mutex            sync.Mutex
    keyReqMap        map[string]map[int]int
    keyConfigService *KeyConfigService
    stop             bool
}

func NewSlidingWindowSync(keyConfigService *KeyConfigService, userID string) *SlidingWindowSync {
    var rateLimiter *SlidingWindowSync
    rateLimiter = &SlidingWindowSync{
        keyConfigService: keyConfigService,
        mutex:            sync.Mutex{},
        keyReqMap:        make(map[string]map[int]int),
        stop:             false,
    }
    userConfig, _ := rateLimiter.keyConfigService.GetConfig(userID)
    go rateLimiter.removeEnteries(userID, userConfig.Duration, rateLimiter.mutex)
    return rateLimiter
}

//incrementCountSync : increment req count we can add mutex here
func incrementCountSync(reqMap map[int]int, curTimeInSec int, mutex sync.Mutex) {
    mutex.Lock()
    defer mutex.Unlock()
    if _, exists := reqMap[curTimeInSec]; exists == false {
        reqMap[curTimeInSec] = 1
    } else {
        reqMap[curTimeInSec] = reqMap[curTimeInSec] + 1
    }
}

func (r *SlidingWindowSync) Kill() error {
    var err error
    if r.stop == false {
        r.stop = true
        fmt.Println(" killing go routine")
    } else {
        err = fmt.Errorf(" go routine already stopped")
    }
    return err
}

func (r *SlidingWindowSync) removeEnteries(configKey string, durationInSec int, mutex sync.Mutex) {
    for {
        if r.stop {
            fmt.Println(" existing the go routine ------")
            break
        }
        fmt.Println(" taking a lock in go routine")
        mutex.Lock()
        for key, _ := range r.keyReqMap[configKey] {
            if time.Now().UTC().Second()-key > durationInSec {
                fmt.Println(" deleting key for second =", time.Now().UTC().Second()-key)
                delete(r.keyReqMap[configKey], key)
            }
        }
        mutex.Unlock()
        fmt.Println(" existing a lock in go routine")
        time.Sleep(time.Second * time.Duration(durationInSec) / 10)
    }

}
func getReqCountSync(reqMap map[int]int, curTimeInSec int,
    durationInSec int, mutex sync.Mutex) int {
    var count int
    if len(reqMap) == 0 {
        return 0
    }
    mutex.Lock()
    defer mutex.Unlock()
    //Applying a lock over the critical section of the map
    for key, value := range reqMap {
        if curTimeInSec-key < durationInSec {
            count += value
        }
    }
    return count
}

//AllowRequest : this is the main impl for the rate limiter
func (s *SlidingWindowSync) AllowRequest(userID string) (bool, error) {
    var (
        reqMap     map[int]int
        userConfig *domains.Config
        err        error
        curTime    = time.Now().UTC().Second()
    )
    //Get the config service
    if userConfig, err = s.keyConfigService.GetConfig(userID); err != nil {
        return false, err
    }
    //Check in map for req count
    if _, exists := s.keyReqMap[userID]; !exists {
        s.keyReqMap[userID] = make(map[int]int)
    }
    reqMap = s.keyReqMap[userID]
    if getReqCountSync(reqMap, curTime, userConfig.Duration, s.mutex) >= userConfig.Count {
        return false, nil
    }
    incrementCountSync(reqMap, curTime, s.mutex)
    return true, nil
}

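So AllowRequest mainly fetches the configuration for the key and then checks whether the number of requests within the time window is still within the limit. Also, every time NewSlidingWindowSync is initialized, I start a separate goroutine that keeps removing entries from the map that fall outside the time window.

Here is the main function: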

func main() {
    keyConfigService := services.NewKeyConfigService()
    keyConfigService.Upsert("login", domains.NewConfig(10, 10))
    rateLimiter := services.NewSlidingWindowSync(keyConfigService, "login")
    startTime := time.Now().UTC().Second()
    for index := 0; index < 40; index++ {
        if allow, err := rateLimiter.AllowRequest("login"); err != nil {
            fmt.Errorf(" error = %+v", err)
        } else if allow {
            fmt.Printf(" request allowed at %d second \n",
                time.Now().UTC().Second()-startTime)
        } else {
            fmt.Printf(" request aborted at %d second \n",
                time.Now().UTC().Second()-startTime)
        }
        time.Sleep(time.Millisecond * 500)
    }

}

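Here I am testing the implementation by calling the rate limiter with a 0.5-second delay between calls.

However, in main I get negative values for the time in some of the print statements, and I suspect this is caused by a synchronization problem. Here is the output of main for the code above: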

request allowed at 0 second 
 taking a lock in go routine
 existing a lock in go routine
 request allowed at 0 second 
 taking a lock in go routine
 existing a lock in go routine
 request allowed at 1 second 
 request allowed at 1 second 
 taking a lock in go routine
 existing a lock in go routine
 request allowed at 2 second 
 request allowed at 2 second 
 taking a lock in go routine
 existing a lock in go routine
 request allowed at 3 second 
 request allowed at 3 second 
 taking a lock in go routine
 existing a lock in go routine
 request allowed at 4 second 
 request allowed at 4 second 
 taking a lock in go routine
 existing a lock in go routine
 request aborted at 5 second 
 request aborted at 5 second 
 taking a lock in go routine
 existing a lock in go routine
 request aborted at 6 second 
 request aborted at 6 second 
 taking a lock in go routine
 existing a lock in go routine
 request aborted at 7 second 
 request aborted at 7 second 
 taking a lock in go routine
 existing a lock in go routine
 request aborted at 8 second 
 request aborted at 8 second 
 taking a lock in go routine
 existing a lock in go routine
 request aborted at 9 second 
 request aborted at 9 second 
 taking a lock in go routine
 existing a lock in go routine
 request allowed at 10 second 
 request allowed at 10 second 
 taking a lock in go routine
 deleting key for second = 11
 existing a lock in go routine
 request allowed at 11 second 
 request allowed at 11 second 
 taking a lock in go routine
 deleting key for second = 11
 existing a lock in go routine
 request allowed at 12 second 
 request allowed at 12 second 
 taking a lock in go routine
 deleting key for second = 11
 existing a lock in go routine
 request allowed at 13 second 
 request allowed at 13 second 
 taking a lock in go routine
 deleting key for second = 11
 existing a lock in go routine
 request allowed at 14 second 
 request allowed at 14 second 
 taking a lock in go routine
 deleting key for second = 11
 existing a lock in go routine
 request aborted at 15 second 
 request aborted at 15 second 
 taking a lock in go routine
 existing a lock in go routine
 request aborted at -44 second 
 request aborted at -44 second 
 taking a lock in go routine
 existing a lock in go routine
 request aborted at -43 second 
 request aborted at -43 second 
 taking a lock in go routine
 existing a lock in go routine
 request aborted at -42 second 
 request aborted at -42 second 
 taking a lock in go routine
 existing a lock in go routine
 request aborted at -41 second 
 request aborted at -41 second 
 taking a lock in go routine
 existing a lock in go routine

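I need help understanding why I sometimes get negative values, given that startTime should always be the smaller value. Thanks for the help.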

go concurrency mutex goroutine