limiter.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package limiter
  4. import (
  5. "fmt"
  6. "html"
  7. "io"
  8. "sync"
  9. "time"
  10. "tailscale.com/util/lru"
  11. )
// Limiter is a keyed token bucket rate limiter.
//
// Each key gets its own separate token bucket to pull from, enabling
// enforcement on things like "requests per IP address". To avoid
// unbounded memory growth, Limiter actually only tracks limits
// precisely for the N most recently seen keys, and assumes that
// untracked keys are well-behaved. This trades off absolute precision
// for bounded memory use, while still enforcing well for outlier
// keys.
//
// As such, Limiter should only be used in situations where "rough"
// enforcement of outliers only is sufficient, such as throttling
// egregious outlier keys (e.g. something sending 100 queries per
// second, where everyone else is sending at most 5).
//
// Each key's token bucket behaves like a regular token bucket, with
// the added feature that a bucket's token count can optionally go
// negative. This implements a form of "cooldown" for keys that exceed
// the rate limit: once a key starts getting denied, it must stop
// requesting tokens long enough for the bucket to return to a
// positive balance. If the key keeps hammering the limiter in excess
// of the rate limit, the token count will remain negative, and the
// key will not be allowed to proceed at all. This is in contrast to
// the classic token bucket, where a key trying to use more than the
// rate limit will get capped at the limit, but can still occasionally
// consume a token as one becomes available.
//
// The zero value is a valid limiter that rejects all requests. A
// useful limiter must specify a Size, Max and RefillInterval.
type Limiter[K comparable] struct {
	// Size is the number of keys to track. Only the Size most
	// recently seen keys have their limits enforced precisely, older
	// keys are assumed to not be querying frequently enough to bother
	// tracking.
	Size int

	// Max is the number of tokens available for a key to consume
	// before time-based rate limiting kicks in. An unused limiter
	// regains available tokens over time, up to Max tokens. A newly
	// tracked key initially receives Max tokens.
	Max int64

	// RefillInterval is the interval at which a key regains tokens for
	// use, up to Max tokens.
	RefillInterval time.Duration

	// Overdraft is the amount of additional tokens a key can be
	// charged for when it exceeds its rate limit. Each additional
	// request issued for the key charges one unit of overdraft, up to
	// this limit. Overdraft tokens are refilled at the normal rate,
	// and must be fully repaid before any tokens become available for
	// requests.
	//
	// A non-zero Overdraft results in "cooldown" behavior: with a
	// normal token bucket that bottoms out at zero tokens, an abusive
	// key can still consume one token every RefillInterval. With a
	// non-zero overdraft, a throttled key must stop requesting tokens
	// entirely for a cooldown period, otherwise they remain
	// perpetually in debt and cannot proceed at all.
	Overdraft int64

	// mu guards cache and all *bucket values reachable through it.
	mu sync.Mutex
	// cache holds the per-key buckets, bounded to Size entries and
	// evicting least-recently-used keys. It is lazily initialized on
	// first use (see getBucketLocked).
	cache *lru.Cache[K, *bucket]
}
  72. // QPSInterval returns the interval between events corresponding to
  73. // the given queries/second rate.
  74. //
  75. // This is a helper to be used when populating Limiter.RefillInterval.
  76. func QPSInterval(qps float64) time.Duration {
  77. return time.Duration(float64(time.Second) / qps)
  78. }
// bucket tracks the token balance for a single key.
type bucket struct {
	cur        int64     // current available tokens; may go negative, down to -Overdraft (see allowBucketLocked)
	lastUpdate time.Time // last timestamp at which cur was updated, truncated to RefillInterval
}
  83. // Allow charges the key one token (up to the overdraft limit), and
  84. // reports whether the key can perform an action.
  85. func (l *Limiter[K]) Allow(key K) bool {
  86. return l.allow(key, time.Now())
  87. }
  88. func (l *Limiter[K]) allow(key K, now time.Time) bool {
  89. l.mu.Lock()
  90. defer l.mu.Unlock()
  91. return l.allowBucketLocked(l.getBucketLocked(key, now), now)
  92. }
  93. func (l *Limiter[K]) getBucketLocked(key K, now time.Time) *bucket {
  94. if l.cache == nil {
  95. l.cache = &lru.Cache[K, *bucket]{MaxEntries: l.Size}
  96. } else if b := l.cache.Get(key); b != nil {
  97. return b
  98. }
  99. b := &bucket{
  100. cur: l.Max,
  101. lastUpdate: now.Truncate(l.RefillInterval),
  102. }
  103. l.cache.Set(key, b)
  104. return b
  105. }
  106. func (l *Limiter[K]) allowBucketLocked(b *bucket, now time.Time) bool {
  107. // Only update the bucket quota if needed to process request.
  108. if b.cur <= 0 {
  109. l.updateBucketLocked(b, now)
  110. }
  111. ret := b.cur > 0
  112. if b.cur > -l.Overdraft {
  113. b.cur--
  114. }
  115. return ret
  116. }
  117. func (l *Limiter[K]) updateBucketLocked(b *bucket, now time.Time) {
  118. now = now.Truncate(l.RefillInterval)
  119. if now.Before(b.lastUpdate) {
  120. return
  121. }
  122. timeDelta := max(now.Sub(b.lastUpdate), 0)
  123. tokenDelta := int64(timeDelta / l.RefillInterval)
  124. b.cur = min(b.cur+tokenDelta, l.Max)
  125. b.lastUpdate = now
  126. }
// tokensForTest returns the current number of tokens for key, also
// reporting whether key was present. It reads the bucket without
// charging it; for use by tests only.
func (l *Limiter[K]) tokensForTest(key K) (int64, bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if b, ok := l.cache.PeekOk(key); ok {
		return b.cur, true
	}
	return 0, false
}
  137. // DumpHTML writes the state of the limiter to the given writer,
  138. // formatted as an HTML table. If onlyLimited is true, the output only
  139. // lists keys that are currently being limited.
  140. //
  141. // DumpHTML blocks other callers of the limiter while it collects the
  142. // state for dumping. It should not be called on large limiters
  143. // involved in hot codepaths.
  144. func (l *Limiter[K]) DumpHTML(w io.Writer, onlyLimited bool) {
  145. l.dumpHTML(w, onlyLimited, time.Now())
  146. }
  147. func (l *Limiter[K]) dumpHTML(w io.Writer, onlyLimited bool, now time.Time) {
  148. dump := l.collectDump(now)
  149. io.WriteString(w, "<table><tr><th>Key</th><th>Tokens</th></tr>")
  150. for _, line := range dump {
  151. if onlyLimited && line.Tokens > 0 {
  152. continue
  153. }
  154. kStr := html.EscapeString(fmt.Sprint(line.Key))
  155. format := "<tr><td>%s</td><td>%d</td></tr>"
  156. if !onlyLimited && line.Tokens <= 0 {
  157. // Make limited entries stand out when showing
  158. // limited+non-limited together
  159. format = "<tr><td>%s</td><td><b>%d</b></td></tr>"
  160. }
  161. fmt.Fprintf(w, format, kStr, line.Tokens)
  162. }
  163. io.WriteString(w, "</table>")
  164. }
  165. // collectDump grabs a copy of the limiter state needed by DumpHTML.
  166. func (l *Limiter[K]) collectDump(now time.Time) []dumpEntry[K] {
  167. l.mu.Lock()
  168. defer l.mu.Unlock()
  169. ret := make([]dumpEntry[K], 0, l.cache.Len())
  170. l.cache.ForEach(func(k K, v *bucket) {
  171. l.updateBucketLocked(v, now) // so stats are accurate
  172. ret = append(ret, dumpEntry[K]{k, v.cur})
  173. })
  174. return ret
  175. }
// dumpEntry is the per-key information that DumpHTML needs to print
// limiter state.
type dumpEntry[K comparable] struct {
	Key    K     // the bucket's key
	Tokens int64 // token balance at dump time; <= 0 means the key is currently limited
}