limiter.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. // Copyright (C) 2017 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package connections
  7. import (
  8. "fmt"
  9. "io"
  10. "sync/atomic"
  11. "github.com/syncthing/syncthing/lib/config"
  12. "golang.org/x/net/context"
  13. "golang.org/x/time/rate"
  14. )
  15. // limiter manages a read and write rate limit, reacting to config changes
  16. // as appropriate.
  17. type limiter struct {
  18. write *rate.Limiter
  19. read *rate.Limiter
  20. limitsLAN atomicBool
  21. }
  22. const limiterBurstSize = 4 * 128 << 10
  23. func newLimiter(cfg *config.Wrapper) *limiter {
  24. l := &limiter{
  25. write: rate.NewLimiter(rate.Inf, limiterBurstSize),
  26. read: rate.NewLimiter(rate.Inf, limiterBurstSize),
  27. }
  28. cfg.Subscribe(l)
  29. prev := config.Configuration{Options: config.OptionsConfiguration{MaxRecvKbps: -1, MaxSendKbps: -1}}
  30. l.CommitConfiguration(prev, cfg.RawCopy())
  31. return l
  32. }
  33. func (lim *limiter) newReadLimiter(r io.Reader, isLAN bool) io.Reader {
  34. return &limitedReader{reader: r, limiter: lim, isLAN: isLAN}
  35. }
  36. func (lim *limiter) newWriteLimiter(w io.Writer, isLAN bool) io.Writer {
  37. return &limitedWriter{writer: w, limiter: lim, isLAN: isLAN}
  38. }
  39. func (lim *limiter) VerifyConfiguration(from, to config.Configuration) error {
  40. return nil
  41. }
  42. func (lim *limiter) CommitConfiguration(from, to config.Configuration) bool {
  43. if from.Options.MaxRecvKbps == to.Options.MaxRecvKbps &&
  44. from.Options.MaxSendKbps == to.Options.MaxSendKbps &&
  45. from.Options.LimitBandwidthInLan == to.Options.LimitBandwidthInLan {
  46. return true
  47. }
  48. // The rate variables are in KiB/s in the config (despite the camel casing
  49. // of the name). We multiply by 1024 to get bytes/s.
  50. if to.Options.MaxRecvKbps <= 0 {
  51. lim.read.SetLimit(rate.Inf)
  52. } else {
  53. lim.read.SetLimit(1024 * rate.Limit(to.Options.MaxRecvKbps))
  54. }
  55. if to.Options.MaxSendKbps <= 0 {
  56. lim.write.SetLimit(rate.Inf)
  57. } else {
  58. lim.write.SetLimit(1024 * rate.Limit(to.Options.MaxSendKbps))
  59. }
  60. lim.limitsLAN.set(to.Options.LimitBandwidthInLan)
  61. sendLimitStr := "is unlimited"
  62. recvLimitStr := "is unlimited"
  63. if to.Options.MaxSendKbps > 0 {
  64. sendLimitStr = fmt.Sprintf("limit is %d KiB/s", to.Options.MaxSendKbps)
  65. }
  66. if to.Options.MaxRecvKbps > 0 {
  67. recvLimitStr = fmt.Sprintf("limit is %d KiB/s", to.Options.MaxRecvKbps)
  68. }
  69. l.Infof("Send rate %s, receive rate %s", sendLimitStr, recvLimitStr)
  70. if to.Options.LimitBandwidthInLan {
  71. l.Infoln("Rate limits apply to LAN connections")
  72. } else {
  73. l.Infoln("Rate limits do not apply to LAN connections")
  74. }
  75. return true
  76. }
  77. func (lim *limiter) String() string {
  78. // required by config.Committer interface
  79. return "connections.limiter"
  80. }
  81. // limitedReader is a rate limited io.Reader
  82. type limitedReader struct {
  83. reader io.Reader
  84. limiter *limiter
  85. isLAN bool
  86. }
  87. func (r *limitedReader) Read(buf []byte) (int, error) {
  88. n, err := r.reader.Read(buf)
  89. if !r.isLAN || r.limiter.limitsLAN.get() {
  90. take(r.limiter.read, n)
  91. }
  92. return n, err
  93. }
  94. // limitedWriter is a rate limited io.Writer
  95. type limitedWriter struct {
  96. writer io.Writer
  97. limiter *limiter
  98. isLAN bool
  99. }
  100. func (w *limitedWriter) Write(buf []byte) (int, error) {
  101. if !w.isLAN || w.limiter.limitsLAN.get() {
  102. take(w.limiter.write, len(buf))
  103. }
  104. return w.writer.Write(buf)
  105. }
  106. // take is a utility function to consume tokens from a rate.Limiter. No call
  107. // to WaitN can be larger than the limiter burst size so we split it up into
  108. // several calls when necessary.
  109. func take(l *rate.Limiter, tokens int) {
  110. if tokens < limiterBurstSize {
  111. // This is the by far more common case so we get it out of the way
  112. // early.
  113. l.WaitN(context.TODO(), tokens)
  114. return
  115. }
  116. for tokens > 0 {
  117. // Consume limiterBurstSize tokens at a time until we're done.
  118. if tokens > limiterBurstSize {
  119. l.WaitN(context.TODO(), limiterBurstSize)
  120. tokens -= limiterBurstSize
  121. } else {
  122. l.WaitN(context.TODO(), tokens)
  123. tokens = 0
  124. }
  125. }
  126. }
  127. type atomicBool int32
  128. func (b *atomicBool) set(v bool) {
  129. if v {
  130. atomic.StoreInt32((*int32)(b), 1)
  131. } else {
  132. atomic.StoreInt32((*int32)(b), 0)
  133. }
  134. }
  135. func (b *atomicBool) get() bool {
  136. return atomic.LoadInt32((*int32)(b)) != 0
  137. }