aggregator_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
// Copyright (C) 2016 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package watchaggregator
  7. import (
  8. "context"
  9. "os"
  10. "path/filepath"
  11. "runtime"
  12. "strconv"
  13. "testing"
  14. "time"
  15. "github.com/syncthing/syncthing/lib/config"
  16. "github.com/syncthing/syncthing/lib/events"
  17. "github.com/syncthing/syncthing/lib/fs"
  18. )
  19. func TestMain(m *testing.M) {
  20. maxFiles = 32
  21. maxFilesPerDir = 8
  22. ret := m.Run()
  23. maxFiles = 512
  24. maxFilesPerDir = 128
  25. os.Exit(ret)
  26. }
  27. const (
  28. testNotifyDelayS = 1
  29. testNotifyTimeout = 2 * time.Second
  30. timeoutWithinBatch = time.Second
  31. )
  32. var (
  33. folderRoot = filepath.Clean("/home/someuser/syncthing")
  34. defaultFolderCfg = config.FolderConfiguration{
  35. FilesystemType: fs.FilesystemTypeBasic,
  36. Path: folderRoot,
  37. FSWatcherDelayS: testNotifyDelayS,
  38. }
  39. defaultCfg = config.Wrap("", config.Configuration{
  40. Folders: []config.FolderConfiguration{defaultFolderCfg},
  41. })
  42. )
  43. // Represents possibly multiple (different event types) expected paths from
  44. // aggregation, that should be received back to back.
  45. type expectedBatch struct {
  46. paths [][]string
  47. afterMs int
  48. beforeMs int
  49. }
  50. // TestAggregate checks whether maxFilesPerDir+1 events in one dir are
  51. // aggregated to parent dir
  52. func TestAggregate(t *testing.T) {
  53. inProgress := make(map[string]struct{})
  54. folderCfg := defaultFolderCfg.Copy()
  55. folderCfg.ID = "Aggregate"
  56. ctx, _ := context.WithCancel(context.Background())
  57. a := newAggregator(folderCfg, ctx)
  58. // checks whether maxFilesPerDir events in one dir are kept as is
  59. for i := 0; i < maxFilesPerDir; i++ {
  60. a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, inProgress)
  61. }
  62. if l := len(getEventPaths(a.root, ".", a)); l != maxFilesPerDir {
  63. t.Errorf("Unexpected number of events stored, got %v, expected %v", l, maxFilesPerDir)
  64. }
  65. // checks whether maxFilesPerDir+1 events in one dir are aggregated to parent dir
  66. a.newEvent(fs.Event{filepath.Join("parent", "new"), fs.NonRemove}, inProgress)
  67. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
  68. // checks that adding an event below "parent" does not change anything
  69. a.newEvent(fs.Event{filepath.Join("parent", "extra"), fs.NonRemove}, inProgress)
  70. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
  71. // again test aggregation in "parent" but with event in subdirs
  72. a = newAggregator(folderCfg, ctx)
  73. for i := 0; i < maxFilesPerDir; i++ {
  74. a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, inProgress)
  75. }
  76. a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, inProgress)
  77. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
  78. // test aggregation in root
  79. a = newAggregator(folderCfg, ctx)
  80. for i := 0; i < maxFiles; i++ {
  81. a.newEvent(fs.Event{strconv.Itoa(i), fs.NonRemove}, inProgress)
  82. }
  83. if len(getEventPaths(a.root, ".", a)) != maxFiles {
  84. t.Errorf("Unexpected number of events stored in root")
  85. }
  86. a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, inProgress)
  87. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
  88. // checks that adding an event when "." is already stored is a noop
  89. a.newEvent(fs.Event{"anythingelse", fs.NonRemove}, inProgress)
  90. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
  91. a = newAggregator(folderCfg, ctx)
  92. filesPerDir := maxFilesPerDir / 2
  93. dirs := make([]string, maxFiles/filesPerDir+1)
  94. for i := 0; i < maxFiles/filesPerDir+1; i++ {
  95. dirs[i] = "dir" + strconv.Itoa(i)
  96. }
  97. for _, dir := range dirs {
  98. for i := 0; i < filesPerDir; i++ {
  99. a.newEvent(fs.Event{filepath.Join(dir, strconv.Itoa(i)), fs.NonRemove}, inProgress)
  100. }
  101. }
  102. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
  103. }
  104. // TestInProgress checks that ignoring files currently edited by Syncthing works
  105. func TestInProgress(t *testing.T) {
  106. testCase := func(c chan<- fs.Event) {
  107. events.Default.Log(events.ItemStarted, map[string]string{
  108. "item": "inprogress",
  109. })
  110. sleepMs(100)
  111. c <- fs.Event{Name: "inprogress", Type: fs.NonRemove}
  112. sleepMs(1000)
  113. events.Default.Log(events.ItemFinished, map[string]interface{}{
  114. "item": "inprogress",
  115. })
  116. sleepMs(100)
  117. c <- fs.Event{Name: "notinprogress", Type: fs.NonRemove}
  118. sleepMs(800)
  119. }
  120. expectedBatches := []expectedBatch{
  121. {[][]string{{"notinprogress"}}, 2000, 3500},
  122. }
  123. testScenario(t, "InProgress", testCase, expectedBatches)
  124. }
  125. // TestDelay checks that recurring changes to the same path are delayed
  126. // and different types separated and ordered correctly
  127. func TestDelay(t *testing.T) {
  128. file := filepath.Join("parent", "file")
  129. delayed := "delayed"
  130. del := "deleted"
  131. delAfter := "deletedAfter"
  132. both := filepath.Join("parent", "sub", "both")
  133. testCase := func(c chan<- fs.Event) {
  134. sleepMs(200)
  135. c <- fs.Event{Name: file, Type: fs.NonRemove}
  136. delay := time.Duration(300) * time.Millisecond
  137. timer := time.NewTimer(delay)
  138. <-timer.C
  139. timer.Reset(delay)
  140. c <- fs.Event{Name: delayed, Type: fs.NonRemove}
  141. c <- fs.Event{Name: both, Type: fs.NonRemove}
  142. c <- fs.Event{Name: both, Type: fs.Remove}
  143. c <- fs.Event{Name: del, Type: fs.Remove}
  144. for i := 0; i < 9; i++ {
  145. <-timer.C
  146. timer.Reset(delay)
  147. c <- fs.Event{Name: delayed, Type: fs.NonRemove}
  148. }
  149. c <- fs.Event{Name: delAfter, Type: fs.Remove}
  150. <-timer.C
  151. }
  152. // batches that we expect to receive with time interval in milliseconds
  153. expectedBatches := []expectedBatch{
  154. {[][]string{{file}}, 500, 2500},
  155. {[][]string{{delayed}, {both}, {del}}, 2500, 4500},
  156. {[][]string{{delayed}, {delAfter}}, 3600, 7000},
  157. }
  158. testScenario(t, "Delay", testCase, expectedBatches)
  159. }
  160. // TestNoDelay checks that no delay occurs if there are no non-remove events
  161. func TestNoDelay(t *testing.T) {
  162. mixed := "foo"
  163. del := "bar"
  164. testCase := func(c chan<- fs.Event) {
  165. c <- fs.Event{Name: mixed, Type: fs.NonRemove}
  166. c <- fs.Event{Name: mixed, Type: fs.Remove}
  167. c <- fs.Event{Name: del, Type: fs.Remove}
  168. }
  169. expectedBatches := []expectedBatch{
  170. {[][]string{{mixed}, {del}}, 500, 2000},
  171. }
  172. testScenario(t, "NoDelay", testCase, expectedBatches)
  173. }
  174. func getEventPaths(dir *eventDir, dirPath string, a *aggregator) []string {
  175. var paths []string
  176. for childName, childDir := range dir.dirs {
  177. for _, path := range getEventPaths(childDir, filepath.Join(dirPath, childName), a) {
  178. paths = append(paths, path)
  179. }
  180. }
  181. for name := range dir.events {
  182. paths = append(paths, filepath.Join(dirPath, name))
  183. }
  184. return paths
  185. }
  186. func sleepMs(ms int) {
  187. time.Sleep(time.Duration(ms) * time.Millisecond)
  188. }
  189. func durationMs(ms int) time.Duration {
  190. return time.Duration(ms) * time.Millisecond
  191. }
  192. func compareBatchToExpected(batch []string, expectedPaths []string) (missing []string, unexpected []string) {
  193. for _, expected := range expectedPaths {
  194. expected = filepath.Clean(expected)
  195. found := false
  196. for i, received := range batch {
  197. if expected == received {
  198. found = true
  199. batch = append(batch[:i], batch[i+1:]...)
  200. break
  201. }
  202. }
  203. if !found {
  204. missing = append(missing, expected)
  205. }
  206. }
  207. for _, received := range batch {
  208. unexpected = append(unexpected, received)
  209. }
  210. return missing, unexpected
  211. }
  212. func compareBatchToExpectedDirect(t *testing.T, batch []string, expectedPaths []string) {
  213. t.Helper()
  214. missing, unexpected := compareBatchToExpected(batch, expectedPaths)
  215. for _, p := range missing {
  216. t.Errorf("Did not receive event %s", p)
  217. }
  218. for _, p := range unexpected {
  219. t.Errorf("Received unexpected event %s", p)
  220. }
  221. }
  222. func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), expectedBatches []expectedBatch) {
  223. t.Helper()
  224. ctx, cancel := context.WithCancel(context.Background())
  225. eventChan := make(chan fs.Event)
  226. watchChan := make(chan []string)
  227. folderCfg := defaultFolderCfg.Copy()
  228. folderCfg.ID = name
  229. a := newAggregator(folderCfg, ctx)
  230. a.notifyTimeout = testNotifyTimeout
  231. startTime := time.Now()
  232. go a.mainLoop(eventChan, watchChan, defaultCfg)
  233. sleepMs(20)
  234. go testCase(eventChan)
  235. testAggregatorOutput(t, watchChan, expectedBatches, startTime)
  236. cancel()
  237. }
  238. func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBatches []expectedBatch, startTime time.Time) {
  239. t.Helper()
  240. var received []string
  241. var elapsedTime time.Duration
  242. var batchIndex, innerIndex int
  243. timeout := time.NewTimer(10 * time.Second)
  244. for {
  245. select {
  246. case <-timeout.C:
  247. t.Errorf("Timeout: Received only %d batches (%d expected)", batchIndex, len(expectedBatches))
  248. return
  249. case received = <-fsWatchChan:
  250. }
  251. if batchIndex >= len(expectedBatches) {
  252. t.Errorf("Received batch %d, expected only %d", batchIndex+1, len(expectedBatches))
  253. continue
  254. }
  255. if runtime.GOOS != "darwin" {
  256. now := time.Since(startTime)
  257. if innerIndex == 0 {
  258. switch {
  259. case now < durationMs(expectedBatches[batchIndex].afterMs):
  260. t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, now)
  261. case now > durationMs(expectedBatches[batchIndex].beforeMs):
  262. t.Errorf("Received batch %d after %v (too late)", batchIndex+1, now)
  263. }
  264. } else if innerTime := now - elapsedTime; innerTime > timeoutWithinBatch {
  265. t.Errorf("Receive part %d of batch %d after %v (too late)", innerIndex+1, batchIndex+1, innerTime)
  266. }
  267. elapsedTime = now
  268. }
  269. expected := expectedBatches[batchIndex].paths[innerIndex]
  270. if len(received) != len(expected) {
  271. t.Errorf("Received %v events instead of %v for batch %v", len(received), len(expected), batchIndex+1)
  272. }
  273. missing, unexpected := compareBatchToExpected(received, expected)
  274. for _, p := range missing {
  275. t.Errorf("Did not receive event %s in batch %d (%d)", p, batchIndex+1, innerIndex+1)
  276. }
  277. for _, p := range unexpected {
  278. t.Errorf("Received unexpected event %s in batch %d (%d)", p, batchIndex+1, innerIndex+1)
  279. }
  280. if innerIndex == len(expectedBatches[batchIndex].paths)-1 {
  281. if batchIndex == len(expectedBatches)-1 {
  282. // received everything we expected to
  283. return
  284. }
  285. innerIndex = 0
  286. batchIndex++
  287. } else {
  288. innerIndex++
  289. }
  290. }
  291. }