package ratelimiter

import (
	"fmt"
	"sync"
	"time"
)

// AttributeMap associates a unique string key with the Limiter serving it.
type AttributeMap map[string]Limiter

// AttributeBasedLimiter manages multiple rate limiter instances, each with
// its own configuration, addressed by a string key.
type AttributeBasedLimiter struct {
	// attributeMap holds one limiter per key; guarded by m.
	attributeMap AttributeMap
	// m serialises every access to attributeMap.
	m sync.Mutex
	// syncMode selects SyncLimiter (true) or DefaultLimiter (false) for
	// newly created keys.
	syncMode bool
}

// HasKey reports whether a limiter is registered for key.
//
// Parameters:
//
//  1. key: a unique key string, example: IP address, token, uuid etc
//
// Returns true if the key is already present, false otherwise.
func (a *AttributeBasedLimiter) HasKey(key string) bool {
	a.m.Lock()
	defer a.m.Unlock()

	_, found := a.attributeMap[key]
	return found
}

// CreateNewKey creates a new key-limiter association.
//
// Parameters:
//
//  1. key: a unique key string, example: IP address, token, uuid etc
//
//  2. limit: the number of tasks to be allowed
//
//  3. size: duration
//
// Returns an error if the key already exists.
func (a *AttributeBasedLimiter) CreateNewKey(key string, limit uint64, size time.Duration) error {
	a.m.Lock()
	defer a.m.Unlock()

	return a.createNewKey(key, limit, size)
}

// createNewKey registers a limiter for key without taking the mutex; the
// caller must already hold a.m. It returns an error if key is already
// registered.
func (a *AttributeBasedLimiter) createNewKey(key string, limit uint64, size time.Duration) error {
	if _, exists := a.attributeMap[key]; exists {
		return fmt.Errorf("key %s is already defined", key)
	}

	// Pick the limiter flavour configured for this instance.
	var created Limiter
	if a.syncMode {
		created = NewSyncLimiter(limit, size)
	} else {
		created = NewDefaultLimiter(limit, size)
	}
	a.attributeMap[key] = created
	return nil
}

// HasOrCreateKey checks whether AttributeBasedLimiter has a limiter for the key
// and creates a new key-limiter association if the key does not exist.
//
// Parameters:
//
//  1. key: a unique key string, example: IP address, token, uuid etc
//
//  2. limit: the number of tasks to be allowed
//
//  3. size: duration
//
// Returns true if the key exists or was created successfully.
func (a *AttributeBasedLimiter) HasOrCreateKey(key string, limit uint64, size time.Duration) bool { a.m.Lock() defer a.m.Unlock() if _, ok := a.attributeMap[key]; ok { return true } if err := a.createNewKey(key, limit, size); err == nil { return true } return false } // ShouldAllow makes decison whether n tasks can be allowed or not. // // Parameters: // // key: a unique key string, example: IP address, token, uuid etc // // n: number of tasks to be processed, set this as 1 for a single task. // (Example: An HTTP request) // // Returns (bool, error). // (false, error) when limiter is inactive (or it is killed) or key is not present. // (true/false, nil) if key exists and n tasks can be allowed or not. func (a *AttributeBasedLimiter) ShouldAllow(key string, n uint64) (bool, error) { a.m.Lock() defer a.m.Unlock() limiter, ok := a.attributeMap[key] if ok { return limiter.ShouldAllow(n) } return false, fmt.Errorf("key %s not found", key) } // MustShouldAllow makes decison whether n tasks can be allowed or not. // // Parameters: // // key: a unique key string, example: IP address, token, uuid etc // // n: number of tasks to be processed, set this as 1 for a single task. // (Example: An HTTP request) // // Returns bool. // (false) when limiter is inactive (or it is killed) or n tasks can be not allowed. // (true) when n tasks can be allowed or new key-limiter. func (a *AttributeBasedLimiter) MustShouldAllow(key string, n uint64, limit uint64, size time.Duration) bool { a.m.Lock() defer a.m.Unlock() if limiter, ok := a.attributeMap[key]; ok { allowed, err := limiter.ShouldAllow(n) return allowed && err == nil } err := a.createNewKey(key, limit, size) if err != nil { return err == nil } // check ratelimiter on newly created key: limiter := a.attributeMap[key] allowed, err := limiter.ShouldAllow(n) return allowed && err == nil } // DeleteKey remove the key and kill its underlying limiter. 
// // Parameters: // // 1.key: a unique key string, example: IP address, token, uuid etc // // Returns an error if the key is not present. func (a *AttributeBasedLimiter) DeleteKey(key string) error { a.m.Lock() defer a.m.Unlock() if limiter, ok := a.attributeMap[key]; ok { err := limiter.Kill() if err != nil { return err } delete(a.attributeMap, key) return nil } return fmt.Errorf("key %s not found", key) } // NewAttributeBasedLimiter creates an instance of AttributeBasedLimiter and returns it's pointer. // // Parameters: // // 1. backgroundSliding: if set to true, DefaultLimiter will be used as an underlying limiter, // else, SyncLimiter will be used. func NewAttributeBasedLimiter(backgroundSliding bool) *AttributeBasedLimiter { return &AttributeBasedLimiter{ attributeMap: make(AttributeMap), syncMode: !backgroundSliding, } }
package ratelimiter

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Limiter is the interface implemented by DefaultLimiter and SyncLimiter.
type Limiter interface {
	Kill() error
	ShouldAllow(n uint64) (bool, error)
}

// DefaultLimiter maintains all the structures used for rate limiting using a
// background goroutine that slides the window.
type DefaultLimiter struct {
	// previous and current are the two windows the sliding estimate is
	// computed from; both are guarded by lock.
	previous *Window
	current  *Window
	lock     sync.Mutex
	// size is the window duration and limit the number of tasks allowed.
	size  time.Duration
	limit uint64
	// killed is set by Kill; a killed limiter rejects every call.
	killed bool
	// windowContext is cancelled through cancelFn to stop the background
	// window slider.
	windowContext context.Context
	cancelFn      func()
}

// ShouldAllow decides whether n tasks can be allowed or not.
//
// Parameters:
//
//  1. n: number of tasks to be processed, set this as 1 for a single task.
//     (Example: An HTTP request)
//
// Returns (bool, error). (false, error) if the limiter is inactive (it has
// been killed) or its configuration is invalid (limit of 0 or a size below one
// millisecond). Otherwise (true/false, nil) depending on whether n tasks can
// be allowed or not.
func (l *DefaultLimiter) ShouldAllow(n uint64) (bool, error) {
	l.lock.Lock()
	defer l.lock.Unlock()

	if l.killed {
		return false, fmt.Errorf("function ShouldAllow called on an inactive instance")
	}
	if l.limit == 0 || l.size < time.Millisecond {
		return false, fmt.Errorf("invalid limiter configuration")
	}

	// Weight of the previous window: the fraction of it still covered by the
	// sliding window. The elapsed time can exceed size when the background
	// slide is late (or has not happened yet), which would make the weight
	// negative; converting a negative float to uint64 is
	// implementation-dependent, so the weight is clamped to [0, 1].
	elapsed := time.Since(l.current.getStartTime())
	w := float64(l.size-elapsed) / float64(l.size)
	if w < 0 {
		w = 0
	} else if w > 1 {
		w = 1
	}

	carried := uint64(w * float64(l.previous.count))
	used := carried + l.current.count
	if used < carried {
		// The sum wrapped around uint64: usage is certainly above the limit.
		return false, nil
	}

	// Written so that used+n can never wrap around and slip under the limit.
	if n > l.limit || used > l.limit-n {
		return false, nil
	}

	// add current request count to window of current count
	l.current.updateCount(n)
	return true, nil
}

// progressiveWindowSlider runs in its own goroutine and, once per window
// size, turns the current window into the previous one and starts a fresh
// current window. It returns as soon as windowContext is cancelled.
func (l *DefaultLimiter) progressiveWindowSlider() {
	// ShouldAllow rejects this configuration on every call, so the windows
	// are never consulted; with a non-positive size the loop below would
	// also spin without ever waiting.
	if l.limit == 0 || l.size < time.Millisecond {
		return
	}

	// A timer instead of time.Sleep lets cancellation interrupt the wait.
	// A non-positive duration makes the timer fire immediately, which is what
	// happens on the first pass while current still starts at the Unix epoch.
	timer := time.NewTimer(l.size - time.Since(l.current.getStartTime()))
	defer timer.Stop()

	for {
		select {
		case <-l.windowContext.Done():
			return
		case <-timer.C:
		}

		l.lock.Lock()
		// make current as previous and create a new current window
		l.previous.setStateFrom(l.current)
		slidAt := time.Now()
		l.current.resetToTime(slidAt)
		l.lock.Unlock()

		// The channel was drained above, so Reset is safe here.
		timer.Reset(l.size - time.Since(slidAt))
	}
}

// Kill stops the limiter; it returns an error if the limiter has already been killed.
func (l *DefaultLimiter) Kill() error { l.lock.Lock() defer l.lock.Unlock() if l.killed { return fmt.Errorf("called Kill on already killed limiter") } defer l.cancelFn() l.killed = true return nil } // NewDefaultLimiter creates an instance of DefaultLimiter and returns it's pointer. // // Parameters: // // 1. limit: The number of tasks to be allowd // // 2. size: duration func NewDefaultLimiter(limit uint64, size time.Duration) *DefaultLimiter { previous := NewWindow(0, time.Unix(0, 0)) current := NewWindow(0, time.Unix(0, 0)) childCtx, cancelFn := context.WithCancel(context.Background()) limiter := &DefaultLimiter{ previous: previous, current: current, lock: sync.Mutex{}, size: size, limit: limit, killed: false, windowContext: childCtx, cancelFn: cancelFn, } go limiter.progressiveWindowSlider() return limiter } // SyncLimiter maintains all the structures used for rate limting on demand. type SyncLimiter struct { previous *Window current *Window lock sync.Mutex size time.Duration limit uint64 killed bool } func (s *SyncLimiter) getNSlidesSince(now time.Time) (time.Duration, time.Time) { sizeAlignedTime := now.Truncate(s.size) timeSinceStart := sizeAlignedTime.Sub(s.current.getStartTime()) return timeSinceStart / s.size, sizeAlignedTime } // ShouldAllow makes decison whether n tasks can be allowed or not. // // Parameters: // // 1. n: number of tasks to be processed, set this as 1 for a single task. (Example: An HTTP request) // // Returns (bool, error). (false, error) if limiter is inactive (or it is killed). Otherwise, // (true/false, error) depending on whether n tasks can be allowed or not. 
func (s *SyncLimiter) ShouldAllow(n uint64) (bool, error) { s.lock.Lock() defer s.lock.Unlock() if s.killed { return false, fmt.Errorf("function ShouldAllow called on an inactive instance") } if s.limit == 0 || s.size < time.Millisecond { return false, fmt.Errorf("invalid limiter configuration") } currentTime := time.Now() // advance the window on demand, as this doesn't make use of goroutine. nSlides, alignedCurrentTime := s.getNSlidesSince(currentTime) // window slide shares both current and previous windows. if nSlides == 1 { s.previous.setToState( alignedCurrentTime.Add(-s.size), s.current.count, ) s.current.resetToTime( alignedCurrentTime, ) } else if nSlides > 1 { s.previous.resetToTime( alignedCurrentTime.Add(-s.size), ) s.current.resetToTime( alignedCurrentTime, ) } currentWindowBoundary := currentTime.Sub(s.current.getStartTime()) w := float64(s.size-currentWindowBoundary) / float64(s.size) currentSlidingRequests := uint64(w*float64(s.previous.count)) + s.current.count if currentSlidingRequests+n > s.limit { return false, nil } // add current request count to window of current count s.current.updateCount(n) return true, nil } // Kill the limiter, returns error if the limiter has been killed already. func (s *SyncLimiter) Kill() error { s.lock.Lock() defer s.lock.Unlock() if s.killed { return fmt.Errorf("called Kill on already killed limiter") } // kill is a dummy implementation for SyncLimiter, // because there is no need of stopping a go-routine. s.killed = true return nil } // NewSyncLimiter creates an instance of SyncLimiter and returns it's pointer. // // Parameters: // // 1. limit: The number of tasks to be allowd // // 2. size: duration func NewSyncLimiter(limit uint64, size time.Duration) *SyncLimiter { current := NewWindow(0, time.Unix(0, 0)) previous := NewWindow(0, time.Unix(0, 0)) return &SyncLimiter{ previous: previous, current: current, lock: sync.Mutex{}, killed: false, size: size, limit: limit, } }
package ratelimiter

import (
	"time"
)

// Window represents the state of a timing window at a given point in time:
// how many tasks were counted in it and when it started.
type Window struct {
	count     uint64
	startTime time.Time
}

// updateCount adds n to the window's task count.
func (w *Window) updateCount(n uint64) {
	w.count = w.count + n
}

// getStartTime returns the time at which the window started.
func (w *Window) getStartTime() time.Time {
	return w.startTime
}

// setStateFrom copies both the count and the start time of other into w.
func (w *Window) setStateFrom(other *Window) {
	w.count, w.startTime = other.count, other.startTime
}

// resetToTime clears the count and restarts the window at startTime.
func (w *Window) resetToTime(startTime time.Time) {
	w.count, w.startTime = 0, startTime
}

// setToState overwrites the window with the given start time and count.
func (w *Window) setToState(startTime time.Time, count uint64) {
	w.startTime, w.count = startTime, count
}

// NewWindow creates and returns a pointer to a new Window instance.
//
// Parameters:
//
//  1. count: The initial count of the window.
//
//  2. startTime: The initial starting time of the window.
func NewWindow(count uint64, startTime time.Time) *Window {
	w := new(Window)
	w.count = count
	w.startTime = startTime
	return w
}