| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- // Copyright (C) 2017 The Syncthing Authors.
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
- // You can obtain one at https://mozilla.org/MPL/2.0/.
- package connections
- import (
- "fmt"
- "io"
- "sync/atomic"
- "github.com/syncthing/syncthing/lib/config"
- "golang.org/x/net/context"
- "golang.org/x/time/rate"
- )
- // limiter manages a read and write rate limit, reacting to config changes
- // as appropriate.
- type limiter struct {
- write *rate.Limiter
- read *rate.Limiter
- limitsLAN atomicBool
- }
// limiterBurstSize is the burst size given to both rate limiters created by
// newLimiter, and the largest amount take requests in a single WaitN call.
// The expression evaluates to 512 KiB (524288).
const limiterBurstSize = (4 * 128) << 10
- func newLimiter(cfg *config.Wrapper) *limiter {
- l := &limiter{
- write: rate.NewLimiter(rate.Inf, limiterBurstSize),
- read: rate.NewLimiter(rate.Inf, limiterBurstSize),
- }
- cfg.Subscribe(l)
- prev := config.Configuration{Options: config.OptionsConfiguration{MaxRecvKbps: -1, MaxSendKbps: -1}}
- l.CommitConfiguration(prev, cfg.RawCopy())
- return l
- }
- func (lim *limiter) newReadLimiter(r io.Reader, isLAN bool) io.Reader {
- return &limitedReader{reader: r, limiter: lim, isLAN: isLAN}
- }
- func (lim *limiter) newWriteLimiter(w io.Writer, isLAN bool) io.Writer {
- return &limitedWriter{writer: w, limiter: lim, isLAN: isLAN}
- }
- func (lim *limiter) VerifyConfiguration(from, to config.Configuration) error {
- return nil
- }
- func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool {
- if from.Options.MaxRecvKbps == to.Options.MaxRecvKbps &&
- from.Options.MaxSendKbps == to.Options.MaxSendKbps &&
- from.Options.LimitBandwidthInLan == to.Options.LimitBandwidthInLan {
- return true
- }
- // The rate variables are in KiB/s in the config (despite the camel casing
- // of the name). We multiply by 1024 to get bytes/s.
- if to.Options.MaxRecvKbps <= 0 {
- lim.read.SetLimit(rate.Inf)
- } else {
- lim.read.SetLimit(1024 * rate.Limit(to.Options.MaxRecvKbps))
- }
- if to.Options.MaxSendKbps <= 0 {
- lim.write.SetLimit(rate.Inf)
- } else {
- lim.write.SetLimit(1024 * rate.Limit(to.Options.MaxSendKbps))
- }
- lim.limitsLAN.set(to.Options.LimitBandwidthInLan)
- sendLimitStr := "is unlimited"
- recvLimitStr := "is unlimited"
- if to.Options.MaxSendKbps > 0 {
- sendLimitStr = fmt.Sprintf("limit is %d KiB/s", to.Options.MaxSendKbps)
- }
- if to.Options.MaxRecvKbps > 0 {
- recvLimitStr = fmt.Sprintf("limit is %d KiB/s", to.Options.MaxRecvKbps)
- }
- l.Infof("Send rate %s, receive rate %s", sendLimitStr, recvLimitStr)
- if to.Options.LimitBandwidthInLan {
- l.Infoln("Rate limits apply to LAN connections")
- } else {
- l.Infoln("Rate limits do not apply to LAN connections")
- }
- return true
- }
- func (lim *limiter) String() string {
- // required by config.Committer interface
- return "connections.limiter"
- }
- // limitedReader is a rate limited io.Reader
- type limitedReader struct {
- reader io.Reader
- limiter *limiter
- isLAN bool
- }
- func (r *limitedReader) Read(buf []byte) (int, error) {
- n, err := r.reader.Read(buf)
- if !r.isLAN || r.limiter.limitsLAN.get() {
- take(r.limiter.read, n)
- }
- return n, err
- }
- // limitedWriter is a rate limited io.Writer
- type limitedWriter struct {
- writer io.Writer
- limiter *limiter
- isLAN bool
- }
- func (w *limitedWriter) Write(buf []byte) (int, error) {
- if !w.isLAN || w.limiter.limitsLAN.get() {
- take(w.limiter.write, len(buf))
- }
- return w.writer.Write(buf)
- }
- // take is a utility function to consume tokens from a rate.Limiter. No call
- // to WaitN can be larger than the limiter burst size so we split it up into
- // several calls when necessary.
- func take(l *rate.Limiter, tokens int) {
- if tokens < limiterBurstSize {
- // This is the by far more common case so we get it out of the way
- // early.
- l.WaitN(context.TODO(), tokens)
- return
- }
- for tokens > 0 {
- // Consume limiterBurstSize tokens at a time until we're done.
- if tokens > limiterBurstSize {
- l.WaitN(context.TODO(), limiterBurstSize)
- tokens -= limiterBurstSize
- } else {
- l.WaitN(context.TODO(), tokens)
- tokens = 0
- }
- }
- }
// atomicBool is a boolean stored in an int32 and accessed through the
// sync/atomic functions. Zero means false, any other value means true.
type atomicBool int32

// set atomically stores v, as 1 for true and 0 for false.
func (b *atomicBool) set(v bool) {
	var raw int32
	if v {
		raw = 1
	}
	atomic.StoreInt32((*int32)(b), raw)
}

// get atomically loads the value and reports whether it is non-zero.
func (b *atomicBool) get() bool {
	raw := atomic.LoadInt32((*int32)(b))
	return raw != 0
}
|