ratelimit.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. // Copyright 2014 Canonical Ltd.
  2. // Licensed under the LGPLv3 with static-linking exception.
  3. // See LICENCE file for details.
  4. // The ratelimit package provides an efficient token bucket implementation
  5. // that can be used to limit the rate of arbitrary things.
  6. // See http://en.wikipedia.org/wiki/Token_bucket.
  7. package ratelimit
  8. import (
  9. "math"
  10. "strconv"
  11. "sync"
  12. "time"
  13. )
  14. // Bucket represents a token bucket that fills at a predetermined rate.
  15. // Methods on Bucket may be called concurrently.
  16. type Bucket struct {
  17. startTime time.Time
  18. capacity int64
  19. quantum int64
  20. fillInterval time.Duration
  21. // The mutex guards the fields following it.
  22. mu sync.Mutex
  23. // avail holds the number of available tokens
  24. // in the bucket, as of availTick ticks from startTime.
  25. // It will be negative when there are consumers
  26. // waiting for tokens.
  27. avail int64
  28. availTick int64
  29. }
  30. // NewBucket returns a new token bucket that fills at the
  31. // rate of one token every fillInterval, up to the given
  32. // maximum capacity. Both arguments must be
  33. // positive. The bucket is initially full.
  34. func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
  35. return NewBucketWithQuantum(fillInterval, capacity, 1)
  36. }
  37. // rateMargin specifes the allowed variance of actual
  38. // rate from specified rate. 1% seems reasonable.
  39. const rateMargin = 0.01
  40. // NewBucketWithRate returns a token bucket that fills the bucket
  41. // at the rate of rate tokens per second up to the given
  42. // maximum capacity. Because of limited clock resolution,
  43. // at high rates, the actual rate may be up to 1% different from the
  44. // specified rate.
  45. func NewBucketWithRate(rate float64, capacity int64) *Bucket {
  46. for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) {
  47. fillInterval := time.Duration(1e9 * float64(quantum) / rate)
  48. if fillInterval <= 0 {
  49. continue
  50. }
  51. tb := NewBucketWithQuantum(fillInterval, capacity, quantum)
  52. if diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin {
  53. return tb
  54. }
  55. }
  56. panic("cannot find suitable quantum for " + strconv.FormatFloat(rate, 'g', -1, 64))
  57. }
  58. // nextQuantum returns the next quantum to try after q.
  59. // We grow the quantum exponentially, but slowly, so we
  60. // get a good fit in the lower numbers.
  61. func nextQuantum(q int64) int64 {
  62. q1 := q * 11 / 10
  63. if q1 == q {
  64. q1++
  65. }
  66. return q1
  67. }
  68. // NewBucketWithQuantum is similar to NewBucket, but allows
  69. // the specification of the quantum size - quantum tokens
  70. // are added every fillInterval.
  71. func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket {
  72. if fillInterval <= 0 {
  73. panic("token bucket fill interval is not > 0")
  74. }
  75. if capacity <= 0 {
  76. panic("token bucket capacity is not > 0")
  77. }
  78. if quantum <= 0 {
  79. panic("token bucket quantum is not > 0")
  80. }
  81. return &Bucket{
  82. startTime: time.Now(),
  83. capacity: capacity,
  84. quantum: quantum,
  85. avail: capacity,
  86. fillInterval: fillInterval,
  87. }
  88. }
  89. // Wait takes count tokens from the bucket, waiting until they are
  90. // available.
  91. func (tb *Bucket) Wait(count int64) {
  92. if d := tb.Take(count); d > 0 {
  93. time.Sleep(d)
  94. }
  95. }
  96. // WaitMaxDuration is like Wait except that it will
  97. // only take tokens from the bucket if it needs to wait
  98. // for no greater than maxWait. It reports whether
  99. // any tokens have been removed from the bucket
  100. // If no tokens have been removed, it returns immediately.
  101. func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool {
  102. d, ok := tb.TakeMaxDuration(count, maxWait)
  103. if d > 0 {
  104. time.Sleep(d)
  105. }
  106. return ok
  107. }
  108. const infinityDuration time.Duration = 0x7fffffffffffffff
  109. // Take takes count tokens from the bucket without blocking. It returns
  110. // the time that the caller should wait until the tokens are actually
  111. // available.
  112. //
  113. // Note that if the request is irrevocable - there is no way to return
  114. // tokens to the bucket once this method commits us to taking them.
  115. func (tb *Bucket) Take(count int64) time.Duration {
  116. d, _ := tb.take(time.Now(), count, infinityDuration)
  117. return d
  118. }
  119. // TakeMaxDuration is like Take, except that
  120. // it will only take tokens from the bucket if the wait
  121. // time for the tokens is no greater than maxWait.
  122. //
  123. // If it would take longer than maxWait for the tokens
  124. // to become available, it does nothing and reports false,
  125. // otherwise it returns the time that the caller should
  126. // wait until the tokens are actually available, and reports
  127. // true.
  128. func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {
  129. return tb.take(time.Now(), count, maxWait)
  130. }
  131. // TakeAvailable takes up to count immediately available tokens from the
  132. // bucket. It returns the number of tokens removed, or zero if there are
  133. // no available tokens. It does not block.
  134. func (tb *Bucket) TakeAvailable(count int64) int64 {
  135. return tb.takeAvailable(time.Now(), count)
  136. }
  137. // takeAvailable is the internal version of TakeAvailable - it takes the
  138. // current time as an argument to enable easy testing.
  139. func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
  140. if count <= 0 {
  141. return 0
  142. }
  143. tb.mu.Lock()
  144. defer tb.mu.Unlock()
  145. tb.adjust(now)
  146. if tb.avail <= 0 {
  147. return 0
  148. }
  149. if count > tb.avail {
  150. count = tb.avail
  151. }
  152. tb.avail -= count
  153. return count
  154. }
  155. // Available returns the number of available tokens. It will be negative
  156. // when there are consumers waiting for tokens. Note that if this
  157. // returns greater than zero, it does not guarantee that calls that take
  158. // tokens from the buffer will succeed, as the number of available
  159. // tokens could have changed in the meantime. This method is intended
  160. // primarily for metrics reporting and debugging.
  161. func (tb *Bucket) Available() int64 {
  162. return tb.available(time.Now())
  163. }
  164. // available is the internal version of available - it takes the current time as
  165. // an argument to enable easy testing.
  166. func (tb *Bucket) available(now time.Time) int64 {
  167. tb.mu.Lock()
  168. defer tb.mu.Unlock()
  169. tb.adjust(now)
  170. return tb.avail
  171. }
  172. // Capacity returns the capacity that the bucket was created with.
  173. func (tb *Bucket) Capacity() int64 {
  174. return tb.capacity
  175. }
  176. // Rate returns the fill rate of the bucket, in tokens per second.
  177. func (tb *Bucket) Rate() float64 {
  178. return 1e9 * float64(tb.quantum) / float64(tb.fillInterval)
  179. }
  180. // take is the internal version of Take - it takes the current time as
  181. // an argument to enable easy testing.
  182. func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {
  183. if count <= 0 {
  184. return 0, true
  185. }
  186. tb.mu.Lock()
  187. defer tb.mu.Unlock()
  188. currentTick := tb.adjust(now)
  189. avail := tb.avail - count
  190. if avail >= 0 {
  191. tb.avail = avail
  192. return 0, true
  193. }
  194. // Round up the missing tokens to the nearest multiple
  195. // of quantum - the tokens won't be available until
  196. // that tick.
  197. endTick := currentTick + (-avail+tb.quantum-1)/tb.quantum
  198. endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
  199. waitTime := endTime.Sub(now)
  200. if waitTime > maxWait {
  201. return 0, false
  202. }
  203. tb.avail = avail
  204. return waitTime, true
  205. }
  206. // adjust adjusts the current bucket capacity based on the current time.
  207. // It returns the current tick.
  208. func (tb *Bucket) adjust(now time.Time) (currentTick int64) {
  209. currentTick = int64(now.Sub(tb.startTime) / tb.fillInterval)
  210. if tb.avail >= tb.capacity {
  211. return
  212. }
  213. tb.avail += (currentTick - tb.availTick) * tb.quantum
  214. if tb.avail > tb.capacity {
  215. tb.avail = tb.capacity
  216. }
  217. tb.availTick = currentTick
  218. return
  219. }