limiter_test.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. // Copyright (C) 2017 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package connections
  7. import (
  8. "bytes"
  9. "context"
  10. crand "crypto/rand"
  11. "io"
  12. "math/rand"
  13. "sync/atomic"
  14. "testing"
  15. "golang.org/x/time/rate"
  16. "github.com/syncthing/syncthing/lib/config"
  17. "github.com/syncthing/syncthing/lib/events"
  18. "github.com/syncthing/syncthing/lib/protocol"
  19. )
  20. var (
  21. device1, device2, device3, device4 protocol.DeviceID
  22. dev1Conf, dev2Conf, dev3Conf, dev4Conf config.DeviceConfiguration
  23. )
  24. func init() {
  25. device1, _ = protocol.DeviceIDFromString("AIR6LPZ7K4PTTUXQSMUUCPQ5YWOEDFIIQJUG7772YQXXR5YD6AWQ")
  26. device2, _ = protocol.DeviceIDFromString("GYRZZQB-IRNPV4Z-T7TC52W-EQYJ3TT-FDQW6MW-DFLMU42-SSSU6EM-FBK2VAY")
  27. device3, _ = protocol.DeviceIDFromString("LGFPDIT-7SKNNJL-VJZA4FC-7QNCRKA-CE753K7-2BW5QDK-2FOZ7FR-FEP57QJ")
  28. device4, _ = protocol.DeviceIDFromString("P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2")
  29. }
  30. func newDeviceConfiguration(w config.Wrapper, id protocol.DeviceID, name string) config.DeviceConfiguration {
  31. cfg := w.DefaultDevice()
  32. cfg.DeviceID = id
  33. cfg.Name = name
  34. return cfg
  35. }
  36. func initConfig() (config.Wrapper, context.CancelFunc) {
  37. wrapper := config.Wrap("/dev/null", config.New(device1), device1, events.NoopLogger)
  38. dev1Conf = newDeviceConfiguration(wrapper, device1, "device1")
  39. dev2Conf = newDeviceConfiguration(wrapper, device2, "device2")
  40. dev3Conf = newDeviceConfiguration(wrapper, device3, "device3")
  41. dev4Conf = newDeviceConfiguration(wrapper, device4, "device4")
  42. ctx, cancel := context.WithCancel(context.Background())
  43. go wrapper.Serve(ctx)
  44. dev2Conf.MaxRecvKbps = rand.Int() % 100000
  45. dev2Conf.MaxSendKbps = rand.Int() % 100000
  46. waiter, _ := wrapper.Modify(func(cfg *config.Configuration) {
  47. cfg.SetDevices([]config.DeviceConfiguration{dev1Conf, dev2Conf, dev3Conf, dev4Conf})
  48. })
  49. waiter.Wait()
  50. return wrapper, cancel
  51. }
  52. func TestLimiterInit(t *testing.T) {
  53. wrapper, wrapperCancel := initConfig()
  54. defer wrapperCancel()
  55. lim := newLimiter(device1, wrapper)
  56. device2ReadLimit := dev2Conf.MaxRecvKbps
  57. device2WriteLimit := dev2Conf.MaxSendKbps
  58. expectedR := map[protocol.DeviceID]*rate.Limiter{
  59. device2: rate.NewLimiter(rate.Limit(device2ReadLimit*1024), limiterBurstSize),
  60. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  61. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  62. }
  63. expectedW := map[protocol.DeviceID]*rate.Limiter{
  64. device2: rate.NewLimiter(rate.Limit(device2WriteLimit*1024), limiterBurstSize),
  65. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  66. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  67. }
  68. actualR := lim.deviceReadLimiters
  69. actualW := lim.deviceWriteLimiters
  70. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  71. }
  72. func TestSetDeviceLimits(t *testing.T) {
  73. wrapper, wrapperCancel := initConfig()
  74. defer wrapperCancel()
  75. lim := newLimiter(device1, wrapper)
  76. // should still be inf/inf because this is local device
  77. dev1ReadLimit := rand.Int() % 100000
  78. dev1WriteLimit := rand.Int() % 100000
  79. dev1Conf.MaxRecvKbps = dev1ReadLimit
  80. dev1Conf.MaxSendKbps = dev1WriteLimit
  81. dev2ReadLimit := rand.Int() % 100000
  82. dev2WriteLimit := rand.Int() % 100000
  83. dev2Conf.MaxRecvKbps = dev2ReadLimit
  84. dev2Conf.MaxSendKbps = dev2WriteLimit
  85. dev3ReadLimit := rand.Int() % 10000
  86. dev3Conf.MaxRecvKbps = dev3ReadLimit
  87. waiter, _ := wrapper.Modify(func(cfg *config.Configuration) {
  88. cfg.SetDevices([]config.DeviceConfiguration{dev1Conf, dev2Conf, dev3Conf, dev4Conf})
  89. })
  90. waiter.Wait()
  91. expectedR := map[protocol.DeviceID]*rate.Limiter{
  92. device2: rate.NewLimiter(rate.Limit(dev2ReadLimit*1024), limiterBurstSize),
  93. device3: rate.NewLimiter(rate.Limit(dev3ReadLimit*1024), limiterBurstSize),
  94. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  95. }
  96. expectedW := map[protocol.DeviceID]*rate.Limiter{
  97. device2: rate.NewLimiter(rate.Limit(dev2WriteLimit*1024), limiterBurstSize),
  98. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  99. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  100. }
  101. actualR := lim.deviceReadLimiters
  102. actualW := lim.deviceWriteLimiters
  103. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  104. }
  105. func TestRemoveDevice(t *testing.T) {
  106. wrapper, wrapperCancel := initConfig()
  107. defer wrapperCancel()
  108. lim := newLimiter(device1, wrapper)
  109. waiter, _ := wrapper.RemoveDevice(device3)
  110. waiter.Wait()
  111. expectedR := map[protocol.DeviceID]*rate.Limiter{
  112. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize),
  113. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  114. }
  115. expectedW := map[protocol.DeviceID]*rate.Limiter{
  116. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize),
  117. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  118. }
  119. actualR := lim.deviceReadLimiters
  120. actualW := lim.deviceWriteLimiters
  121. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  122. }
  123. func TestAddDevice(t *testing.T) {
  124. wrapper, wrapperCancel := initConfig()
  125. defer wrapperCancel()
  126. lim := newLimiter(device1, wrapper)
  127. addedDevice, _ := protocol.DeviceIDFromString("XZJ4UNS-ENI7QGJ-J45DT6G-QSGML2K-6I4XVOG-NAZ7BF5-2VAOWNT-TFDOMQU")
  128. addDevConf := newDeviceConfiguration(wrapper, addedDevice, "addedDevice")
  129. addDevConf.MaxRecvKbps = 120
  130. addDevConf.MaxSendKbps = 240
  131. waiter, _ := wrapper.Modify(func(cfg *config.Configuration) {
  132. cfg.SetDevice(addDevConf)
  133. })
  134. waiter.Wait()
  135. expectedR := map[protocol.DeviceID]*rate.Limiter{
  136. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize),
  137. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  138. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  139. addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxRecvKbps*1024), limiterBurstSize),
  140. }
  141. expectedW := map[protocol.DeviceID]*rate.Limiter{
  142. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize),
  143. device3: rate.NewLimiter(rate.Inf, limiterBurstSize),
  144. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  145. addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxSendKbps*1024), limiterBurstSize),
  146. }
  147. actualR := lim.deviceReadLimiters
  148. actualW := lim.deviceWriteLimiters
  149. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  150. }
  151. func TestAddAndRemove(t *testing.T) {
  152. wrapper, wrapperCancel := initConfig()
  153. defer wrapperCancel()
  154. lim := newLimiter(device1, wrapper)
  155. addedDevice, _ := protocol.DeviceIDFromString("XZJ4UNS-ENI7QGJ-J45DT6G-QSGML2K-6I4XVOG-NAZ7BF5-2VAOWNT-TFDOMQU")
  156. addDevConf := newDeviceConfiguration(wrapper, addedDevice, "addedDevice")
  157. addDevConf.MaxRecvKbps = 120
  158. addDevConf.MaxSendKbps = 240
  159. waiter, _ := wrapper.Modify(func(cfg *config.Configuration) {
  160. cfg.SetDevice(addDevConf)
  161. })
  162. waiter.Wait()
  163. waiter, _ = wrapper.RemoveDevice(device3)
  164. waiter.Wait()
  165. expectedR := map[protocol.DeviceID]*rate.Limiter{
  166. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxRecvKbps*1024), limiterBurstSize),
  167. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  168. addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxRecvKbps*1024), limiterBurstSize),
  169. }
  170. expectedW := map[protocol.DeviceID]*rate.Limiter{
  171. device2: rate.NewLimiter(rate.Limit(dev2Conf.MaxSendKbps*1024), limiterBurstSize),
  172. device4: rate.NewLimiter(rate.Inf, limiterBurstSize),
  173. addedDevice: rate.NewLimiter(rate.Limit(addDevConf.MaxSendKbps*1024), limiterBurstSize),
  174. }
  175. actualR := lim.deviceReadLimiters
  176. actualW := lim.deviceWriteLimiters
  177. checkActualAndExpected(t, actualR, actualW, expectedR, expectedW)
  178. }
  179. func TestLimitedWriterWrite(t *testing.T) {
  180. // Check that the limited writer writes the correct data in the correct manner.
  181. // A buffer with random data that is larger than the write size and not
  182. // a precise multiple either.
  183. src := make([]byte, int(12.5*8192))
  184. if _, err := crand.Reader.Read(src); err != nil {
  185. t.Fatal(err)
  186. }
  187. // Write it to the destination using a limited writer, with a wrapper to
  188. // count the write calls. The defaults on the limited writer should mean
  189. // it is used (and doesn't take the fast path). In practice the limiter
  190. // won't delay the test as the burst size is large enough to accommodate
  191. // regardless of the rate.
  192. dst := new(bytes.Buffer)
  193. cw := &countingWriter{w: dst}
  194. lw := &limitedWriter{
  195. writer: cw,
  196. waiterHolder: waiterHolder{
  197. waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
  198. limitsLAN: new(atomic.Bool),
  199. isLAN: false, // enables limiting
  200. },
  201. }
  202. if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
  203. t.Fatal(err)
  204. }
  205. // Verify there were lots of writes (we expect one kilobyte write size
  206. // for the very low rate in this test) and that the end result is
  207. // identical.
  208. if cw.writeCount < 10*8 {
  209. t.Error("expected lots of smaller writes")
  210. }
  211. if cw.writeCount > 15*8 {
  212. t.Error("expected fewer larger writes")
  213. }
  214. if !bytes.Equal(src, dst.Bytes()) {
  215. t.Error("results should be equal")
  216. }
  217. // Write it to the destination using a limited writer, with a wrapper to
  218. // count the write calls. Now we make sure the fast path is used.
  219. dst = new(bytes.Buffer)
  220. cw = &countingWriter{w: dst}
  221. lw = &limitedWriter{
  222. writer: cw,
  223. waiterHolder: waiterHolder{
  224. waiter: rate.NewLimiter(rate.Limit(42), limiterBurstSize),
  225. limitsLAN: new(atomic.Bool),
  226. isLAN: true, // disables limiting
  227. },
  228. }
  229. if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
  230. t.Fatal(err)
  231. }
  232. // Verify there were a single write and that the end result is identical.
  233. if cw.writeCount != 1 {
  234. t.Error("expected just the one write")
  235. }
  236. if !bytes.Equal(src, dst.Bytes()) {
  237. t.Error("results should be equal")
  238. }
  239. // Once more, but making sure the fast path is used for an unlimited
  240. // rate, with multiple unlimited raters even (global and per-device).
  241. dst = new(bytes.Buffer)
  242. cw = &countingWriter{w: dst}
  243. lw = &limitedWriter{
  244. writer: cw,
  245. waiterHolder: waiterHolder{
  246. waiter: totalWaiter{rate.NewLimiter(rate.Inf, limiterBurstSize), rate.NewLimiter(rate.Inf, limiterBurstSize)},
  247. limitsLAN: new(atomic.Bool),
  248. isLAN: false, // enables limiting
  249. },
  250. }
  251. if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
  252. t.Fatal(err)
  253. }
  254. // Verify there were a single write and that the end result is identical.
  255. if cw.writeCount != 1 {
  256. t.Error("expected just the one write")
  257. }
  258. if !bytes.Equal(src, dst.Bytes()) {
  259. t.Error("results should be equal")
  260. }
  261. // Once more, but making sure we *don't* take the fast path when there
  262. // is a combo of limited and unlimited writers.
  263. dst = new(bytes.Buffer)
  264. cw = &countingWriter{w: dst}
  265. lw = &limitedWriter{
  266. writer: cw,
  267. waiterHolder: waiterHolder{
  268. waiter: totalWaiter{
  269. rate.NewLimiter(rate.Inf, limiterBurstSize),
  270. rate.NewLimiter(rate.Limit(42), limiterBurstSize),
  271. rate.NewLimiter(rate.Inf, limiterBurstSize),
  272. },
  273. limitsLAN: new(atomic.Bool),
  274. isLAN: false, // enables limiting
  275. },
  276. }
  277. if _, err := io.Copy(lw, bytes.NewReader(src)); err != nil {
  278. t.Fatal(err)
  279. }
  280. // Verify there were lots of writes and that the end result is identical.
  281. if cw.writeCount < 10*8 {
  282. t.Error("expected lots of smaller writes")
  283. }
  284. if cw.writeCount > 15*8 {
  285. t.Error("expected fewer larger writes")
  286. }
  287. if !bytes.Equal(src, dst.Bytes()) {
  288. t.Error("results should be equal")
  289. }
  290. }
  291. func TestTotalWaiterLimit(t *testing.T) {
  292. cases := []struct {
  293. w waiter
  294. r rate.Limit
  295. }{
  296. {
  297. totalWaiter{},
  298. rate.Inf,
  299. },
  300. {
  301. totalWaiter{rate.NewLimiter(rate.Inf, 42)},
  302. rate.Inf,
  303. },
  304. {
  305. totalWaiter{rate.NewLimiter(rate.Inf, 42), rate.NewLimiter(rate.Inf, 42)},
  306. rate.Inf,
  307. },
  308. {
  309. totalWaiter{rate.NewLimiter(rate.Inf, 42), rate.NewLimiter(rate.Limit(12), 42), rate.NewLimiter(rate.Limit(15), 42)},
  310. rate.Limit(12),
  311. },
  312. }
  313. for _, tc := range cases {
  314. l := tc.w.Limit()
  315. if l != tc.r {
  316. t.Error("incorrect limit returned")
  317. }
  318. }
  319. }
  320. func checkActualAndExpected(t *testing.T, actualR, actualW, expectedR, expectedW map[protocol.DeviceID]*rate.Limiter) {
  321. t.Helper()
  322. if len(expectedW) != len(actualW) || len(expectedR) != len(actualR) {
  323. t.Errorf("Map lengths differ!")
  324. }
  325. for key, val := range expectedR {
  326. if _, ok := actualR[key]; !ok {
  327. t.Errorf("Device %s not found in limiter", key)
  328. }
  329. if val.Limit() != actualR[key].Limit() {
  330. t.Errorf("Read limits for device %s differ actual: %f, expected: %f", key, actualR[key].Limit(), val.Limit())
  331. }
  332. if expectedW[key].Limit() != actualW[key].Limit() {
  333. t.Errorf("Write limits for device %s differ actual: %f, expected: %f", key, actualW[key].Limit(), expectedW[key].Limit())
  334. }
  335. }
  336. }
  337. type countingWriter struct {
  338. w io.Writer
  339. writeCount int
  340. }
  341. func (w *countingWriter) Write(data []byte) (int, error) {
  342. w.writeCount++
  343. return w.w.Write(data)
  344. }