limiter_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. // Copyright (C) 2017 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package connections
  7. import (
  8. "bytes"
  9. "context"
  10. crand "crypto/rand"
  11. "io"
  12. "math/rand"
  13. "testing"
  14. "github.com/syncthing/syncthing/lib/config"
  15. "github.com/syncthing/syncthing/lib/events"
  16. "github.com/syncthing/syncthing/lib/protocol"
  17. "golang.org/x/time/rate"
  18. )
  19. var device1, device2, device3, device4 protocol.DeviceID
  20. var dev1Conf, dev2Conf, dev3Conf, dev4Conf config.DeviceConfiguration
  21. func init() {
  22. device1, _ = protocol.DeviceIDFromString("AIR6LPZ7K4PTTUXQSMUUCPQ5YWOEDFIIQJUG7772YQXXR5YD6AWQ")
  23. device2, _ = protocol.DeviceIDFromString("GYRZZQB-IRNPV4Z-T7TC52W-EQYJ3TT-FDQW6MW-DFLMU42-SSSU6EM-FBK2VAY")
  24. device3, _ = protocol.DeviceIDFromString("LGFPDIT-7SKNNJL-VJZA4FC-7QNCRKA-CE753K7-2BW5QDK-2FOZ7FR-FEP57QJ")
  25. device4, _ = protocol.DeviceIDFromString("P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2")
  26. }
  27. func newDeviceConfiguration(w config.Wrapper, id protocol.DeviceID, name string) config.DeviceConfiguration {
  28. cfg := w.DefaultDevice()
  29. cfg.DeviceID = id
  30. cfg.Name = name
  31. return cfg
  32. }
  33. func initConfig() (config.Wrapper, context.CancelFunc) {
  34. wrapper := config.Wrap("/dev/null", config.New(device1), device1, events.NoopLogger)
  35. dev1Conf = newDeviceConfiguration(wrapper, device1, "device1")
  36. dev2Conf = newDeviceConfiguration(wrapper, device2, "device2")
  37. dev3Conf = newDeviceConfiguration(wrapper, device3, "device3")
  38. dev4Conf = newDeviceConfiguration(wrapper, device4, "device4")
  39. ctx, cancel := context.WithCancel(context.Background())
  40. go wrapper.Serve(ctx)
  41. dev2Conf.MaxRecvKbps = rand.Int() % 100000
  42. dev2Conf.MaxSendKbps = rand.Int() % 100000
  43. waiter, _ := wrapper.Modify(func(cfg *config.Configuration) {
  44. cfg.SetDevices([]config.DeviceConfiguration{dev1Conf, dev2Conf, dev3Conf, dev4Conf})
  45. })
  46. waiter.Wait()
  47. return wrapper, cancel
  48. }
  49. func TestLimiterInit(t *testing.T) {
  50. wrapper, wrapperCancel := initConfig()
  51. defer wrapperCancel()
  52. lim := newLimiter(device1, wrapper)
  53. device2ReadLimit := dev2Conf.MaxRecvKbps
  54. device2WriteLimit := dev2Conf.MaxSendKbps
  55. expectedR := map[protocol.DeviceID]*rate.Limiter{
  56. device2: rate.NewLimiter(rate.Limit(device2ReadLimit*1024), limiterBurstSize),
  57. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  58. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  59. }
  60. expectedW := map[protocol.DeviceID]*rate.Limiter{
  61. device2: rate.NewLimiter(rate.Limit(device2WriteLimit*1024), limiterBurstSize),
  62. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  63. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  64. }
  65. actualR := lim.deviceReadLimiters
  66. actualW := lim.deviceWriteLimiters
  67. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  68. }
  69. func TestSetDeviceLimits(t *testing.T) {
  70. wrapper, wrapperCancel := initConfig()
  71. defer wrapperCancel()
  72. lim := newLimiter(device1, wrapper)
  73. // should still be inf/inf because this is local device
  74. dev1ReadLimit := rand.Int() % 100000
  75. dev1WriteLimit := rand.Int() % 100000
  76. dev1Conf.MaxRecvKbps = dev1ReadLimit
  77. dev1Conf.MaxSendKbps = dev1WriteLimit
  78. dev2ReadLimit := rand.Int() % 100000
  79. dev2WriteLimit := rand.Int() % 100000
  80. dev2Conf.MaxRecvKbps = dev2ReadLimit
  81. dev2Conf.MaxSendKbps = dev2WriteLimit
  82. dev3ReadLimit := rand.Int() % 10000
  83. dev3Conf.MaxRecvKbps = dev3ReadLimit
  84. waiter, _ := wrapper.Modify(func(cfg *config.Configuration) {
  85. cfg.SetDevices([]config.DeviceConfiguration{dev1Conf, dev2Conf, dev3Conf, dev4Conf})
  86. })
  87. waiter.Wait()
  88. expectedR := map[protocol.DeviceID]*rate.Limiter{
  89. device2: rate.NewLimiter(rate.Limit(dev2ReadLimit*1024), limiterBurstSize),
  90. device3: rate.NewLimiter(rate.Limit(dev3ReadLimit*1024), limiterBurstSize),
  91. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  92. }
  93. expectedW := map[protocol.DeviceID]*rate.Limiter{
  94. device2: rate.NewLimiter(rate.Limit(dev2WriteLimit*1024), limiterBurstSize),
  95. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  96. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  97. }
  98. actualR := lim.deviceReadLimiters
  99. actualW := lim.deviceWriteLimiters
  100. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  101. }
  102. func TestRemoveDevice(t *testing.T) {
  103. wrapper, wrapperCancel := initConfig()
  104. defer wrapperCancel()
  105. lim := newLimiter(device1, wrapper)
  106. waiter, _ := wrapper.RemoveDevice(device3)
  107. waiter.Wait()
  108. expectedR := map[protocol.DeviceID]*rate.Limiter{
  109. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize),
  110. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  111. }
  112. expectedW := map[protocol.DeviceID]*rate.Limiter{
  113. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize),
  114. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  115. }
  116. actualR := lim.deviceReadLimiters
  117. actualW := lim.deviceWriteLimiters
  118. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  119. }
  120. func TestAddDevice(t *testing.T) {
  121. wrapper, wrapperCancel := initConfig()
  122. defer wrapperCancel()
  123. lim := newLimiter(device1, wrapper)
  124. addedDevice, _ := protocol.DeviceIDFromString("XZJ4UNS-ENI7QGJ-J45DT6G-QSGML2K-6I4XVOG-NAZ7BF5-2VAOWNT-TFDOMQU")
  125. addDevConf := newDeviceConfiguration(wrapper, addedDevice, "addedDevice")
  126. addDevConf.MaxRecvKbps = 120
  127. addDevConf.MaxSendKbps = 240
  128. waiter, _ := wrapper.Modify(func(cfg *config.Configuration) {
  129. cfg.SetDevice(addDevConf)
  130. })
  131. waiter.Wait()
  132. expectedR := map[protocol.DeviceID]*rate.Limiter{
  133. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize),
  134. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  135. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  136. addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxRecvKbps*1024), limiterBurstSize),
  137. }
  138. expectedW := map[protocol.DeviceID]*rate.Limiter{
  139. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize),
  140. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  141. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  142. addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxSendKbps*1024), limiterBurstSize),
  143. }
  144. actualR := lim.deviceReadLimiters
  145. actualW := lim.deviceWriteLimiters
  146. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  147. }
  148. func TestAddAndRemove(t *testing.T) {
  149. wrapper, wrapperCancel := initConfig()
  150. defer wrapperCancel()
  151. lim := newLimiter(device1, wrapper)
  152. addedDevice, _ := protocol.DeviceIDFromString("XZJ4UNS-ENI7QGJ-J45DT6G-QSGML2K-6I4XVOG-NAZ7BF5-2VAOWNT-TFDOMQU")
  153. addDevConf := newDeviceConfiguration(wrapper, addedDevice, "addedDevice")
  154. addDevConf.MaxRecvKbps = 120
  155. addDevConf.MaxSendKbps = 240
  156. waiter, _ := wrapper.Modify(func(cfg *config.Configuration) {
  157. cfg.SetDevice(addDevConf)
  158. })
  159. waiter.Wait()
  160. waiter, _ = wrapper.RemoveDevice(device3)
  161. waiter.Wait()
  162. expectedR := map[protocol.DeviceID]*rate.Limiter{
  163. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize),
  164. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  165. addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxRecvKbps*1024), limiterBurstSize),
  166. }
  167. expectedW := map[protocol.DeviceID]*rate.Limiter{
  168. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize),
  169. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  170. addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxSendKbps*1024), limiterBurstSize),
  171. }
  172. actualR := lim.deviceReadLimiters
  173. actualW := lim.deviceWriteLimiters
  174. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  175. }
  176. func TestLimitedWriterWrite(t *testing.T) {
  177. // Check that the limited writer writes the correct data in the correct manner.
  178. // A buffer with random data that is larger than the write size and not
  179. // a precise multiple either.
  180. src := make([]byte, int(12.5*maxSingleWriteSize))
  181. if _, err := crand.Reader.Read(src); err != nil {
  182. t.Fatal(err)
  183. }
  184. // Write it to the destination using a limited writer, with a wrapper to
  185. // count the write calls. The defaults on the limited writer should mean
  186. // it is used (and doesn't take the fast path). In practice the limiter
  187. // won't delay the test as the burst size is large enough to accommodate
  188. // regardless of the rate.
  189. dst := new(bytes.Buffer)
  190. cw := &countingWriter{w: dst}
  191. lw := &limitedWriter{
  192. writer: cw,
  193. waiterHolder: waiterHolder{
  194. waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
  195. limitsLAN: new(atomicBool),
  196. isLAN: false, // enables limiting
  197. },
  198. }
  199. if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
  200. t.Fatal(err)
  201. }
  202. // Verify there were lots of writes and that the end result is identical.
  203. if cw.writeCount != 13 {
  204. t.Error("expected lots of smaller writes, but not too many")
  205. }
  206. if !bytes.Equal(src, dst.Bytes()) {
  207. t.Error("results should be equal")
  208. }
  209. // Write it to the destination using a limited writer, with a wrapper to
  210. // count the write calls. Now we make sure the fast path is used.
  211. dst = new(bytes.Buffer)
  212. cw = &countingWriter{w: dst}
  213. lw = &limitedWriter{
  214. writer: cw,
  215. waiterHolder: waiterHolder{
  216. waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
  217. limitsLAN: new(atomicBool),
  218. isLAN: true, // disables limiting
  219. },
  220. }
  221. if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
  222. t.Fatal(err)
  223. }
  224. // Verify there were a single write and that the end result is identical.
  225. if cw.writeCount != 1 {
  226. t.Error("expected just the one write")
  227. }
  228. if !bytes.Equal(src, dst.Bytes()) {
  229. t.Error("results should be equal")
  230. }
  231. // Once more, but making sure the fast path is used for an unlimited
  232. // rate, with multiple unlimited raters even (global and per-device).
  233. dst = new(bytes.Buffer)
  234. cw = &countingWriter{w: dst}
  235. lw = &limitedWriter{
  236. writer: cw,
  237. waiterHolder: waiterHolder{
  238. waiter: totalWaiter{rate.NewLimiter(rate.Inf, limiterBurstSize), rate.NewLimiter(rate.Inf, limiterBurstSize)},
  239. limitsLAN: new(atomicBool),
  240. isLAN: false, // enables limiting
  241. },
  242. }
  243. if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
  244. t.Fatal(err)
  245. }
  246. // Verify there were a single write and that the end result is identical.
  247. if cw.writeCount != 1 {
  248. t.Error("expected just the one write")
  249. }
  250. if !bytes.Equal(src, dst.Bytes()) {
  251. t.Error("results should be equal")
  252. }
  253. // Once more, but making sure we *don't* take the fast path when there
  254. // is a combo of limited and unlimited writers.
  255. dst = new(bytes.Buffer)
  256. cw = &countingWriter{w: dst}
  257. lw = &limitedWriter{
  258. writer: cw,
  259. waiterHolder: waiterHolder{
  260. waiter: totalWaiter{
  261. rate.NewLimiter(rate.Inf, limiterBurstSize),
  262. rate.NewLimiter(rate.Limit(42), limiterBurstSize),
  263. rate.NewLimiter(rate.Inf, limiterBurstSize),
  264. },
  265. limitsLAN: new(atomicBool),
  266. isLAN: false, // enables limiting
  267. },
  268. }
  269. if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
  270. t.Fatal(err)
  271. }
  272. // Verify there were lots of writes and that the end result is identical.
  273. if cw.writeCount != 13 {
  274. t.Error("expected just the one write")
  275. }
  276. if !bytes.Equal(src, dst.Bytes()) {
  277. t.Error("results should be equal")
  278. }
  279. }
  280. func TestTotalWaiterLimit(t *testing.T) {
  281. cases := []struct {
  282. w waiter
  283. r rate.Limit
  284. }{
  285. {
  286. totalWaiter{},
  287. rate.Inf,
  288. },
  289. {
  290. totalWaiter{rate.NewLimiter(rate.Inf, 42)},
  291. rate.Inf,
  292. },
  293. {
  294. totalWaiter{rate.NewLimiter(rate.Inf, 42), rate.NewLimiter(rate.Inf, 42)},
  295. rate.Inf,
  296. },
  297. {
  298. totalWaiter{rate.NewLimiter(rate.Inf, 42), rate.NewLimiter(rate.Limit(12), 42), rate.NewLimiter(rate.Limit(15), 42)},
  299. rate.Limit(12),
  300. },
  301. }
  302. for _, tc := range cases {
  303. l := tc.w.Limit()
  304. if l != tc.r {
  305. t.Error("incorrect limit returned")
  306. }
  307. }
  308. }
  309. func checkActualAndExpected(t *testing.T, actualR, actualW, expectedR, expectedW map[protocol.DeviceID]*rate.Limiter) {
  310. t.Helper()
  311. if len(expectedW) != len(actualW) || len(expectedR) != len(actualR) {
  312. t.Errorf("Map lengths differ!")
  313. }
  314. for key, val := range expectedR {
  315. if _, ok := actualR[key]; !ok {
  316. t.Errorf("Device %s not found in limiter", key)
  317. }
  318. if val.Limit() != actualR[key].Limit() {
  319. t.Errorf("Read limits for device %s differ actual: %f, expected: %f", key, actualR[key].Limit(), val.Limit())
  320. }
  321. if expectedW[key].Limit() != actualW[key].Limit() {
  322. t.Errorf("Write limits for device %s differ actual: %f, expected: %f", key, actualW[key].Limit(), expectedW[key].Limit())
  323. }
  324. }
  325. }
  326. type countingWriter struct {
  327. w io.Writer
  328. writeCount int
  329. }
  330. func (w *countingWriter) Write(data []byte) (int, error) {
  331. w.writeCount++
  332. return w.w.Write(data)
  333. }