package ratelimiter import ( "fmt" "sync" "time" ) type AttributeMap map[string]Limiter type AttributeBasedLimiter struct { attributeMap AttributeMap m sync.Mutex } func (a *AttributeBasedLimiter) HasKey(key string) bool { a.m.Lock() _, ok := a.attributeMap[key] a.m.Unlock() return ok } func (a *AttributeBasedLimiter) CreateNewKey(key string, limit uint64, size time.Duration) error { a.m.Lock() defer a.m.Unlock() if _, ok := a.attributeMap[key]; ok { return fmt.Errorf( "key %s is already defined", key, ) } // create a new entry: a.attributeMap[key] = *NewLimiter(limit, size) return nil } func (a *AttributeBasedLimiter) ShouldAllow(key string, n uint64) (bool, error) { a.m.Lock() defer a.m.Unlock() limiter, ok := a.attributeMap[key] if ok { return limiter.ShouldAllow(n) } return false, fmt.Errorf("key %s not found", key) } func (a *AttributeBasedLimiter) DeleteKey(key string) error { a.m.Lock() defer a.m.Unlock() if limiter, ok := a.attributeMap[key]; ok { err := limiter.Kill() if err != nil { return err } delete(a.attributeMap, key) return nil } return fmt.Errorf("key %s not found", key) } func NewAttributeBasedLimiter() *AttributeBasedLimiter { return &AttributeBasedLimiter{ attributeMap: make(AttributeMap), } }
package ratelimiter import ( "context" "fmt" "sync" "time" ) type Limiter struct { previous *Window current *Window lock sync.Mutex size time.Duration limit uint64 killed bool windowContext context.Context cancelFn func() } func (l *Limiter) ShouldAllow(n uint64) (bool, error) { l.lock.Lock() defer l.lock.Unlock() if l.killed { return false, fmt.Errorf("function ShouldAllow called on an inactive instance") } currentTime := time.Now() currentWindowBoundary := currentTime.Sub(l.current.getStartTime()) w := float64(l.size-currentWindowBoundary) / float64(l.size) currentSlidingRequests := uint64(w*float64(l.previous.count)) + l.current.count if currentSlidingRequests+n > l.limit { return false, nil } // add current request count to window of current count l.current.updateCount(n) return true, nil } func (l *Limiter) progressiveWindowSlider() { for { select { case <-l.windowContext.Done(): return default: toSleepDuration := l.size - time.Since(l.current.getStartTime()) time.Sleep(toSleepDuration) l.lock.Lock() // make current as previous and create a new current window l.previous.setStateFrom(l.current) l.current.resetToTime(time.Now()) l.lock.Unlock() } } } func (l *Limiter) Kill() error { l.lock.Lock() defer l.lock.Unlock() if l.killed { return fmt.Errorf("called Kill on already killed limiter") } defer l.cancelFn() l.killed = true return nil } func NewLimiter(limit uint64, size time.Duration) *Limiter { previous := NewWindow(0, time.Now()) current := NewWindow(0, time.Now()) childCtx, cancelFn := context.WithCancel(context.Background()) limiter := &Limiter{ previous: previous, current: current, lock: sync.Mutex{}, size: size, limit: limit, killed: false, windowContext: childCtx, cancelFn: cancelFn, } go limiter.progressiveWindowSlider() return limiter }
package ratelimiter import ( "time" ) type Window struct { count uint64 startTime int64 } func (w *Window) updateCount(n uint64) { w.count += n } func (w *Window) getStartTime() time.Time { return time.Unix(0, w.startTime) } func (w *Window) setStateFrom(other *Window) { w.count = other.count w.startTime = other.startTime } func (w *Window) resetToTime(startTime time.Time) { nsTime := startTime.UnixNano() w.count = 0 w.startTime = nsTime } func NewWindow(count uint64, startTime time.Time) *Window { nsTime := startTime.UnixNano() return &Window{ count: count, startTime: int64(nsTime), } }