progressemitter_test.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "context"
  9. "fmt"
  10. "os"
  11. "path/filepath"
  12. "runtime"
  13. "testing"
  14. "time"
  15. "github.com/syncthing/syncthing/lib/config"
  16. "github.com/syncthing/syncthing/lib/events"
  17. "github.com/syncthing/syncthing/lib/protocol"
  18. "github.com/syncthing/syncthing/lib/sync"
  19. )
  20. var timeout = 100 * time.Millisecond
  21. func caller(skip int) string {
  22. _, file, line, ok := runtime.Caller(skip + 1)
  23. if !ok {
  24. return "unknown"
  25. }
  26. return fmt.Sprintf("%s:%d", filepath.Base(file), line)
  27. }
  28. func expectEvent(w events.Subscription, t *testing.T, size int) {
  29. event, err := w.Poll(timeout)
  30. if err != nil {
  31. t.Fatal("Unexpected error:", err, "at", caller(1))
  32. }
  33. if event.Type != events.DownloadProgress {
  34. t.Fatal("Unexpected event:", event, "at", caller(1))
  35. }
  36. data := event.Data.(map[string]map[string]*pullerProgress)
  37. if len(data) != size {
  38. t.Fatal("Unexpected event data size:", data, "at", caller(1))
  39. }
  40. }
  41. func expectTimeout(w events.Subscription, t *testing.T) {
  42. _, err := w.Poll(timeout)
  43. if err != events.ErrTimeout {
  44. t.Fatal("Unexpected non-Timeout error:", err, "at", caller(1))
  45. }
  46. }
  47. func TestProgressEmitter(t *testing.T) {
  48. evLogger := events.NewLogger()
  49. go evLogger.Serve()
  50. defer evLogger.Stop()
  51. w := evLogger.Subscribe(events.DownloadProgress)
  52. c := createTmpWrapper(config.Configuration{})
  53. defer os.Remove(c.ConfigPath())
  54. c.SetOptions(config.OptionsConfiguration{
  55. ProgressUpdateIntervalS: 0,
  56. })
  57. p := NewProgressEmitter(c, evLogger)
  58. go p.Serve()
  59. defer p.Stop()
  60. p.interval = 0
  61. expectTimeout(w, t)
  62. s := sharedPullerState{
  63. updated: time.Now(),
  64. mut: sync.NewRWMutex(),
  65. }
  66. p.Register(&s)
  67. expectEvent(w, t, 1)
  68. expectTimeout(w, t)
  69. s.copyDone(protocol.BlockInfo{})
  70. expectEvent(w, t, 1)
  71. expectTimeout(w, t)
  72. s.copiedFromOrigin()
  73. expectEvent(w, t, 1)
  74. expectTimeout(w, t)
  75. s.pullStarted()
  76. expectEvent(w, t, 1)
  77. expectTimeout(w, t)
  78. s.pullDone(protocol.BlockInfo{})
  79. expectEvent(w, t, 1)
  80. expectTimeout(w, t)
  81. p.Deregister(&s)
  82. expectEvent(w, t, 0)
  83. expectTimeout(w, t)
  84. }
  85. func TestSendDownloadProgressMessages(t *testing.T) {
  86. c := createTmpWrapper(config.Configuration{})
  87. defer os.Remove(c.ConfigPath())
  88. c.SetOptions(config.OptionsConfiguration{
  89. ProgressUpdateIntervalS: 0,
  90. TempIndexMinBlocks: 10,
  91. })
  92. fc := &fakeConnection{}
  93. evLogger := events.NewLogger()
  94. go evLogger.Serve()
  95. defer evLogger.Stop()
  96. p := NewProgressEmitter(c, evLogger)
  97. p.temporaryIndexSubscribe(fc, []string{"folder", "folder2"})
  98. p.registry["folder"] = make(map[string]*sharedPullerState)
  99. p.registry["folder2"] = make(map[string]*sharedPullerState)
  100. p.registry["folderXXX"] = make(map[string]*sharedPullerState)
  101. expect := func(updateIdx int, state *sharedPullerState, updateType protocol.FileDownloadProgressUpdateType, version protocol.Vector, blocks []int32, remove bool) {
  102. messageIdx := -1
  103. for i, msg := range fc.downloadProgressMessages {
  104. if msg.folder == state.folder {
  105. messageIdx = i
  106. break
  107. }
  108. }
  109. if messageIdx < 0 {
  110. t.Errorf("Message for folder %s does not exist at %s", state.folder, caller(1))
  111. }
  112. msg := fc.downloadProgressMessages[messageIdx]
  113. // Don't know the index (it's random due to iterating maps)
  114. if updateIdx == -1 {
  115. for i, upd := range msg.updates {
  116. if upd.Name == state.file.Name {
  117. updateIdx = i
  118. break
  119. }
  120. }
  121. }
  122. if updateIdx == -1 {
  123. t.Errorf("Could not find update for %s at %s", state.file.Name, caller(1))
  124. }
  125. if updateIdx > len(msg.updates)-1 {
  126. t.Errorf("Update at index %d does not exist at %s", updateIdx, caller(1))
  127. }
  128. update := msg.updates[updateIdx]
  129. if update.UpdateType != updateType {
  130. t.Errorf("Wrong update type at %s", caller(1))
  131. }
  132. if !update.Version.Equal(version) {
  133. t.Errorf("Wrong version at %s", caller(1))
  134. }
  135. if len(update.BlockIndexes) != len(blocks) {
  136. t.Errorf("Wrong indexes. Have %d expect %d at %s", len(update.BlockIndexes), len(blocks), caller(1))
  137. }
  138. for i := range update.BlockIndexes {
  139. if update.BlockIndexes[i] != blocks[i] {
  140. t.Errorf("Index %d incorrect at %s", i, caller(1))
  141. }
  142. }
  143. if remove {
  144. fc.downloadProgressMessages = append(fc.downloadProgressMessages[:messageIdx], fc.downloadProgressMessages[messageIdx+1:]...)
  145. }
  146. }
  147. expectEmpty := func() {
  148. if len(fc.downloadProgressMessages) > 0 {
  149. t.Errorf("Still have something at %s: %#v", caller(1), fc.downloadProgressMessages)
  150. }
  151. }
  152. now := time.Now()
  153. tick := func() time.Time {
  154. now = now.Add(time.Second)
  155. return now
  156. }
  157. if len(fc.downloadProgressMessages) != 0 {
  158. t.Error("Expected no requests")
  159. }
  160. v1 := (protocol.Vector{}).Update(0)
  161. v2 := (protocol.Vector{}).Update(1)
  162. // Requires more than 10 blocks to work.
  163. blocks := make([]protocol.BlockInfo, 11)
  164. state1 := &sharedPullerState{
  165. folder: "folder",
  166. file: protocol.FileInfo{
  167. Name: "state1",
  168. Version: v1,
  169. Blocks: blocks,
  170. },
  171. mut: sync.NewRWMutex(),
  172. availableUpdated: time.Now(),
  173. }
  174. p.registry["folder"]["1"] = state1
  175. // Has no blocks, hence no message is sent
  176. sendMsgs(p)
  177. expectEmpty()
  178. // Returns update for puller with new extra blocks
  179. state1.available = []int32{1}
  180. sendMsgs(p)
  181. expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{1}, true)
  182. expectEmpty()
  183. // Does nothing if nothing changes
  184. sendMsgs(p)
  185. expectEmpty()
  186. // Does nothing if timestamp updated, but no new blocks (should never happen)
  187. state1.availableUpdated = tick()
  188. sendMsgs(p)
  189. expectEmpty()
  190. // Does not return an update if date blocks change but date does not (should never happen)
  191. state1.available = []int32{1, 2}
  192. sendMsgs(p)
  193. expectEmpty()
  194. // If the date and blocks changes, returns only the diff
  195. state1.availableUpdated = tick()
  196. sendMsgs(p)
  197. expect(0, state1, protocol.UpdateTypeAppend, v1, []int32{2}, true)
  198. expectEmpty()
  199. // Returns forget and update if puller version has changed
  200. state1.file.Version = v2
  201. sendMsgs(p)
  202. expect(0, state1, protocol.UpdateTypeForget, v1, nil, false)
  203. expect(1, state1, protocol.UpdateTypeAppend, v2, []int32{1, 2}, true)
  204. expectEmpty()
  205. // Returns forget and append if sharedPullerState creation timer changes.
  206. state1.available = []int32{1}
  207. state1.availableUpdated = tick()
  208. state1.created = tick()
  209. sendMsgs(p)
  210. expect(0, state1, protocol.UpdateTypeForget, v2, nil, false)
  211. expect(1, state1, protocol.UpdateTypeAppend, v2, []int32{1}, true)
  212. expectEmpty()
  213. // Sends an empty update if new file exists, but does not have any blocks yet. (To indicate that the old blocks are no longer available)
  214. state1.file.Version = v1
  215. state1.available = nil
  216. state1.availableUpdated = tick()
  217. sendMsgs(p)
  218. expect(0, state1, protocol.UpdateTypeForget, v2, nil, false)
  219. expect(1, state1, protocol.UpdateTypeAppend, v1, nil, true)
  220. expectEmpty()
  221. // Updates for multiple files and folders can be combined
  222. state1.available = []int32{1, 2, 3}
  223. state1.availableUpdated = tick()
  224. state2 := &sharedPullerState{
  225. folder: "folder2",
  226. file: protocol.FileInfo{
  227. Name: "state2",
  228. Version: v1,
  229. Blocks: blocks,
  230. },
  231. mut: sync.NewRWMutex(),
  232. available: []int32{1, 2, 3},
  233. availableUpdated: time.Now(),
  234. }
  235. state3 := &sharedPullerState{
  236. folder: "folder",
  237. file: protocol.FileInfo{
  238. Name: "state3",
  239. Version: v1,
  240. Blocks: blocks,
  241. },
  242. mut: sync.NewRWMutex(),
  243. available: []int32{1, 2, 3},
  244. availableUpdated: time.Now(),
  245. }
  246. state4 := &sharedPullerState{
  247. folder: "folder2",
  248. file: protocol.FileInfo{
  249. Name: "state4",
  250. Version: v1,
  251. Blocks: blocks,
  252. },
  253. mut: sync.NewRWMutex(),
  254. available: []int32{1, 2, 3},
  255. availableUpdated: time.Now(),
  256. }
  257. p.registry["folder2"]["2"] = state2
  258. p.registry["folder"]["3"] = state3
  259. p.registry["folder2"]["4"] = state4
  260. sendMsgs(p)
  261. expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false)
  262. expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
  263. expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, false)
  264. expect(-1, state4, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
  265. expectEmpty()
  266. // Returns forget if puller no longer exists, as well as updates if it has been updated.
  267. state1.available = []int32{1, 2, 3, 4, 5}
  268. state1.availableUpdated = tick()
  269. state2.available = []int32{1, 2, 3, 4, 5}
  270. state2.availableUpdated = tick()
  271. delete(p.registry["folder"], "3")
  272. delete(p.registry["folder2"], "4")
  273. sendMsgs(p)
  274. expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false)
  275. expect(-1, state3, protocol.UpdateTypeForget, v1, nil, true)
  276. expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{4, 5}, false)
  277. expect(-1, state4, protocol.UpdateTypeForget, v1, nil, true)
  278. expectEmpty()
  279. // Deletions are sent only once (actual bug I found writing the tests)
  280. sendMsgs(p)
  281. sendMsgs(p)
  282. expectEmpty()
  283. // Not sent for "inactive" (symlinks, dirs, or wrong folder) pullers
  284. // Directory
  285. state5 := &sharedPullerState{
  286. folder: "folder",
  287. file: protocol.FileInfo{
  288. Name: "state5",
  289. Version: v1,
  290. Type: protocol.FileInfoTypeDirectory,
  291. Blocks: blocks,
  292. },
  293. mut: sync.NewRWMutex(),
  294. available: []int32{1, 2, 3},
  295. availableUpdated: time.Now(),
  296. }
  297. // Symlink
  298. state6 := &sharedPullerState{
  299. folder: "folder",
  300. file: protocol.FileInfo{
  301. Name: "state6",
  302. Version: v1,
  303. Type: protocol.FileInfoTypeSymlink,
  304. },
  305. mut: sync.NewRWMutex(),
  306. available: []int32{1, 2, 3},
  307. availableUpdated: time.Now(),
  308. }
  309. // Some other directory
  310. state7 := &sharedPullerState{
  311. folder: "folderXXX",
  312. file: protocol.FileInfo{
  313. Name: "state7",
  314. Version: v1,
  315. Blocks: blocks,
  316. },
  317. mut: sync.NewRWMutex(),
  318. available: []int32{1, 2, 3},
  319. availableUpdated: time.Now(),
  320. }
  321. // Less than 10 blocks
  322. state8 := &sharedPullerState{
  323. folder: "folder",
  324. file: protocol.FileInfo{
  325. Name: "state8",
  326. Version: v1,
  327. Blocks: blocks[:3],
  328. },
  329. mut: sync.NewRWMutex(),
  330. available: []int32{1, 2, 3},
  331. availableUpdated: time.Now(),
  332. }
  333. p.registry["folder"]["5"] = state5
  334. p.registry["folder"]["6"] = state6
  335. p.registry["folderXXX"]["7"] = state7
  336. p.registry["folder"]["8"] = state8
  337. sendMsgs(p)
  338. expectEmpty()
  339. // Device is no longer subscribed to a particular folder
  340. delete(p.registry["folder"], "1") // Clean up first
  341. delete(p.registry["folder2"], "2") // Clean up first
  342. sendMsgs(p)
  343. expect(-1, state1, protocol.UpdateTypeForget, v1, nil, true)
  344. expect(-1, state2, protocol.UpdateTypeForget, v1, nil, true)
  345. expectEmpty()
  346. p.registry["folder"]["1"] = state1
  347. p.registry["folder2"]["2"] = state2
  348. p.registry["folder"]["3"] = state3
  349. p.registry["folder2"]["4"] = state4
  350. sendMsgs(p)
  351. expect(-1, state1, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false)
  352. expect(-1, state3, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
  353. expect(-1, state2, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3, 4, 5}, false)
  354. expect(-1, state4, protocol.UpdateTypeAppend, v1, []int32{1, 2, 3}, true)
  355. expectEmpty()
  356. p.temporaryIndexUnsubscribe(fc)
  357. p.temporaryIndexSubscribe(fc, []string{"folder"})
  358. sendMsgs(p)
  359. // See progressemitter.go for explanation why this is commented out.
  360. // Search for state.cleanup
  361. //expect(-1, state2, protocol.UpdateTypeForget, v1, nil, false)
  362. //expect(-1, state4, protocol.UpdateTypeForget, v1, nil, true)
  363. expectEmpty()
  364. // Cleanup when device no longer exists
  365. p.temporaryIndexUnsubscribe(fc)
  366. sendMsgs(p)
  367. _, ok := p.sentDownloadStates[fc.ID()]
  368. if ok {
  369. t.Error("Should not be there")
  370. }
  371. }
  372. func sendMsgs(p *ProgressEmitter) {
  373. p.mut.Lock()
  374. defer p.mut.Unlock()
  375. p.sendDownloadProgressMessagesLocked(context.Background())
  376. }