sentdownloadstate.go 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "time"
  9. "github.com/syncthing/syncthing/lib/protocol"
  10. )
  11. // sentFolderFileDownloadState represents a state of what we've announced as available
  12. // to some remote device for a specific file.
  13. type sentFolderFileDownloadState struct {
  14. blockIndexes []int32
  15. version protocol.Vector
  16. updated time.Time
  17. }
  18. // sentFolderDownloadState represents a state of what we've announced as available
  19. // to some remote device for a specific folder.
  20. type sentFolderDownloadState struct {
  21. files map[string]*sentFolderFileDownloadState
  22. }
  23. // update takes a set of currently active sharedPullerStates, and returns a list
  24. // of updates which we need to send to the client to become up to date.
  25. func (s *sentFolderDownloadState) update(pullers []*sharedPullerState) []protocol.FileDownloadProgressUpdate {
  26. var name string
  27. var updates []protocol.FileDownloadProgressUpdate
  28. seen := make(map[string]struct{}, len(pullers))
  29. for _, puller := range pullers {
  30. name = puller.file.Name
  31. seen[name] = struct{}{}
  32. pullerBlockIndexes := puller.Available()
  33. pullerVersion := puller.file.Version
  34. pullerBlockIndexesUpdated := puller.AvailableUpdated()
  35. localFile, ok := s.files[name]
  36. // New file we haven't seen before
  37. if !ok {
  38. // Only send an update if the file actually has some blocks.
  39. if len(pullerBlockIndexes) > 0 {
  40. s.files[name] = &sentFolderFileDownloadState{
  41. blockIndexes: pullerBlockIndexes,
  42. updated: pullerBlockIndexesUpdated,
  43. version: pullerVersion,
  44. }
  45. updates = append(updates, protocol.FileDownloadProgressUpdate{
  46. Name: name,
  47. Version: pullerVersion,
  48. UpdateType: protocol.UpdateTypeAppend,
  49. BlockIndexes: pullerBlockIndexes,
  50. })
  51. }
  52. continue
  53. }
  54. // Existing file we've already sent an update for.
  55. if pullerBlockIndexesUpdated.Equal(localFile.updated) && pullerVersion.Equal(localFile.version) {
  56. // The file state hasn't changed, go to next.
  57. continue
  58. }
  59. if !pullerVersion.Equal(localFile.version) {
  60. // The version has changed, clean up whatever we had for the old
  61. // file, and advertise the new file.
  62. updates = append(updates, protocol.FileDownloadProgressUpdate{
  63. Name: name,
  64. Version: localFile.version,
  65. UpdateType: protocol.UpdateTypeForget,
  66. })
  67. updates = append(updates, protocol.FileDownloadProgressUpdate{
  68. Name: name,
  69. Version: pullerVersion,
  70. UpdateType: protocol.UpdateTypeAppend,
  71. BlockIndexes: pullerBlockIndexes,
  72. })
  73. localFile.blockIndexes = pullerBlockIndexes
  74. localFile.updated = pullerBlockIndexesUpdated
  75. localFile.version = pullerVersion
  76. continue
  77. }
  78. // Relies on the fact that sharedPullerState.Available() should always
  79. // append.
  80. newBlocks := pullerBlockIndexes[len(localFile.blockIndexes):]
  81. localFile.blockIndexes = append(localFile.blockIndexes, newBlocks...)
  82. localFile.updated = pullerBlockIndexesUpdated
  83. // If there are new blocks, send the update.
  84. if len(newBlocks) > 0 {
  85. updates = append(updates, protocol.FileDownloadProgressUpdate{
  86. Name: name,
  87. Version: localFile.version,
  88. UpdateType: protocol.UpdateTypeAppend,
  89. BlockIndexes: newBlocks,
  90. })
  91. }
  92. }
  93. // For each file that we are tracking, see if there still is a puller for it
  94. // if not, the file completed or errored out.
  95. for name, info := range s.files {
  96. _, ok := seen[name]
  97. if !ok {
  98. updates = append(updates, protocol.FileDownloadProgressUpdate{
  99. Name: name,
  100. Version: info.version,
  101. UpdateType: protocol.UpdateTypeForget,
  102. })
  103. delete(s.files, name)
  104. }
  105. }
  106. return updates
  107. }
  108. // destroy removes all stored state, and returns a set of updates we need to
  109. // dispatch to clean up the state on the remote end.
  110. func (s *sentFolderDownloadState) destroy() []protocol.FileDownloadProgressUpdate {
  111. updates := make([]protocol.FileDownloadProgressUpdate, 0, len(s.files))
  112. for name, info := range s.files {
  113. updates = append(updates, protocol.FileDownloadProgressUpdate{
  114. Name: name,
  115. Version: info.version,
  116. UpdateType: protocol.UpdateTypeForget,
  117. })
  118. delete(s.files, name)
  119. }
  120. return updates
  121. }
  122. // sentDownloadState represents a state of what we've announced as available
  123. // to some remote device. It is used from within the progress emitter
  124. // which only has one routine, hence is deemed threadsafe.
  125. type sentDownloadState struct {
  126. folderStates map[string]*sentFolderDownloadState
  127. }
  128. // update receives a folder, and a slice of pullers that are currently available
  129. // for the given folder, and according to the state of what we've seen before
  130. // returns a set of updates which we should send to the remote device to make
  131. // it aware of everything that we currently have available.
  132. func (s *sentDownloadState) update(folder string, pullers []*sharedPullerState) []protocol.FileDownloadProgressUpdate {
  133. fs, ok := s.folderStates[folder]
  134. if !ok {
  135. fs = &sentFolderDownloadState{
  136. files: make(map[string]*sentFolderFileDownloadState),
  137. }
  138. s.folderStates[folder] = fs
  139. }
  140. return fs.update(pullers)
  141. }
  142. // folders returns a set of folders this state is currently aware off.
  143. func (s *sentDownloadState) folders() []string {
  144. folders := make([]string, 0, len(s.folderStates))
  145. for key := range s.folderStates {
  146. folders = append(folders, key)
  147. }
  148. return folders
  149. }
  150. // cleanup cleans up all state related to a folder, and returns a set of updates
  151. // which would clean up the state on the remote device.
  152. func (s *sentDownloadState) cleanup(folder string) []protocol.FileDownloadProgressUpdate {
  153. fs, ok := s.folderStates[folder]
  154. if ok {
  155. updates := fs.destroy()
  156. delete(s.folderStates, folder)
  157. return updates
  158. }
  159. return nil
  160. }