aggregator_test.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. // Copyright (C) 2016 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package watchaggregator
  7. import (
  8. "context"
  9. "os"
  10. "path/filepath"
  11. "runtime"
  12. "strconv"
  13. "testing"
  14. "time"
  15. "github.com/syncthing/syncthing/lib/config"
  16. "github.com/syncthing/syncthing/lib/events"
  17. "github.com/syncthing/syncthing/lib/fs"
  18. )
  19. func TestMain(m *testing.M) {
  20. maxFiles = 32
  21. maxFilesPerDir = 8
  22. ret := m.Run()
  23. maxFiles = 512
  24. maxFilesPerDir = 128
  25. os.Exit(ret)
  26. }
  27. const (
  28. testNotifyDelayS = 1
  29. testNotifyTimeout = 2 * time.Second
  30. timeoutWithinBatch = time.Second
  31. )
  32. var (
  33. folderRoot = filepath.Clean("/home/someuser/syncthing")
  34. defaultFolderCfg = config.FolderConfiguration{
  35. FilesystemType: fs.FilesystemTypeBasic,
  36. Path: folderRoot,
  37. FSWatcherDelayS: testNotifyDelayS,
  38. }
  39. defaultCfg = config.Wrap("", config.Configuration{
  40. Folders: []config.FolderConfiguration{defaultFolderCfg},
  41. }, events.NoopLogger)
  42. )
  43. // Represents possibly multiple (different event types) expected paths from
  44. // aggregation, that should be received back to back.
  45. type expectedBatch struct {
  46. paths [][]string
  47. afterMs int
  48. beforeMs int
  49. }
  50. // TestAggregate checks whether maxFilesPerDir+1 events in one dir are
  51. // aggregated to parent dir
  52. func TestAggregate(t *testing.T) {
  53. inProgress := make(map[string]struct{})
  54. folderCfg := defaultFolderCfg.Copy()
  55. folderCfg.ID = "Aggregate"
  56. ctx, cancel := context.WithCancel(context.Background())
  57. defer cancel()
  58. a := newAggregator(folderCfg, ctx)
  59. // checks whether maxFilesPerDir events in one dir are kept as is
  60. for i := 0; i < maxFilesPerDir; i++ {
  61. a.newEvent(fs.Event{
  62. Name: filepath.Join("parent", strconv.Itoa(i)),
  63. Type: fs.NonRemove,
  64. }, inProgress)
  65. }
  66. if l := len(getEventPaths(a.root, ".", a)); l != maxFilesPerDir {
  67. t.Errorf("Unexpected number of events stored, got %v, expected %v", l, maxFilesPerDir)
  68. }
  69. // checks whether maxFilesPerDir+1 events in one dir are aggregated to parent dir
  70. a.newEvent(fs.Event{
  71. Name: filepath.Join("parent", "new"),
  72. Type: fs.NonRemove,
  73. }, inProgress)
  74. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
  75. // checks that adding an event below "parent" does not change anything
  76. a.newEvent(fs.Event{
  77. Name: filepath.Join("parent", "extra"),
  78. Type: fs.NonRemove,
  79. }, inProgress)
  80. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
  81. // again test aggregation in "parent" but with event in subdirs
  82. a = newAggregator(folderCfg, ctx)
  83. for i := 0; i < maxFilesPerDir; i++ {
  84. a.newEvent(fs.Event{
  85. Name: filepath.Join("parent", strconv.Itoa(i)),
  86. Type: fs.NonRemove,
  87. }, inProgress)
  88. }
  89. a.newEvent(fs.Event{
  90. Name: filepath.Join("parent", "sub", "new"),
  91. Type: fs.NonRemove,
  92. }, inProgress)
  93. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"parent"})
  94. // test aggregation in root
  95. a = newAggregator(folderCfg, ctx)
  96. for i := 0; i < maxFiles; i++ {
  97. a.newEvent(fs.Event{
  98. Name: strconv.Itoa(i),
  99. Type: fs.NonRemove,
  100. }, inProgress)
  101. }
  102. if len(getEventPaths(a.root, ".", a)) != maxFiles {
  103. t.Errorf("Unexpected number of events stored in root")
  104. }
  105. a.newEvent(fs.Event{
  106. Name: filepath.Join("parent", "sub", "new"),
  107. Type: fs.NonRemove,
  108. }, inProgress)
  109. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
  110. // checks that adding an event when "." is already stored is a noop
  111. a.newEvent(fs.Event{
  112. Name: "anythingelse",
  113. Type: fs.NonRemove,
  114. }, inProgress)
  115. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
  116. a = newAggregator(folderCfg, ctx)
  117. filesPerDir := maxFilesPerDir / 2
  118. dirs := make([]string, maxFiles/filesPerDir+1)
  119. for i := 0; i < maxFiles/filesPerDir+1; i++ {
  120. dirs[i] = "dir" + strconv.Itoa(i)
  121. }
  122. for _, dir := range dirs {
  123. for i := 0; i < filesPerDir; i++ {
  124. a.newEvent(fs.Event{
  125. Name: filepath.Join(dir, strconv.Itoa(i)),
  126. Type: fs.NonRemove,
  127. }, inProgress)
  128. }
  129. }
  130. compareBatchToExpectedDirect(t, getEventPaths(a.root, ".", a), []string{"."})
  131. }
  132. // TestInProgress checks that ignoring files currently edited by Syncthing works
  133. func TestInProgress(t *testing.T) {
  134. evLogger := events.NewLogger()
  135. go evLogger.Serve()
  136. defer evLogger.Stop()
  137. testCase := func(c chan<- fs.Event) {
  138. evLogger.Log(events.ItemStarted, map[string]string{
  139. "item": "inprogress",
  140. })
  141. sleepMs(100)
  142. c <- fs.Event{Name: "inprogress", Type: fs.NonRemove}
  143. sleepMs(1000)
  144. evLogger.Log(events.ItemFinished, map[string]interface{}{
  145. "item": "inprogress",
  146. })
  147. sleepMs(100)
  148. c <- fs.Event{Name: "notinprogress", Type: fs.NonRemove}
  149. sleepMs(800)
  150. }
  151. expectedBatches := []expectedBatch{
  152. {[][]string{{"notinprogress"}}, 2000, 3500},
  153. }
  154. testScenario(t, "InProgress", testCase, expectedBatches, evLogger)
  155. }
  156. // TestDelay checks that recurring changes to the same path are delayed
  157. // and different types separated and ordered correctly
  158. func TestDelay(t *testing.T) {
  159. file := filepath.Join("parent", "file")
  160. delayed := "delayed"
  161. del := "deleted"
  162. delAfter := "deletedAfter"
  163. both := filepath.Join("parent", "sub", "both")
  164. testCase := func(c chan<- fs.Event) {
  165. sleepMs(200)
  166. c <- fs.Event{Name: file, Type: fs.NonRemove}
  167. delay := time.Duration(300) * time.Millisecond
  168. timer := time.NewTimer(delay)
  169. <-timer.C
  170. timer.Reset(delay)
  171. c <- fs.Event{Name: delayed, Type: fs.NonRemove}
  172. c <- fs.Event{Name: both, Type: fs.NonRemove}
  173. c <- fs.Event{Name: both, Type: fs.Remove}
  174. c <- fs.Event{Name: del, Type: fs.Remove}
  175. for i := 0; i < 9; i++ {
  176. <-timer.C
  177. timer.Reset(delay)
  178. c <- fs.Event{Name: delayed, Type: fs.NonRemove}
  179. }
  180. c <- fs.Event{Name: delAfter, Type: fs.Remove}
  181. <-timer.C
  182. }
  183. // batches that we expect to receive with time interval in milliseconds
  184. expectedBatches := []expectedBatch{
  185. {[][]string{{file}}, 500, 2500},
  186. {[][]string{{delayed}, {both}, {del}}, 2500, 4500},
  187. {[][]string{{delayed}, {delAfter}}, 3600, 7000},
  188. }
  189. testScenario(t, "Delay", testCase, expectedBatches, nil)
  190. }
  191. // TestNoDelay checks that no delay occurs if there are no non-remove events
  192. func TestNoDelay(t *testing.T) {
  193. mixed := "foo"
  194. del := "bar"
  195. testCase := func(c chan<- fs.Event) {
  196. c <- fs.Event{Name: mixed, Type: fs.NonRemove}
  197. c <- fs.Event{Name: mixed, Type: fs.Remove}
  198. c <- fs.Event{Name: del, Type: fs.Remove}
  199. }
  200. expectedBatches := []expectedBatch{
  201. {[][]string{{mixed}, {del}}, 500, 2000},
  202. }
  203. testScenario(t, "NoDelay", testCase, expectedBatches, nil)
  204. }
  205. func getEventPaths(dir *eventDir, dirPath string, a *aggregator) []string {
  206. var paths []string
  207. for childName, childDir := range dir.dirs {
  208. paths = append(paths, getEventPaths(childDir, filepath.Join(dirPath, childName), a)...)
  209. }
  210. for name := range dir.events {
  211. paths = append(paths, filepath.Join(dirPath, name))
  212. }
  213. return paths
  214. }
  215. func sleepMs(ms int) {
  216. time.Sleep(time.Duration(ms) * time.Millisecond)
  217. }
  218. func durationMs(ms int) time.Duration {
  219. return time.Duration(ms) * time.Millisecond
  220. }
  221. func compareBatchToExpected(batch []string, expectedPaths []string) (missing []string, unexpected []string) {
  222. for _, expected := range expectedPaths {
  223. expected = filepath.Clean(expected)
  224. found := false
  225. for i, received := range batch {
  226. if expected == received {
  227. found = true
  228. batch = append(batch[:i], batch[i+1:]...)
  229. break
  230. }
  231. }
  232. if !found {
  233. missing = append(missing, expected)
  234. }
  235. }
  236. unexpected = append(unexpected, batch...)
  237. return missing, unexpected
  238. }
  239. func compareBatchToExpectedDirect(t *testing.T, batch []string, expectedPaths []string) {
  240. t.Helper()
  241. missing, unexpected := compareBatchToExpected(batch, expectedPaths)
  242. for _, p := range missing {
  243. t.Errorf("Did not receive event %s", p)
  244. }
  245. for _, p := range unexpected {
  246. t.Errorf("Received unexpected event %s", p)
  247. }
  248. }
  249. func testScenario(t *testing.T, name string, testCase func(c chan<- fs.Event), expectedBatches []expectedBatch, evLogger events.Logger) {
  250. t.Helper()
  251. if evLogger == nil {
  252. evLogger = events.NoopLogger
  253. }
  254. ctx, cancel := context.WithCancel(context.Background())
  255. eventChan := make(chan fs.Event)
  256. watchChan := make(chan []string)
  257. folderCfg := defaultFolderCfg.Copy()
  258. folderCfg.ID = name
  259. a := newAggregator(folderCfg, ctx)
  260. a.notifyTimeout = testNotifyTimeout
  261. startTime := time.Now()
  262. go a.mainLoop(eventChan, watchChan, defaultCfg, evLogger)
  263. sleepMs(20)
  264. go testCase(eventChan)
  265. testAggregatorOutput(t, watchChan, expectedBatches, startTime)
  266. cancel()
  267. }
  268. func testAggregatorOutput(t *testing.T, fsWatchChan <-chan []string, expectedBatches []expectedBatch, startTime time.Time) {
  269. t.Helper()
  270. var received []string
  271. var elapsedTime time.Duration
  272. var batchIndex, innerIndex int
  273. timeout := time.NewTimer(10 * time.Second)
  274. for {
  275. select {
  276. case <-timeout.C:
  277. t.Errorf("Timeout: Received only %d batches (%d expected)", batchIndex, len(expectedBatches))
  278. return
  279. case received = <-fsWatchChan:
  280. }
  281. if batchIndex >= len(expectedBatches) {
  282. t.Errorf("Received batch %d, expected only %d", batchIndex+1, len(expectedBatches))
  283. continue
  284. }
  285. if runtime.GOOS != "darwin" {
  286. now := time.Since(startTime)
  287. if innerIndex == 0 {
  288. switch {
  289. case now < durationMs(expectedBatches[batchIndex].afterMs):
  290. t.Errorf("Received batch %d after %v (too soon)", batchIndex+1, now)
  291. case now > durationMs(expectedBatches[batchIndex].beforeMs):
  292. t.Errorf("Received batch %d after %v (too late)", batchIndex+1, now)
  293. }
  294. } else if innerTime := now - elapsedTime; innerTime > timeoutWithinBatch {
  295. t.Errorf("Receive part %d of batch %d after %v (too late)", innerIndex+1, batchIndex+1, innerTime)
  296. }
  297. elapsedTime = now
  298. }
  299. expected := expectedBatches[batchIndex].paths[innerIndex]
  300. if len(received) != len(expected) {
  301. t.Errorf("Received %v events instead of %v for batch %v", len(received), len(expected), batchIndex+1)
  302. }
  303. missing, unexpected := compareBatchToExpected(received, expected)
  304. for _, p := range missing {
  305. t.Errorf("Did not receive event %s in batch %d (%d)", p, batchIndex+1, innerIndex+1)
  306. }
  307. for _, p := range unexpected {
  308. t.Errorf("Received unexpected event %s in batch %d (%d)", p, batchIndex+1, innerIndex+1)
  309. }
  310. if innerIndex == len(expectedBatches[batchIndex].paths)-1 {
  311. if batchIndex == len(expectedBatches)-1 {
  312. // received everything we expected to
  313. return
  314. }
  315. innerIndex = 0
  316. batchIndex++
  317. } else {
  318. innerIndex++
  319. }
  320. }
  321. }