sentdownloadstate.go 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "time"
  9. "github.com/syncthing/syncthing/lib/protocol"
  10. )
  11. // sentFolderFileDownloadState represents a state of what we've announced as available
  12. // to some remote device for a specific file.
  13. type sentFolderFileDownloadState struct {
  14. blockIndexes []int32
  15. version protocol.Vector
  16. updated time.Time
  17. created time.Time
  18. blockSize int
  19. }
  20. // sentFolderDownloadState represents a state of what we've announced as available
  21. // to some remote device for a specific folder.
  22. type sentFolderDownloadState struct {
  23. files map[string]*sentFolderFileDownloadState
  24. }
  25. // update takes a set of currently active sharedPullerStates, and returns a list
  26. // of updates which we need to send to the client to become up to date.
  27. func (s *sentFolderDownloadState) update(pullers []*sharedPullerState) []protocol.FileDownloadProgressUpdate {
  28. var name string
  29. var updates []protocol.FileDownloadProgressUpdate
  30. seen := make(map[string]struct{}, len(pullers))
  31. for _, puller := range pullers {
  32. name = puller.file.Name
  33. seen[name] = struct{}{}
  34. pullerBlockIndexes := puller.Available()
  35. pullerVersion := puller.file.Version
  36. pullerBlockIndexesUpdated := puller.AvailableUpdated()
  37. pullerCreated := puller.created
  38. pullerBlockSize := int32(puller.file.BlockSize())
  39. localFile, ok := s.files[name]
  40. // New file we haven't seen before
  41. if !ok {
  42. // Only send an update if the file actually has some blocks.
  43. if len(pullerBlockIndexes) > 0 {
  44. s.files[name] = &sentFolderFileDownloadState{
  45. blockIndexes: pullerBlockIndexes,
  46. updated: pullerBlockIndexesUpdated,
  47. version: pullerVersion,
  48. created: pullerCreated,
  49. blockSize: int(pullerBlockSize),
  50. }
  51. updates = append(updates, protocol.FileDownloadProgressUpdate{
  52. Name: name,
  53. Version: pullerVersion,
  54. UpdateType: protocol.UpdateTypeAppend,
  55. BlockIndexes: pullerBlockIndexes,
  56. BlockSize: pullerBlockSize,
  57. })
  58. }
  59. continue
  60. }
  61. // Existing file we've already sent an update for.
  62. if pullerBlockIndexesUpdated.Equal(localFile.updated) && pullerVersion.Equal(localFile.version) {
  63. // The file state hasn't changed, go to next.
  64. continue
  65. }
  66. if !pullerVersion.Equal(localFile.version) || !pullerCreated.Equal(localFile.created) {
  67. // The version has changed or the puller was reconstrcuted due to failure.
  68. // Clean up whatever we had for the old file, and advertise the new file.
  69. updates = append(updates, protocol.FileDownloadProgressUpdate{
  70. Name: name,
  71. Version: localFile.version,
  72. UpdateType: protocol.UpdateTypeForget,
  73. })
  74. updates = append(updates, protocol.FileDownloadProgressUpdate{
  75. Name: name,
  76. Version: pullerVersion,
  77. UpdateType: protocol.UpdateTypeAppend,
  78. BlockIndexes: pullerBlockIndexes,
  79. BlockSize: pullerBlockSize,
  80. })
  81. localFile.blockIndexes = pullerBlockIndexes
  82. localFile.updated = pullerBlockIndexesUpdated
  83. localFile.version = pullerVersion
  84. localFile.created = pullerCreated
  85. localFile.blockSize = int(pullerBlockSize)
  86. continue
  87. }
  88. // Relies on the fact that sharedPullerState.Available() should always
  89. // append.
  90. newBlocks := pullerBlockIndexes[len(localFile.blockIndexes):]
  91. localFile.blockIndexes = append(localFile.blockIndexes, newBlocks...)
  92. localFile.updated = pullerBlockIndexesUpdated
  93. // If there are new blocks, send the update.
  94. if len(newBlocks) > 0 {
  95. updates = append(updates, protocol.FileDownloadProgressUpdate{
  96. Name: name,
  97. Version: localFile.version,
  98. UpdateType: protocol.UpdateTypeAppend,
  99. BlockIndexes: newBlocks,
  100. BlockSize: pullerBlockSize,
  101. })
  102. }
  103. }
  104. // For each file that we are tracking, see if there still is a puller for it
  105. // if not, the file completed or errored out.
  106. for name, info := range s.files {
  107. _, ok := seen[name]
  108. if !ok {
  109. updates = append(updates, protocol.FileDownloadProgressUpdate{
  110. Name: name,
  111. Version: info.version,
  112. UpdateType: protocol.UpdateTypeForget,
  113. })
  114. delete(s.files, name)
  115. }
  116. }
  117. return updates
  118. }
  119. // destroy removes all stored state, and returns a set of updates we need to
  120. // dispatch to clean up the state on the remote end.
  121. func (s *sentFolderDownloadState) destroy() []protocol.FileDownloadProgressUpdate {
  122. updates := make([]protocol.FileDownloadProgressUpdate, 0, len(s.files))
  123. for name, info := range s.files {
  124. updates = append(updates, protocol.FileDownloadProgressUpdate{
  125. Name: name,
  126. Version: info.version,
  127. UpdateType: protocol.UpdateTypeForget,
  128. })
  129. delete(s.files, name)
  130. }
  131. return updates
  132. }
  133. // sentDownloadState represents a state of what we've announced as available
  134. // to some remote device. It is used from within the progress emitter
  135. // which only has one routine, hence is deemed threadsafe.
  136. type sentDownloadState struct {
  137. folderStates map[string]*sentFolderDownloadState
  138. }
  139. // update receives a folder, and a slice of pullers that are currently available
  140. // for the given folder, and according to the state of what we've seen before
  141. // returns a set of updates which we should send to the remote device to make
  142. // it aware of everything that we currently have available.
  143. func (s *sentDownloadState) update(folder string, pullers []*sharedPullerState) []protocol.FileDownloadProgressUpdate {
  144. fs, ok := s.folderStates[folder]
  145. if !ok {
  146. fs = &sentFolderDownloadState{
  147. files: make(map[string]*sentFolderFileDownloadState),
  148. }
  149. s.folderStates[folder] = fs
  150. }
  151. return fs.update(pullers)
  152. }
  153. // folders returns a set of folders this state is currently aware off.
  154. func (s *sentDownloadState) folders() []string {
  155. folders := make([]string, 0, len(s.folderStates))
  156. for key := range s.folderStates {
  157. folders = append(folders, key)
  158. }
  159. return folders
  160. }
  161. // cleanup cleans up all state related to a folder, and returns a set of updates
  162. // which would clean up the state on the remote device.
  163. func (s *sentDownloadState) cleanup(folder string) []protocol.FileDownloadProgressUpdate {
  164. fs, ok := s.folderStates[folder]
  165. if ok {
  166. updates := fs.destroy()
  167. delete(s.folderStates, folder)
  168. return updates
  169. }
  170. return nil
  171. }