aggregator_test.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. // Copyright (C) 2016 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package watchaggregator
  7. import (
  8. "context"
  9. "os"
  10. "path/filepath"
  11. "strconv"
  12. "testing"
  13. "time"
  14. "github.com/syncthing/syncthing/lib/config"
  15. "github.com/syncthing/syncthing/lib/events"
  16. "github.com/syncthing/syncthing/lib/fs"
  17. )
  18. func TestMain(m *testing.M) {
  19. maxFiles = 32
  20. maxFilesPerDir = 8
  21. defer func() {
  22. maxFiles = 512
  23. maxFilesPerDir = 128
  24. }()
  25. os.Exit(m.Run())
  26. }
  27. const (
  28. testNotifyDelayS = 1
  29. testNotifyTimeout = 2 * time.Second
  30. )
  31. var (
  32. folderRoot = filepath.Clean("/home/someuser/syncthing")
  33. defaultFolderCfg = config.FolderConfiguration{
  34. FilesystemType: fs.FilesystemTypeBasic,
  35. Path: folderRoot,
  36. FSWatcherDelayS: testNotifyDelayS,
  37. }
  38. defaultCfg = config.Wrap("", config.Configuration{
  39. Folders: []config.FolderConfiguration{defaultFolderCfg},
  40. })
  41. )
  42. type expectedBatch struct {
  43. paths []string
  44. afterMs int
  45. beforeMs int
  46. }
  47. // TestAggregate checks whether maxFilesPerDir+1 events in one dir are
  48. // aggregated to parent dir
  49. func TestAggregate(t *testing.T) {
  50. evDir := newEventDir()
  51. inProgress := make(map[string]struct{})
  52. folderCfg := defaultFolderCfg.Copy()
  53. folderCfg.ID = "Aggregate"
  54. ctx, _ := context.WithCancel(context.Background())
  55. a := new(folderCfg, ctx)
  56. // checks whether maxFilesPerDir events in one dir are kept as is
  57. for i := 0; i < maxFilesPerDir; i++ {
  58. a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress)
  59. }
  60. if len(getEventPaths(evDir, ".", a)) != maxFilesPerDir {
  61. t.Errorf("Unexpected number of events stored")
  62. }
  63. // checks whether maxFilesPerDir+1 events in one dir are aggregated to parent dir
  64. a.newEvent(fs.Event{filepath.Join("parent", "new"), fs.NonRemove}, evDir, inProgress)
  65. compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"})
  66. // checks that adding an event below "parent" does not change anything
  67. a.newEvent(fs.Event{filepath.Join("parent", "extra"), fs.NonRemove}, evDir, inProgress)
  68. compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"})
  69. // again test aggregation in "parent" but with event in subdirs
  70. evDir = newEventDir()
  71. for i := 0; i < maxFilesPerDir; i++ {
  72. a.newEvent(fs.Event{filepath.Join("parent", strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress)
  73. }
  74. a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, evDir, inProgress)
  75. compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"parent"})
  76. // test aggregation in root
  77. evDir = newEventDir()
  78. for i := 0; i < maxFiles; i++ {
  79. a.newEvent(fs.Event{strconv.Itoa(i), fs.NonRemove}, evDir, inProgress)
  80. }
  81. if len(getEventPaths(evDir, ".", a)) != maxFiles {
  82. t.Errorf("Unexpected number of events stored in root")
  83. }
  84. a.newEvent(fs.Event{filepath.Join("parent", "sub", "new"), fs.NonRemove}, evDir, inProgress)
  85. compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."})
  86. // checks that adding an event when "." is already stored is a noop
  87. a.newEvent(fs.Event{"anythingelse", fs.NonRemove}, evDir, inProgress)
  88. compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."})
  89. // TestOverflow checks that the entire folder is scanned when maxFiles is reached
  90. evDir = newEventDir()
  91. filesPerDir := maxFilesPerDir / 2
  92. dirs := make([]string, maxFiles/filesPerDir+1)
  93. for i := 0; i < maxFiles/filesPerDir+1; i++ {
  94. dirs[i] = "dir" + strconv.Itoa(i)
  95. }
  96. for _, dir := range dirs {
  97. for i := 0; i < filesPerDir; i++ {
  98. a.newEvent(fs.Event{filepath.Join(dir, strconv.Itoa(i)), fs.NonRemove}, evDir, inProgress)
  99. }
  100. }
  101. compareBatchToExpected(t, getEventPaths(evDir, ".", a), []string{"."})
  102. }
  103. // TestInProgress checks that ignoring files currently edited by Syncthing works
  104. func TestInProgress(t *testing.T) {
  105. testCase := func(c chan<- fs.Event) {
  106. events.Default.Log(events.ItemStarted, map[string]string{
  107. "item": "inprogress",
  108. })
  109. sleepMs(100)
  110. c <- fs.Event{Name: "inprogress", Type: fs.NonRemove}
  111. sleepMs(1000)
  112. events.Default.Log(events.ItemFinished, map[string]interface{}{
  113. "item": "inprogress",
  114. })
  115. sleepMs(100)
  116. c <- fs.Event{Name: "notinprogress", Type: fs.NonRemove}
  117. sleepMs(800)
  118. }
  119. expectedBatches := []expectedBatch{
  120. {[]string{"notinprogress"}, 2000, 3500},
  121. }
  122. testScenario(t, "InProgress", testCase, expectedBatches)
  123. }
  124. // TestDelay checks that recurring changes to the same path are delayed
  125. // and different types separated and ordered correctly
  126. func TestDelay(t *testing.T) {
  127. file := filepath.Join("parent", "file")
  128. delayed := "delayed"
  129. del := "deleted"
  130. both := filepath.Join("parent", "sub", "both")
  131. testCase := func(c chan<- fs.Event) {
  132. sleepMs(200)
  133. c <- fs.Event{Name: file, Type: fs.NonRemove}
  134. delay := time.Duration(300) * time.Millisecond
  135. timer := time.NewTimer(delay)
  136. <-timer.C
  137. timer.Reset(delay)
  138. c <- fs.Event{Name: delayed, Type: fs.NonRemove}
  139. c <- fs.Event{Name: both, Type: fs.NonRemove}
  140. c <- fs.Event{Name: both, Type: fs.Remove}
  141. c <- fs.Event{Name: del, Type: fs.Remove}
  142. for i := 0; i < 9; i++ {
  143. <-timer.C
  144. timer.Reset(delay)
  145. c <- fs.Event{Name: delayed, Type: fs.NonRemove}
  146. }
  147. <-timer.C
  148. }
  149. // batches that we expect to receive with time interval in milliseconds
  150. expectedBatches := []expectedBatch{
  151. {[]string{file}, 500, 2500},
  152. {[]string{delayed}, 2500, 4500},
  153. {[]string{both}, 2500, 4500},
  154. {[]string{del}, 2500, 4500},
  155. {[]string{delayed}, 3600, 6500},
  156. }
  157. testScenario(t, "Delay", testCase, expectedBatches)
  158. }
  159. func getEventPaths(dir *eventDir, dirPath string, a *aggregator) []string {
  160. var paths []string
  161. for childName, childDir := range dir.dirs {
  162. for _, path := range getEventPaths(childDir, filepath.Join(dirPath, childName), a) {
  163. paths = append(paths, path)
  164. }
  165. }
  166. for name := range dir.events {
  167. paths = append(paths, filepath.Join(dirPath, name))
  168. }
  169. return paths
  170. }
  171. func sleepMs(ms int) {
  172. time.Sleep(time.Duration(ms) * time.Millisecond)
  173. }
  174. func durationMs(ms int) time.Duration {
  175. return time.Duration(ms) * time.Millisecond
  176. }
  177. func compareBatchToExpected(t *testing.T, batch []string, expectedPaths []string) {
  178. for _, expected := range expectedPaths {
  179. expected = filepath.Clean(expected)
  180. found := false
  181. for i, received := range batch {
  182. if expected == received {
  183. found = true
  184. batch = append(batch[:i], batch[i+1:]...)
  185. break
  186. }
  187. }
  188. if !found {
  189. t.Errorf("Did not receive event %s", expected)
  190. }
  191. }
  192. for _, received := range batch {
  193. t.Errorf("Received unexpected event %s", received)
  194. }
  195. }
  196. func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), expectedBatches []expectedBatch) {
  197. ctx, cancel := context.WithCancel(context.Background())
  198. eventChan := make(chan fs.Event)
  199. watchChan := make(chan []string)
  200. folderCfg := defaultFolderCfg.Copy()
  201. folderCfg.ID = name
  202. a := new(folderCfg, ctx)
  203. a.notifyTimeout = testNotifyTimeout
  204. startTime := time.Now()
  205. go a.mainLoop(eventChan, watchChan, defaultCfg)
  206. sleepMs(10)
  207. go testAggregatorOutput(t, watchChan, expectedBatches, startTime, ctx)
  208. testCase(eventChan)
  209. timeout := time.NewTimer(time.Duration(expectedBatches[len(expectedBatches)-1].beforeMs+100) * time.Millisecond)
  210. <-timeout.C
  211. cancel()
  212. }
  213. func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBatches []expectedBatch, startTime time.Time, ctx context.Context) {
  214. var received []string
  215. var elapsedTime time.Duration
  216. batchIndex := 0
  217. for {
  218. select {
  219. case <-ctx.Done():
  220. if batchIndex != len(expectedBatches) {
  221. t.Errorf("Received only %d batches (%d expected)", batchIndex, len(expectedBatches))
  222. }
  223. return
  224. case received = <-fsWatchChan:
  225. }
  226. if batchIndex >= len(expectedBatches) {
  227. t.Errorf("Received batch %d (only %d expected)", batchIndex+1, len(expectedBatches))
  228. continue
  229. }
  230. elapsedTime = time.Since(startTime)
  231. expected := expectedBatches[batchIndex]
  232. switch {
  233. case elapsedTime < durationMs(expected.afterMs):
  234. t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, elapsedTime)
  235. case elapsedTime > durationMs(expected.beforeMs):
  236. t.Errorf("Received batch %d after %v (too late)", batchIndex+1, elapsedTime)
  237. case len(received) != len(expected.paths):
  238. t.Errorf("Received %v events instead of %v for batch %v", len(received), len(expected.paths), batchIndex+1)
  239. }
  240. compareBatchToExpected(t, received, expected.paths)
  241. batchIndex++
  242. }
  243. }