Browse Source

Streamline rate limiting

Jakob Borg 11 years ago
parent
commit
76e0960a51

+ 4 - 0
Godeps/Godeps.json

@@ -27,6 +27,10 @@
 			"ImportPath": "github.com/codegangsta/martini",
 			"Comment": "v0.1-142-g8659df7",
 			"Rev": "8659df7a51aebe6c6120268cd5a8b4c34fa8441a"
+		},
+		{
+			"ImportPath": "github.com/juju/ratelimit",
+			"Rev": "cbaa435c80a9716e086f25d409344b26c4039358"
 		}
 	]
 }

+ 185 - 0
Godeps/_workspace/src/github.com/juju/ratelimit/LICENSE

@@ -0,0 +1,185 @@
+This software is licensed under the LGPLv3, included below.
+
+As a special exception to the GNU Lesser General Public License version 3
+("LGPL3"), the copyright holders of this Library give you permission to
+convey to a third party a Combined Work that links statically or dynamically
+to this Library without providing any Minimal Corresponding Source or
+Minimal Application Code as set out in 4d or providing the installation
+information set out in section 4e, provided that you comply with the other
+provisions of LGPL3 and provided that you meet, for the Application the
+terms and conditions of the license(s) which apply to the Application.
+
+Except as stated in this special exception, the provisions of LGPL3 will
+continue to comply in full to this Library. If you modify this Library, you
+may apply this exception to your version of this Library, but you are not
+obliged to do so. If you do not wish to do so, delete this exception
+statement from your version. This exception does not (and cannot) modify any
+license terms which apply to the Application, with which you must still
+comply.
+
+
+                   GNU LESSER GENERAL PUBLIC LICENSE
+                       Version 3, 29 June 2007
+
+ Copyright (C) 2007 Free Software Foundation, Inc. <http://fsf.org/>
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+
+  This version of the GNU Lesser General Public License incorporates
+the terms and conditions of version 3 of the GNU General Public
+License, supplemented by the additional permissions listed below.
+
+  0. Additional Definitions.
+
+  As used herein, "this License" refers to version 3 of the GNU Lesser
+General Public License, and the "GNU GPL" refers to version 3 of the GNU
+General Public License.
+
+  "The Library" refers to a covered work governed by this License,
+other than an Application or a Combined Work as defined below.
+
+  An "Application" is any work that makes use of an interface provided
+by the Library, but which is not otherwise based on the Library.
+Defining a subclass of a class defined by the Library is deemed a mode
+of using an interface provided by the Library.
+
+  A "Combined Work" is a work produced by combining or linking an
+Application with the Library.  The particular version of the Library
+with which the Combined Work was made is also called the "Linked
+Version".
+
+  The "Minimal Corresponding Source" for a Combined Work means the
+Corresponding Source for the Combined Work, excluding any source code
+for portions of the Combined Work that, considered in isolation, are
+based on the Application, and not on the Linked Version.
+
+  The "Corresponding Application Code" for a Combined Work means the
+object code and/or source code for the Application, including any data
+and utility programs needed for reproducing the Combined Work from the
+Application, but excluding the System Libraries of the Combined Work.
+
+  1. Exception to Section 3 of the GNU GPL.
+
+  You may convey a covered work under sections 3 and 4 of this License
+without being bound by section 3 of the GNU GPL.
+
+  2. Conveying Modified Versions.
+
+  If you modify a copy of the Library, and, in your modifications, a
+facility refers to a function or data to be supplied by an Application
+that uses the facility (other than as an argument passed when the
+facility is invoked), then you may convey a copy of the modified
+version:
+
+   a) under this License, provided that you make a good faith effort to
+   ensure that, in the event an Application does not supply the
+   function or data, the facility still operates, and performs
+   whatever part of its purpose remains meaningful, or
+
+   b) under the GNU GPL, with none of the additional permissions of
+   this License applicable to that copy.
+
+  3. Object Code Incorporating Material from Library Header Files.
+
+  The object code form of an Application may incorporate material from
+a header file that is part of the Library.  You may convey such object
+code under terms of your choice, provided that, if the incorporated
+material is not limited to numerical parameters, data structure
+layouts and accessors, or small macros, inline functions and templates
+(ten or fewer lines in length), you do both of the following:
+
+   a) Give prominent notice with each copy of the object code that the
+   Library is used in it and that the Library and its use are
+   covered by this License.
+
+   b) Accompany the object code with a copy of the GNU GPL and this license
+   document.
+
+  4. Combined Works.
+
+  You may convey a Combined Work under terms of your choice that,
+taken together, effectively do not restrict modification of the
+portions of the Library contained in the Combined Work and reverse
+engineering for debugging such modifications, if you also do each of
+the following:
+
+   a) Give prominent notice with each copy of the Combined Work that
+   the Library is used in it and that the Library and its use are
+   covered by this License.
+
+   b) Accompany the Combined Work with a copy of the GNU GPL and this license
+   document.
+
+   c) For a Combined Work that displays copyright notices during
+   execution, include the copyright notice for the Library among
+   these notices, as well as a reference directing the user to the
+   copies of the GNU GPL and this license document.
+
+   d) Do one of the following:
+
+       0) Convey the Minimal Corresponding Source under the terms of this
+       License, and the Corresponding Application Code in a form
+       suitable for, and under terms that permit, the user to
+       recombine or relink the Application with a modified version of
+       the Linked Version to produce a modified Combined Work, in the
+       manner specified by section 6 of the GNU GPL for conveying
+       Corresponding Source.
+
+       1) Use a suitable shared library mechanism for linking with the
+       Library.  A suitable mechanism is one that (a) uses at run time
+       a copy of the Library already present on the user's computer
+       system, and (b) will operate properly with a modified version
+       of the Library that is interface-compatible with the Linked
+       Version.
+
+   e) Provide Installation Information, but only if you would otherwise
+   be required to provide such information under section 6 of the
+   GNU GPL, and only to the extent that such information is
+   necessary to install and execute a modified version of the
+   Combined Work produced by recombining or relinking the
+   Application with a modified version of the Linked Version. (If
+   you use option 4d0, the Installation Information must accompany
+   the Minimal Corresponding Source and Corresponding Application
+   Code. If you use option 4d1, you must provide the Installation
+   Information in the manner specified by section 6 of the GNU GPL
+   for conveying Corresponding Source.)
+
+  5. Combined Libraries.
+
+  You may place library facilities that are a work based on the
+Library side by side in a single library together with other library
+facilities that are not Applications and are not covered by this
+License, and convey such a combined library under terms of your
+choice, if you do both of the following:
+
+   a) Accompany the combined library with a copy of the same work based
+   on the Library, uncombined with any other library facilities,
+   conveyed under the terms of this License.
+
+   b) Give prominent notice with the combined library that part of it
+   is a work based on the Library, and explaining where to find the
+   accompanying uncombined form of the same work.
+
+  6. Revised Versions of the GNU Lesser General Public License.
+
+  The Free Software Foundation may publish revised and/or new versions
+of the GNU Lesser General Public License from time to time. Such new
+versions will be similar in spirit to the present version, but may
+differ in detail to address new problems or concerns.
+
+  Each version is given a distinguishing version number. If the
+Library as you received it specifies that a certain numbered version
+of the GNU Lesser General Public License "or any later version"
+applies to it, you have the option of following the terms and
+conditions either of that published version or of any later version
+published by the Free Software Foundation. If the Library as you
+received it does not specify a version number of the GNU Lesser
+General Public License, you may choose any version of the GNU Lesser
+General Public License ever published by the Free Software Foundation.
+
+  If the Library as you received it specifies that a proxy can decide
+whether future versions of the GNU Lesser General Public License shall
+apply, that proxy's public statement of acceptance of any version is
+permanent authorization for you to choose that version for the
+Library.

+ 109 - 0
Godeps/_workspace/src/github.com/juju/ratelimit/README.md

@@ -0,0 +1,109 @@
+# ratelimit
+--
+    import "github.com/juju/ratelimit"
+
+The ratelimit package provides an efficient token bucket implementation. See
+http://en.wikipedia.org/wiki/Token_bucket.
+
+## Usage
+
+#### func  Reader
+
+```go
+func Reader(r io.Reader, bucket *Bucket) io.Reader
+```
+Reader returns a reader that is rate limited by the given token bucket. Each
+token in the bucket represents one byte.
+
+#### func  Writer
+
+```go
+func Writer(w io.Writer, bucket *Bucket) io.Writer
+```
+Writer returns a reader that is rate limited by the given token bucket. Each
+token in the bucket represents one byte.
+
+#### type Bucket
+
+```go
+type Bucket struct {
+}
+```
+
+Bucket represents a token bucket that fills at a predetermined rate. Methods on
+Bucket may be called concurrently.
+
+#### func  NewBucket
+
+```go
+func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
+```
+NewBucket returns a new token bucket that fills at the rate of one token every
+fillInterval, up to the given maximum capacity. Both arguments must be positive.
+The bucket is initially full.
+
+#### func  NewBucketWithRate
+
+```go
+func NewBucketWithRate(rate float64, capacity int64) *Bucket
+```
+NewBucketWithRate returns a token bucket that fills the bucket at the rate of
+rate tokens per second up to the given maximum capacity. Because of limited
+clock resolution, at high rates, the actual rate may be up to 1% different from
+the specified rate.
+
+#### func (*Bucket) Rate
+
+```go
+func (tb *Bucket) Rate() float64
+```
+Rate returns the fill rate of the bucket, in tokens per second.
+
+#### func (*Bucket) Take
+
+```go
+func (tb *Bucket) Take(count int64) time.Duration
+```
+Take takes count tokens from the bucket without blocking. It returns the time
+that the caller should wait until the tokens are actually available.
+
+Note that if the request is irrevocable - there is no way to return tokens to
+the bucket once this method commits us to taking them.
+
+#### func (*Bucket) TakeAvailable
+
+```go
+func (tb *Bucket) TakeAvailable(count int64) int64
+```
+TakeAvailable takes up to count immediately available tokens from the bucket. It
+returns the number of tokens removed, or zero if there are no available tokens.
+It does not block.
+
+#### func (*Bucket) TakeMaxDuration
+
+```go
+func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool)
+```
+TakeMaxDuration is like Take, except that it will only take tokens from the
+bucket if the wait time for the tokens is no greater than maxWait.
+
+If it would take longer than maxWait for the tokens to become available, it does
+nothing and reports false, otherwise it returns the time that the caller should
+wait until the tokens are actually available, and reports true.
+
+#### func (*Bucket) Wait
+
+```go
+func (tb *Bucket) Wait(count int64)
+```
+Wait takes count tokens from the bucket, waiting until they are available.
+
+#### func (*Bucket) WaitMaxDuration
+
+```go
+func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool
+```
+WaitMaxDuration is like Wait except that it will only take tokens from the
+bucket if it needs to wait for no greater than maxWait. It reports whether any
+tokens have been removed from the bucket If no tokens have been removed, it
+returns immediately.

+ 227 - 0
Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go

@@ -0,0 +1,227 @@
+// Copyright 2014 Canonical Ltd.
+// Licensed under the LGPLv3 with static-linking exception.
+// See LICENCE file for details.
+
+// The ratelimit package provides an efficient token bucket implementation.
+// See http://en.wikipedia.org/wiki/Token_bucket.
+package ratelimit
+
+import (
+	"strconv"
+	"sync"
+	"time"
+)
+
+// Bucket represents a token bucket that fills at a predetermined rate.
+// Methods on Bucket may be called concurrently.
+type Bucket struct {
+	startTime    time.Time
+	capacity     int64
+	quantum      int64
+	fillInterval time.Duration
+
+	// The mutex guards the fields following it.
+	mu sync.Mutex
+
+	// avail holds the number of available tokens
+	// in the bucket, as of availTick ticks from startTime.
+	// It will be negative when there are consumers
+	// waiting for tokens.
+	avail     int64
+	availTick int64
+}
+
+// NewBucket returns a new token bucket that fills at the
+// rate of one token every fillInterval, up to the given
+// maximum capacity. Both arguments must be
+// positive. The bucket is initially full.
+func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
+	return newBucketWithQuantum(fillInterval, capacity, 1)
+}
+
+// rateMargin specifes the allowed variance of actual
+// rate from specified rate. 1% seems reasonable.
+const rateMargin = 0.01
+
+// NewBucketWithRate returns a token bucket that fills the bucket
+// at the rate of rate tokens per second up to the given
+// maximum capacity. Because of limited clock resolution,
+// at high rates, the actual rate may be up to 1% different from the
+// specified rate.
+func NewBucketWithRate(rate float64, capacity int64) *Bucket {
+	for quantum := int64(1); quantum < 1<<50; quantum = nextQuantum(quantum) {
+		fillInterval := time.Duration(1e9 * float64(quantum) / rate)
+		if fillInterval <= 0 {
+			continue
+		}
+		tb := newBucketWithQuantum(fillInterval, capacity, quantum)
+		if diff := abs(tb.Rate() - rate); diff/rate <= rateMargin {
+			return tb
+		}
+	}
+	panic("cannot find suitable quantum for " + strconv.FormatFloat(rate, 'g', -1, 64))
+}
+
+// nextQuantum returns the next quantum to try after q.
+// We grow the quantum exponentially, but slowly, so we
+// get a good fit in the lower numbers.
+func nextQuantum(q int64) int64 {
+	q1 := q * 11 / 10
+	if q1 == q {
+		q1++
+	}
+	return q1
+}
+
+// newBucketWithQuantum is similar to NewBucket, but allows
+// the specification of the quantum size - quantum tokens
+// are added every fillInterval. This is so that we can get accurate
+// rates even when we want to add more than one token per ns.
+func newBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket {
+	if fillInterval <= 0 {
+		panic("token bucket fill interval is not > 0")
+	}
+	if capacity <= 0 {
+		panic("token bucket capacity is not > 0")
+	}
+	if quantum <= 0 {
+		panic("token bucket quantum is not > 0")
+	}
+	return &Bucket{
+		startTime:    time.Now(),
+		capacity:     capacity,
+		quantum:      quantum,
+		avail:        capacity,
+		fillInterval: fillInterval,
+	}
+}
+
+// Wait takes count tokens from the bucket, waiting until they are
+// available.
+func (tb *Bucket) Wait(count int64) {
+	if d := tb.Take(count); d > 0 {
+		time.Sleep(d)
+	}
+}
+
+// WaitMaxDuration is like Wait except that it will
+// only take tokens from the bucket if it needs to wait
+// for no greater than maxWait. It reports whether
+// any tokens have been removed from the bucket
+// If no tokens have been removed, it returns immediately.
+func (tb *Bucket) WaitMaxDuration(count int64, maxWait time.Duration) bool {
+	d, ok := tb.TakeMaxDuration(count, maxWait)
+	if d > 0 {
+		time.Sleep(d)
+	}
+	return ok
+}
+
+const infinityDuration time.Duration = 0x7fffffffffffffff
+
+// Take takes count tokens from the bucket without blocking. It returns
+// the time that the caller should wait until the tokens are actually
+// available.
+//
+// Note that if the request is irrevocable - there is no way to return
+// tokens to the bucket once this method commits us to taking them.
+func (tb *Bucket) Take(count int64) time.Duration {
+	d, _ := tb.take(time.Now(), count, infinityDuration)
+	return d
+}
+
+// TakeMaxDuration is like Take, except that
+// it will only take tokens from the bucket if the wait
+// time for the tokens is no greater than maxWait.
+//
+// If it would take longer than maxWait for the tokens
+// to become available, it does nothing and reports false,
+// otherwise it returns the time that the caller should
+// wait until the tokens are actually available, and reports
+// true.
+func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {
+	return tb.take(time.Now(), count, maxWait)
+}
+
+// TakeAvailable takes up to count immediately available tokens from the
+// bucket. It returns the number of tokens removed, or zero if there are
+// no available tokens. It does not block.
+func (tb *Bucket) TakeAvailable(count int64) int64 {
+	return tb.takeAvailable(time.Now(), count)
+}
+
+// takeAvailable is the internal version of TakeAvailable - it takes the
+// current time as an argument to enable easy testing.
+func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
+	if count <= 0 {
+		return 0
+	}
+	tb.mu.Lock()
+	defer tb.mu.Unlock()
+
+	tb.adjust(now)
+	if tb.avail <= 0 {
+		return 0
+	}
+	if count > tb.avail {
+		count = tb.avail
+	}
+	tb.avail -= count
+	return count
+}
+
+// Rate returns the fill rate of the bucket, in tokens per second.
+func (tb *Bucket) Rate() float64 {
+	return 1e9 * float64(tb.quantum) / float64(tb.fillInterval)
+}
+
+// take is the internal version of Take - it takes the current time as
+// an argument to enable easy testing.
+func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {
+	if count <= 0 {
+		return 0, true
+	}
+	tb.mu.Lock()
+	defer tb.mu.Unlock()
+
+	currentTick := tb.adjust(now)
+	avail := tb.avail - count
+	if avail >= 0 {
+		tb.avail = avail
+		return 0, true
+	}
+	// Round up the missing tokens to the nearest multiple
+	// of quantum - the tokens won't be available until
+	// that tick.
+	endTick := currentTick + (-avail+tb.quantum-1)/tb.quantum
+	endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
+	waitTime := endTime.Sub(now)
+	if waitTime > maxWait {
+		return 0, false
+	}
+	tb.avail = avail
+	return waitTime, true
+}
+
+// adjust adjusts the current bucket capacity based on the current time.
+// It returns the current tick.
+func (tb *Bucket) adjust(now time.Time) (currentTick int64) {
+	currentTick = int64(now.Sub(tb.startTime) / tb.fillInterval)
+
+	if tb.avail >= tb.capacity {
+		return
+	}
+	tb.avail += (currentTick - tb.availTick) * tb.quantum
+	if tb.avail > tb.capacity {
+		tb.avail = tb.capacity
+	}
+	tb.availTick = currentTick
+	return
+}
+
+func abs(f float64) float64 {
+	if f < 0 {
+		return -f
+	}
+	return f
+}

+ 328 - 0
Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit_test.go

@@ -0,0 +1,328 @@
+// Copyright 2014 Canonical Ltd.
+// Licensed under the LGPLv3 with static-linking exception.
+// See LICENCE file for details.
+
+package ratelimit
+
+import (
+	gc "launchpad.net/gocheck"
+
+	"testing"
+	"time"
+)
+
+func TestPackage(t *testing.T) {
+	gc.TestingT(t)
+}
+
+type rateLimitSuite struct{}
+
+var _ = gc.Suite(rateLimitSuite{})
+
+type takeReq struct {
+	time       time.Duration
+	count      int64
+	expectWait time.Duration
+}
+
+var takeTests = []struct {
+	about        string
+	fillInterval time.Duration
+	capacity     int64
+	reqs         []takeReq
+}{{
+	about:        "serial requests",
+	fillInterval: 250 * time.Millisecond,
+	capacity:     10,
+	reqs: []takeReq{{
+		time:       0,
+		count:      0,
+		expectWait: 0,
+	}, {
+		time:       0,
+		count:      10,
+		expectWait: 0,
+	}, {
+		time:       0,
+		count:      1,
+		expectWait: 250 * time.Millisecond,
+	}, {
+		time:       250 * time.Millisecond,
+		count:      1,
+		expectWait: 250 * time.Millisecond,
+	}},
+}, {
+	about:        "concurrent requests",
+	fillInterval: 250 * time.Millisecond,
+	capacity:     10,
+	reqs: []takeReq{{
+		time:       0,
+		count:      10,
+		expectWait: 0,
+	}, {
+		time:       0,
+		count:      2,
+		expectWait: 500 * time.Millisecond,
+	}, {
+		time:       0,
+		count:      2,
+		expectWait: 1000 * time.Millisecond,
+	}, {
+		time:       0,
+		count:      1,
+		expectWait: 1250 * time.Millisecond,
+	}},
+}, {
+	about:        "more than capacity",
+	fillInterval: 1 * time.Millisecond,
+	capacity:     10,
+	reqs: []takeReq{{
+		time:       0,
+		count:      10,
+		expectWait: 0,
+	}, {
+		time:       20 * time.Millisecond,
+		count:      15,
+		expectWait: 5 * time.Millisecond,
+	}},
+}, {
+	about:        "sub-quantum time",
+	fillInterval: 10 * time.Millisecond,
+	capacity:     10,
+	reqs: []takeReq{{
+		time:       0,
+		count:      10,
+		expectWait: 0,
+	}, {
+		time:       7 * time.Millisecond,
+		count:      1,
+		expectWait: 3 * time.Millisecond,
+	}, {
+		time:       8 * time.Millisecond,
+		count:      1,
+		expectWait: 12 * time.Millisecond,
+	}},
+}, {
+	about:        "within capacity",
+	fillInterval: 10 * time.Millisecond,
+	capacity:     5,
+	reqs: []takeReq{{
+		time:       0,
+		count:      5,
+		expectWait: 0,
+	}, {
+		time:       60 * time.Millisecond,
+		count:      5,
+		expectWait: 0,
+	}, {
+		time:       60 * time.Millisecond,
+		count:      1,
+		expectWait: 10 * time.Millisecond,
+	}, {
+		time:       80 * time.Millisecond,
+		count:      2,
+		expectWait: 10 * time.Millisecond,
+	}},
+}}
+
+func (rateLimitSuite) TestTake(c *gc.C) {
+	for i, test := range takeTests {
+		tb := NewBucket(test.fillInterval, test.capacity)
+		for j, req := range test.reqs {
+			d, ok := tb.take(tb.startTime.Add(req.time), req.count, infinityDuration)
+			c.Assert(ok, gc.Equals, true)
+			if d != req.expectWait {
+				c.Fatalf("test %d.%d, %s, got %v want %v", i, j, test.about, d, req.expectWait)
+			}
+		}
+	}
+}
+
+func (rateLimitSuite) TestTakeMaxDuration(c *gc.C) {
+	for i, test := range takeTests {
+		tb := NewBucket(test.fillInterval, test.capacity)
+		for j, req := range test.reqs {
+			if req.expectWait > 0 {
+				d, ok := tb.take(tb.startTime.Add(req.time), req.count, req.expectWait-1)
+				c.Assert(ok, gc.Equals, false)
+				c.Assert(d, gc.Equals, time.Duration(0))
+			}
+			d, ok := tb.take(tb.startTime.Add(req.time), req.count, req.expectWait)
+			c.Assert(ok, gc.Equals, true)
+			if d != req.expectWait {
+				c.Fatalf("test %d.%d, %s, got %v want %v", i, j, test.about, d, req.expectWait)
+			}
+		}
+	}
+}
+
+type takeAvailableReq struct {
+	time   time.Duration
+	count  int64
+	expect int64
+}
+
+var takeAvailableTests = []struct {
+	about        string
+	fillInterval time.Duration
+	capacity     int64
+	reqs         []takeAvailableReq
+}{{
+	about:        "serial requests",
+	fillInterval: 250 * time.Millisecond,
+	capacity:     10,
+	reqs: []takeAvailableReq{{
+		time:   0,
+		count:  0,
+		expect: 0,
+	}, {
+		time:   0,
+		count:  10,
+		expect: 10,
+	}, {
+		time:   0,
+		count:  1,
+		expect: 0,
+	}, {
+		time:   250 * time.Millisecond,
+		count:  1,
+		expect: 1,
+	}},
+}, {
+	about:        "concurrent requests",
+	fillInterval: 250 * time.Millisecond,
+	capacity:     10,
+	reqs: []takeAvailableReq{{
+		time:   0,
+		count:  5,
+		expect: 5,
+	}, {
+		time:   0,
+		count:  2,
+		expect: 2,
+	}, {
+		time:   0,
+		count:  5,
+		expect: 3,
+	}, {
+		time:   0,
+		count:  1,
+		expect: 0,
+	}},
+}, {
+	about:        "more than capacity",
+	fillInterval: 1 * time.Millisecond,
+	capacity:     10,
+	reqs: []takeAvailableReq{{
+		time:   0,
+		count:  10,
+		expect: 10,
+	}, {
+		time:   20 * time.Millisecond,
+		count:  15,
+		expect: 10,
+	}},
+}, {
+	about:        "within capacity",
+	fillInterval: 10 * time.Millisecond,
+	capacity:     5,
+	reqs: []takeAvailableReq{{
+		time:   0,
+		count:  5,
+		expect: 5,
+	}, {
+		time:   60 * time.Millisecond,
+		count:  5,
+		expect: 5,
+	}, {
+		time:   70 * time.Millisecond,
+		count:  1,
+		expect: 1,
+	}},
+}}
+
+func (rateLimitSuite) TestTakeAvailable(c *gc.C) {
+	for i, test := range takeAvailableTests {
+		tb := NewBucket(test.fillInterval, test.capacity)
+		for j, req := range test.reqs {
+			d := tb.takeAvailable(tb.startTime.Add(req.time), req.count)
+			if d != req.expect {
+				c.Fatalf("test %d.%d, %s, got %v want %v", i, j, test.about, d, req.expect)
+			}
+		}
+	}
+}
+
+func (rateLimitSuite) TestPanics(c *gc.C) {
+	c.Assert(func() { NewBucket(0, 1) }, gc.PanicMatches, "token bucket fill interval is not > 0")
+	c.Assert(func() { NewBucket(-2, 1) }, gc.PanicMatches, "token bucket fill interval is not > 0")
+	c.Assert(func() { NewBucket(1, 0) }, gc.PanicMatches, "token bucket capacity is not > 0")
+	c.Assert(func() { NewBucket(1, -2) }, gc.PanicMatches, "token bucket capacity is not > 0")
+}
+
+func isCloseTo(x, y, tolerance float64) bool {
+	return abs(x-y)/y < tolerance
+}
+
+func (rateLimitSuite) TestRate(c *gc.C) {
+	tb := NewBucket(1, 1)
+	if !isCloseTo(tb.Rate(), 1e9, 0.00001) {
+		c.Fatalf("got %v want 1e9", tb.Rate())
+	}
+	tb = NewBucket(2*time.Second, 1)
+	if !isCloseTo(tb.Rate(), 0.5, 0.00001) {
+		c.Fatalf("got %v want 0.5", tb.Rate())
+	}
+	tb = newBucketWithQuantum(100*time.Millisecond, 1, 5)
+	if !isCloseTo(tb.Rate(), 50, 0.00001) {
+		c.Fatalf("got %v want 50", tb.Rate())
+	}
+}
+
+func checkRate(c *gc.C, rate float64) {
+	tb := NewBucketWithRate(rate, 1<<62)
+	if !isCloseTo(tb.Rate(), rate, rateMargin) {
+		c.Fatalf("got %g want %v", tb.Rate(), rate)
+	}
+	d, ok := tb.take(tb.startTime, 1<<62, infinityDuration)
+	c.Assert(ok, gc.Equals, true)
+	c.Assert(d, gc.Equals, time.Duration(0))
+
+	// Check that the actual rate is as expected by
+	// asking for a not-quite multiple of the bucket's
+	// quantum and checking that the wait time
+	// correct.
+	d, ok = tb.take(tb.startTime, tb.quantum*2-tb.quantum/2, infinityDuration)
+	c.Assert(ok, gc.Equals, true)
+	expectTime := 1e9 * float64(tb.quantum) * 2 / rate
+	if !isCloseTo(float64(d), expectTime, rateMargin) {
+		c.Fatalf("rate %g: got %g want %v", rate, float64(d), expectTime)
+	}
+}
+
+func (rateLimitSuite) TestNewWithRate(c *gc.C) {
+	for rate := float64(1); rate < 1e6; rate += 7 {
+		checkRate(c, rate)
+	}
+	for _, rate := range []float64{
+		1024 * 1024 * 1024,
+		1e-5,
+		0.9e-5,
+		0.5,
+		0.9,
+		0.9e8,
+		3e12,
+		4e18,
+	} {
+		checkRate(c, rate)
+		checkRate(c, rate/3)
+		checkRate(c, rate*1.3)
+	}
+}
+
+func BenchmarkWait(b *testing.B) {
+	tb := NewBucket(1, 16*1024)
+	for i := b.N - 1; i >= 0; i-- {
+		tb.Wait(1)
+	}
+}

+ 51 - 0
Godeps/_workspace/src/github.com/juju/ratelimit/reader.go

@@ -0,0 +1,51 @@
+// Copyright 2014 Canonical Ltd.
+// Licensed under the LGPLv3 with static-linking exception.
+// See LICENCE file for details.
+
+package ratelimit
+
+import "io"
+
+type reader struct {
+	r      io.Reader
+	bucket *Bucket
+}
+
+// Reader returns a reader that is rate limited by
+// the given token bucket. Each token in the bucket
+// represents one byte.
+func Reader(r io.Reader, bucket *Bucket) io.Reader {
+	return &reader{
+		r:      r,
+		bucket: bucket,
+	}
+}
+
+func (r *reader) Read(buf []byte) (int, error) {
+	n, err := r.r.Read(buf)
+	if n <= 0 {
+		return n, err
+	}
+	r.bucket.Wait(int64(n))
+	return n, err
+}
+
+type writer struct {
+	w      io.Writer
+	bucket *Bucket
+}
+
+// Writer returns a reader that is rate limited by
+// the given token bucket. Each token in the bucket
+// represents one byte.
+func Writer(w io.Writer, bucket *Bucket) io.Writer {
+	return &writer{
+		w:      w,
+		bucket: bucket,
+	}
+}
+
+func (w *writer) Write(buf []byte) (int, error) {
+	w.bucket.Wait(int64(len(buf)))
+	return w.w.Write(buf)
+}

+ 4 - 16
cmd/syncthing/model.go

@@ -18,6 +18,7 @@ import (
 	"github.com/calmh/syncthing/lamport"
 	"github.com/calmh/syncthing/protocol"
 	"github.com/calmh/syncthing/scanner"
+	"github.com/juju/ratelimit"
 )
 
 type Model struct {
@@ -35,7 +36,7 @@ type Model struct {
 
 	sup suppressor
 
-	limitRequestRate chan struct{}
+	limitRequestRate *ratelimit.Bucket
 
 	addedRepo bool
 	started   bool
@@ -66,18 +67,7 @@ func NewModel(maxChangeBw int) *Model {
 }
 
 func (m *Model) LimitRate(kbps int) {
-	m.limitRequestRate = make(chan struct{}, kbps)
-	n := kbps/10 + 1
-	go func() {
-		for {
-			time.Sleep(100 * time.Millisecond)
-			for i := 0; i < n; i++ {
-				select {
-				case m.limitRequestRate <- struct{}{}:
-				}
-			}
-		}
-	}()
+	m.limitRequestRate = ratelimit.NewBucketWithRate(float64(kbps), int64(5*kbps))
 }
 
 // StartRW starts read/write processing on the current model. When in
@@ -362,9 +352,7 @@ func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]by
 	}
 
 	if m.limitRequestRate != nil {
-		for s := 0; s < len(buf); s += 1024 {
-			<-m.limitRequestRate
-		}
+		m.limitRequestRate.Wait(int64(size / 1024))
 	}
 
 	return buf, nil