aggregator_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372
  1. // Copyright (C) 2016 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package watchaggregator
  7. import (
  8. "context"
  9. "os"
  10. "path/filepath"
  11. "strconv"
  12. "testing"
  13. "time"
  14. "github.com/syncthing/syncthing/lib/build"
  15. "github.com/syncthing/syncthing/lib/config"
  16. "github.com/syncthing/syncthing/lib/events"
  17. "github.com/syncthing/syncthing/lib/fs"
  18. "github.com/syncthing/syncthing/lib/protocol"
  19. )
  20. func TestMain(m *testing.M) {
  21. maxFiles = 32
  22. maxFilesPerDir = 8
  23. ret := m.Run()
  24. maxFiles = 512
  25. maxFilesPerDir = 128
  26. os.Exit(ret)
  27. }
  28. const (
  29. testNotifyDelayS = 1
  30. testNotifyTimeout = 2 * time.Second
  31. timeoutWithinBatch = time.Second
  32. )
  33. var (
  34. folderRoot = filepath.Clean("/home/someuser/syncthing")
  35. defaultFolderCfg = config.FolderConfiguration{
  36. FilesystemType: config.FilesystemTypeBasic,
  37. Path: folderRoot,
  38. FSWatcherDelayS: testNotifyDelayS,
  39. }
  40. defaultCfg = config.Wrap("", config.Configuration{
  41. Folders: []config.FolderConfiguration{defaultFolderCfg},
  42. }, protocol.LocalDeviceID, events.NoopLogger)
  43. )
  44. // Represents possibly multiple (different event types) expected paths from
  45. // aggregation, that should be received back to back.
  46. type expectedBatch struct {
  47. paths [][]string
  48. afterMs int
  49. beforeMs int
  50. }
  51. // TestAggregate checks whether maxFilesPerDir+1 events in one dir are
  52. // aggregated to parent dir
  53. func TestAggregate(t *testing.T) {
  54. inProgress := make(map[string]struct{})
  55. folderCfg := defaultFolderCfg.Copy()
  56. folderCfg.ID = "Aggregate"
  57. ctx, cancel := context.WithCancel(context.Background())
  58. defer cancel()
  59. a := newAggregator(ctx, folderCfg)
  60. // checks whether maxFilesPerDir events in one dir are kept as is
  61. for i := 0; i < maxFilesPerDir; i++ {
  62. a.newEvent(fs.Event{
  63. Name: filepath.Join("parent", strconv.Itoa(i)),
  64. Type: fs.NonRemove,
  65. }, inProgress)
  66. }
  67. if l := len(getEventPaths(a.root, ".", a)); l != maxFilesPerDir {
  68. t.Errorf("Unexpected number of events stored, got %v, expected %v", l, maxFilesPerDir)
  69. }
  70. // checks whether maxFilesPerDir+1 events in one dir are aggregated to parent dir
  71. a.newEvent(fs.Event{
  72. Name: filepath.Join("parent", "new"),
  73. Type: fs.NonRemove,
  74. }, inProgress)
  75. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
  76. // checks that adding an event below "parent" does not change anything
  77. a.newEvent(fs.Event{
  78. Name: filepath.Join("parent", "extra"),
  79. Type: fs.NonRemove,
  80. }, inProgress)
  81. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
  82. // again test aggregation in "parent" but with event in subdirs
  83. a = newAggregator(ctx, folderCfg)
  84. for i := 0; i < maxFilesPerDir; i++ {
  85. a.newEvent(fs.Event{
  86. Name: filepath.Join("parent", strconv.Itoa(i)),
  87. Type: fs.NonRemove,
  88. }, inProgress)
  89. }
  90. a.newEvent(fs.Event{
  91. Name: filepath.Join("parent", "sub", "new"),
  92. Type: fs.NonRemove,
  93. }, inProgress)
  94. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
  95. // test aggregation in root
  96. a = newAggregator(ctx, folderCfg)
  97. for i := 0; i < maxFiles; i++ {
  98. a.newEvent(fs.Event{
  99. Name: strconv.Itoa(i),
  100. Type: fs.NonRemove,
  101. }, inProgress)
  102. }
  103. if len(getEventPaths(a.root, ".", a)) != maxFiles {
  104. t.Errorf("Unexpected number of events stored in root")
  105. }
  106. a.newEvent(fs.Event{
  107. Name: filepath.Join("parent", "sub", "new"),
  108. Type: fs.NonRemove,
  109. }, inProgress)
  110. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
  111. // checks that adding an event when "." is already stored is a noop
  112. a.newEvent(fs.Event{
  113. Name: "anythingelse",
  114. Type: fs.NonRemove,
  115. }, inProgress)
  116. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
  117. a = newAggregator(ctx, folderCfg)
  118. filesPerDir := maxFilesPerDir / 2
  119. dirs := make([]string, maxFiles/filesPerDir+1)
  120. for i := 0; i < maxFiles/filesPerDir+1; i++ {
  121. dirs[i] = "dir" + strconv.Itoa(i)
  122. }
  123. for _, dir := range dirs {
  124. for i := 0; i < filesPerDir; i++ {
  125. a.newEvent(fs.Event{
  126. Name: filepath.Join(dir, strconv.Itoa(i)),
  127. Type: fs.NonRemove,
  128. }, inProgress)
  129. }
  130. }
  131. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
  132. }
  133. // TestInProgress checks that ignoring files currently edited by Syncthing works
  134. func TestInProgress(t *testing.T) {
  135. evLogger := events.NewLogger()
  136. ctx, cancel := context.WithCancel(context.Background())
  137. go evLogger.Serve(ctx)
  138. defer cancel()
  139. testCase := func(c chan<- fs.Event) {
  140. evLogger.Log(events.ItemStarted, map[string]string{
  141. "item": "inprogress",
  142. })
  143. sleepMs(100)
  144. c <- fs.Event{Name: "inprogress", Type: fs.NonRemove}
  145. sleepMs(1000)
  146. evLogger.Log(events.ItemFinished, map[string]interface{}{
  147. "item": "inprogress",
  148. })
  149. sleepMs(100)
  150. c <- fs.Event{Name: "notinprogress", Type: fs.NonRemove}
  151. sleepMs(800)
  152. }
  153. expectedBatches := []expectedBatch{
  154. {[][]string{{"notinprogress"}}, 2000, 3500},
  155. }
  156. testScenario(t, "InProgress", testCase, expectedBatches, evLogger)
  157. }
  158. // TestDelay checks that recurring changes to the same path are delayed
  159. // and different types separated and ordered correctly
  160. func TestDelay(t *testing.T) {
  161. file := filepath.Join("parent", "file")
  162. delayed := "delayed"
  163. del := "deleted"
  164. delAfter := "deletedAfter"
  165. both := filepath.Join("parent", "sub", "both")
  166. testCase := func(c chan<- fs.Event) {
  167. sleepMs(200)
  168. c <- fs.Event{Name: file, Type: fs.NonRemove}
  169. delay := time.Duration(300) * time.Millisecond
  170. timer := time.NewTimer(delay)
  171. <-timer.C
  172. timer.Reset(delay)
  173. c <- fs.Event{Name: delayed, Type: fs.NonRemove}
  174. c <- fs.Event{Name: both, Type: fs.NonRemove}
  175. c <- fs.Event{Name: both, Type: fs.Remove}
  176. c <- fs.Event{Name: del, Type: fs.Remove}
  177. for i := 0; i < 9; i++ {
  178. <-timer.C
  179. timer.Reset(delay)
  180. c <- fs.Event{Name: delayed, Type: fs.NonRemove}
  181. }
  182. c <- fs.Event{Name: delAfter, Type: fs.Remove}
  183. <-timer.C
  184. }
  185. // batches that we expect to receive with time interval in milliseconds
  186. expectedBatches := []expectedBatch{
  187. {[][]string{{file}}, 500, 2500},
  188. {[][]string{{delayed}, {both}, {del}}, 2500, 4500},
  189. {[][]string{{delayed}, {delAfter}}, 3600, 7000},
  190. }
  191. testScenario(t, "Delay", testCase, expectedBatches, nil)
  192. }
  193. // TestNoDelay checks that no delay occurs if there are no non-remove events
  194. func TestNoDelay(t *testing.T) {
  195. mixed := "foo"
  196. del := "bar"
  197. testCase := func(c chan<- fs.Event) {
  198. c <- fs.Event{Name: mixed, Type: fs.NonRemove}
  199. c <- fs.Event{Name: mixed, Type: fs.Remove}
  200. c <- fs.Event{Name: del, Type: fs.Remove}
  201. }
  202. expectedBatches := []expectedBatch{
  203. {[][]string{{mixed}, {del}}, 500, 2000},
  204. }
  205. testScenario(t, "NoDelay", testCase, expectedBatches, nil)
  206. }
  207. func getEventPaths(dir *eventDir, dirPath string, a *aggregator) []string {
  208. var paths []string
  209. for childName, childDir := range dir.dirs {
  210. paths = append(paths, getEventPaths(childDir, filepath.Join(dirPath, childName), a)...)
  211. }
  212. for name := range dir.events {
  213. paths = append(paths, filepath.Join(dirPath, name))
  214. }
  215. return paths
  216. }
  217. func sleepMs(ms int) {
  218. time.Sleep(time.Duration(ms) * time.Millisecond)
  219. }
  220. func durationMs(ms int) time.Duration {
  221. return time.Duration(ms) * time.Millisecond
  222. }
  223. func compareBatchToExpected(batch []string, expectedPaths []string) (missing []string, unexpected []string) {
  224. for _, expected := range expectedPaths {
  225. expected = filepath.Clean(expected)
  226. found := false
  227. for i, received := range batch {
  228. if expected == received {
  229. found = true
  230. batch = append(batch[:i], batch[i+1:]...)
  231. break
  232. }
  233. }
  234. if !found {
  235. missing = append(missing, expected)
  236. }
  237. }
  238. unexpected = append(unexpected, batch...)
  239. return missing, unexpected
  240. }
  241. func compareBatchToExpectedDirect(t *testing.T, batch []string, expectedPaths []string) {
  242. t.Helper()
  243. missing, unexpected := compareBatchToExpected(batch, expectedPaths)
  244. for _, p := range missing {
  245. t.Errorf("Did not receive event %s", p)
  246. }
  247. for _, p := range unexpected {
  248. t.Errorf("Received unexpected event %s", p)
  249. }
  250. }
  251. func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), expectedBatches []expectedBatch, evLogger events.Logger) {
  252. t.Helper()
  253. if evLogger == nil {
  254. evLogger = events.NoopLogger
  255. }
  256. ctx, cancel := context.WithCancel(context.Background())
  257. eventChan := make(chan fs.Event)
  258. watchChan := make(chan []string)
  259. folderCfg := defaultFolderCfg.Copy()
  260. folderCfg.ID = name
  261. a := newAggregator(ctx, folderCfg)
  262. a.notifyTimeout = testNotifyTimeout
  263. startTime := time.Now()
  264. go a.mainLoop(eventChan, watchChan, defaultCfg, evLogger)
  265. sleepMs(20)
  266. go testCase(eventChan)
  267. testAggregatorOutput(t, watchChan, expectedBatches, startTime)
  268. cancel()
  269. }
  270. func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBatches []expectedBatch, startTime time.Time) {
  271. t.Helper()
  272. var received []string
  273. var elapsedTime time.Duration
  274. var batchIndex, innerIndex int
  275. timeout := time.NewTimer(10 * time.Second)
  276. for {
  277. select {
  278. case <-timeout.C:
  279. t.Errorf("Timeout: Received only %d batches (%d expected)", batchIndex, len(expectedBatches))
  280. return
  281. case received = <-fsWatchChan:
  282. }
  283. if batchIndex >= len(expectedBatches) {
  284. t.Errorf("Received batch %d, expected only %d", batchIndex+1, len(expectedBatches))
  285. continue
  286. }
  287. if !build.IsDarwin {
  288. now := time.Since(startTime)
  289. if innerIndex == 0 {
  290. switch {
  291. case now < durationMs(expectedBatches[batchIndex].afterMs):
  292. t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, now)
  293. case now > durationMs(expectedBatches[batchIndex].beforeMs):
  294. t.Errorf("Received batch %d after %v (too late)", batchIndex+1, now)
  295. }
  296. } else if innerTime := now - elapsedTime; innerTime > timeoutWithinBatch {
  297. t.Errorf("Receive part %d of batch %d after %v (too late)", innerIndex+1, batchIndex+1, innerTime)
  298. }
  299. elapsedTime = now
  300. }
  301. expected := expectedBatches[batchIndex].paths[innerIndex]
  302. if len(received) != len(expected) {
  303. t.Errorf("Received %v events instead of %v for batch %v", len(received), len(expected), batchIndex+1)
  304. }
  305. missing, unexpected := compareBatchToExpected(received, expected)
  306. for _, p := range missing {
  307. t.Errorf("Did not receive event %s in batch %d (%d)", p, batchIndex+1, innerIndex+1)
  308. }
  309. for _, p := range unexpected {
  310. t.Errorf("Received unexpected event %s in batch %d (%d)", p, batchIndex+1, innerIndex+1)
  311. }
  312. if innerIndex == len(expectedBatches[batchIndex].paths)-1 {
  313. if batchIndex == len(expectedBatches)-1 {
  314. // received everything we expected to
  315. return
  316. }
  317. innerIndex = 0
  318. batchIndex++
  319. } else {
  320. innerIndex++
  321. }
  322. }
  323. }