aggregator_test.go 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. // Copyright (C) 2016 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package watchaggregator
  7. import (
  8. "context"
  9. "os"
  10. "path/filepath"
  11. "runtime"
  12. "strconv"
  13. "testing"
  14. "time"
  15. "github.com/syncthing/syncthing/lib/config"
  16. "github.com/syncthing/syncthing/lib/events"
  17. "github.com/syncthing/syncthing/lib/fs"
  18. )
  19. func TestMain(m *testing.M) {
  20. maxFiles = 32
  21. maxFilesPerDir = 8
  22. defer func() {
  23. maxFiles = 512
  24. maxFilesPerDir = 128
  25. }()
  26. os.Exit(m.Run())
  27. }
  28. const (
  29. testNotifyDelayS = 1
  30. testNotifyTimeout = 2 * time.Second
  31. )
  32. var (
  33. folderRoot = filepath.Clean("/home/someuser/syncthing")
  34. defaultFolderCfg = config.FolderConfiguration{
  35. FilesystemType: fs.FilesystemTypeBasic,
  36. Path: folderRoot,
  37. FSWatcherDelayS: testNotifyDelayS,
  38. }
  39. defaultCfg = config.Wrap("", config.Configuration{
  40. Folders: []config.FolderConfiguration{defaultFolderCfg},
  41. })
  42. )
  43. type expectedBatch struct {
  44. paths []string
  45. afterMs int
  46. beforeMs int
  47. }
  48. // TestAggregate checks whether maxFilesPerDir+1 events in one dir are
  49. // aggregated to parent dir
  50. func TestAggregate(t *testing.T) {
  51. evDir := newEventDir()
  52. inProgress := make(map[string]struct{})
  53. folderCfg := defaultFolderCfg.Copy()
  54. folderCfg.ID = "Aggregate"
  55. ctx, _ := context.WithCancel(context.Background())
  56. a := newAggregator(folderCfg, ctx)
  57. // checks whether maxFilesPerDir events in one dir are kept as is
  58. for i := 0; i < maxFilesPerDir; i++ {
  59. a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress)
  60. }
  61. if len(getEventPaths(evDir, ".", a)) != maxFilesPerDir {
  62. t.Errorf("Unexpected number of events stored")
  63. }
  64. // checks whether maxFilesPerDir+1 events in one dir are aggregated to parent dir
  65. a.newEvent(fs.Event{filepath.Join("parent", "new"), fs.NonRemove}, evDir, inProgress)
  66. compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"})
  67. // checks that adding an event below "parent" does not change anything
  68. a.newEvent(fs.Event{filepath.Join("parent", "extra"), fs.NonRemove}, evDir, inProgress)
  69. compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"})
  70. // again test aggregation in "parent" but with event in subdirs
  71. evDir = newEventDir()
  72. for i := 0; i < maxFilesPerDir; i++ {
  73. a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress)
  74. }
  75. a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, evDir, inProgress)
  76. compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"})
  77. // test aggregation in root
  78. evDir = newEventDir()
  79. for i := 0; i < maxFiles; i++ {
  80. a.newEvent(fs.Event{strconv.Itoa(i), fs.NonRemove}, evDir, inProgress)
  81. }
  82. if len(getEventPaths(evDir, ".", a)) != maxFiles {
  83. t.Errorf("Unexpected number of events stored in root")
  84. }
  85. a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, evDir, inProgress)
  86. compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."})
  87. // checks that adding an event when "." is already stored is a noop
  88. a.newEvent(fs.Event{"anythingelse", fs.NonRemove}, evDir, inProgress)
  89. compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."})
  90. // TestOverflow checks that the entire folder is scanned when maxFiles is reached
  91. evDir = newEventDir()
  92. filesPerDir := maxFilesPerDir / 2
  93. dirs := make([]string, maxFiles/filesPerDir+1)
  94. for i := 0; i < maxFiles/filesPerDir+1; i++ {
  95. dirs[i] = "dir" + strconv.Itoa(i)
  96. }
  97. for _, dir := range dirs {
  98. for i := 0; i < filesPerDir; i++ {
  99. a.newEvent(fs.Event{filepath.Join(dir, strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress)
  100. }
  101. }
  102. compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."})
  103. }
  104. // TestInProgress checks that ignoring files currently edited by Syncthing works
  105. func TestInProgress(t *testing.T) {
  106. testCase := func(c chan<- fs.Event) {
  107. events.Default.Log(events.ItemStarted, map[string]string{
  108. "item": "inprogress",
  109. })
  110. sleepMs(100)
  111. c <- fs.Event{Name: "inprogress", Type: fs.NonRemove}
  112. sleepMs(1000)
  113. events.Default.Log(events.ItemFinished, map[string]interface{}{
  114. "item": "inprogress",
  115. })
  116. sleepMs(100)
  117. c <- fs.Event{Name: "notinprogress", Type: fs.NonRemove}
  118. sleepMs(800)
  119. }
  120. expectedBatches := []expectedBatch{
  121. {[]string{"notinprogress"}, 2000, 3500},
  122. }
  123. testScenario(t, "InProgress", testCase, expectedBatches)
  124. }
  125. // TestDelay checks that recurring changes to the same path are delayed
  126. // and different types separated and ordered correctly
  127. func TestDelay(t *testing.T) {
  128. file := filepath.Join("parent", "file")
  129. delayed := "delayed"
  130. del := "deleted"
  131. both := filepath.Join("parent", "sub", "both")
  132. testCase := func(c chan<- fs.Event) {
  133. sleepMs(200)
  134. c <- fs.Event{Name: file, Type: fs.NonRemove}
  135. delay := time.Duration(300) * time.Millisecond
  136. timer := time.NewTimer(delay)
  137. <-timer.C
  138. timer.Reset(delay)
  139. c <- fs.Event{Name: delayed, Type: fs.NonRemove}
  140. c <- fs.Event{Name: both, Type: fs.NonRemove}
  141. c <- fs.Event{Name: both, Type: fs.Remove}
  142. c <- fs.Event{Name: del, Type: fs.Remove}
  143. for i := 0; i < 9; i++ {
  144. <-timer.C
  145. timer.Reset(delay)
  146. c <- fs.Event{Name: delayed, Type: fs.NonRemove}
  147. }
  148. <-timer.C
  149. }
  150. // batches that we expect to receive with time interval in milliseconds
  151. expectedBatches := []expectedBatch{
  152. {[]string{file}, 500, 2500},
  153. {[]string{delayed}, 2500, 4500},
  154. {[]string{both}, 2500, 4500},
  155. {[]string{del}, 2500, 4500},
  156. {[]string{delayed}, 3600, 7000},
  157. }
  158. testScenario(t, "Delay", testCase, expectedBatches)
  159. }
  160. func getEventPaths(dir *eventDir, dirPath string, a *aggregator) []string {
  161. var paths []string
  162. for childName, childDir := range dir.dirs {
  163. for _, path := range getEventPaths(childDir, filepath.Join(dirPath, childName), a) {
  164. paths = append(paths, path)
  165. }
  166. }
  167. for name := range dir.events {
  168. paths = append(paths, filepath.Join(dirPath, name))
  169. }
  170. return paths
  171. }
  172. func sleepMs(ms int) {
  173. time.Sleep(time.Duration(ms) * time.Millisecond)
  174. }
  175. func durationMs(ms int) time.Duration {
  176. return time.Duration(ms) * time.Millisecond
  177. }
  178. func compareBatchToExpected(t *testing.T, batch []string, expectedPaths []string) {
  179. for _, expected := range expectedPaths {
  180. expected = filepath.Clean(expected)
  181. found := false
  182. for i, received := range batch {
  183. if expected == received {
  184. found = true
  185. batch = append(batch[:i], batch[i+1:]...)
  186. break
  187. }
  188. }
  189. if !found {
  190. t.Errorf("Did not receive event %s", expected)
  191. }
  192. }
  193. for _, received := range batch {
  194. t.Errorf("Received unexpected event %s", received)
  195. }
  196. }
  197. func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), expectedBatches []expectedBatch) {
  198. ctx, cancel := context.WithCancel(context.Background())
  199. eventChan := make(chan fs.Event)
  200. watchChan := make(chan []string)
  201. folderCfg := defaultFolderCfg.Copy()
  202. folderCfg.ID = name
  203. a := newAggregator(folderCfg, ctx)
  204. a.notifyTimeout = testNotifyTimeout
  205. startTime := time.Now()
  206. go a.mainLoop(eventChan, watchChan, defaultCfg)
  207. sleepMs(20)
  208. go testCase(eventChan)
  209. testAggregatorOutput(t, watchChan, expectedBatches, startTime)
  210. cancel()
  211. }
  212. func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBatches []expectedBatch, startTime time.Time) {
  213. var received []string
  214. var elapsedTime time.Duration
  215. batchIndex := 0
  216. timeout := time.NewTimer(10 * time.Second)
  217. for {
  218. select {
  219. case <-timeout.C:
  220. t.Errorf("Timeout: Received only %d batches (%d expected)", batchIndex, len(expectedBatches))
  221. return
  222. case received = <-fsWatchChan:
  223. }
  224. elapsedTime = time.Since(startTime)
  225. expected := expectedBatches[batchIndex]
  226. if runtime.GOOS != "darwin" {
  227. switch {
  228. case elapsedTime < durationMs(expected.afterMs):
  229. t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, elapsedTime)
  230. case elapsedTime > durationMs(expected.beforeMs):
  231. t.Errorf("Received batch %d after %v (too late)", batchIndex+1, elapsedTime)
  232. }
  233. }
  234. if len(received) != len(expected.paths) {
  235. t.Errorf("Received %v events instead of %v for batch %v", len(received), len(expected.paths), batchIndex+1)
  236. }
  237. compareBatchToExpected(t, received, expected.paths)
  238. batchIndex++
  239. if batchIndex == len(expectedBatches) {
  240. // received everything we expected to
  241. return
  242. }
  243. }
  244. }