sentdownloadstate.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "time"
  9. "github.com/syncthing/syncthing/lib/protocol"
  10. )
  11. // sentFolderFileDownloadState represents a state of what we've announced as available
  12. // to some remote device for a specific file.
  13. type sentFolderFileDownloadState struct {
  14. blockIndexes []int32
  15. version protocol.Vector
  16. updated time.Time
  17. created time.Time
  18. }
  19. // sentFolderDownloadState represents a state of what we've announced as available
  20. // to some remote device for a specific folder.
  21. type sentFolderDownloadState struct {
  22. files map[string]*sentFolderFileDownloadState
  23. }
  24. // update takes a set of currently active sharedPullerStates, and returns a list
  25. // of updates which we need to send to the client to become up to date.
  26. func (s *sentFolderDownloadState) update(pullers []*sharedPullerState) []protocol.FileDownloadProgressUpdate {
  27. var name string
  28. var updates []protocol.FileDownloadProgressUpdate
  29. seen := make(map[string]struct{}, len(pullers))
  30. for _, puller := range pullers {
  31. name = puller.file.Name
  32. seen[name] = struct{}{}
  33. pullerBlockIndexes := puller.Available()
  34. pullerVersion := puller.file.Version
  35. pullerBlockIndexesUpdated := puller.AvailableUpdated()
  36. pullerCreated := puller.created
  37. localFile, ok := s.files[name]
  38. // New file we haven't seen before
  39. if !ok {
  40. // Only send an update if the file actually has some blocks.
  41. if len(pullerBlockIndexes) > 0 {
  42. s.files[name] = &sentFolderFileDownloadState{
  43. blockIndexes: pullerBlockIndexes,
  44. updated: pullerBlockIndexesUpdated,
  45. version: pullerVersion,
  46. created: pullerCreated,
  47. }
  48. updates = append(updates, protocol.FileDownloadProgressUpdate{
  49. Name: name,
  50. Version: pullerVersion,
  51. UpdateType: protocol.UpdateTypeAppend,
  52. BlockIndexes: pullerBlockIndexes,
  53. })
  54. }
  55. continue
  56. }
  57. // Existing file we've already sent an update for.
  58. if pullerBlockIndexesUpdated.Equal(localFile.updated) && pullerVersion.Equal(localFile.version) {
  59. // The file state hasn't changed, go to next.
  60. continue
  61. }
  62. if !pullerVersion.Equal(localFile.version) || !pullerCreated.Equal(localFile.created) {
  63. // The version has changed or the puller was reconstrcuted due to failure.
  64. // Clean up whatever we had for the old file, and advertise the new file.
  65. updates = append(updates, protocol.FileDownloadProgressUpdate{
  66. Name: name,
  67. Version: localFile.version,
  68. UpdateType: protocol.UpdateTypeForget,
  69. })
  70. updates = append(updates, protocol.FileDownloadProgressUpdate{
  71. Name: name,
  72. Version: pullerVersion,
  73. UpdateType: protocol.UpdateTypeAppend,
  74. BlockIndexes: pullerBlockIndexes,
  75. })
  76. localFile.blockIndexes = pullerBlockIndexes
  77. localFile.updated = pullerBlockIndexesUpdated
  78. localFile.version = pullerVersion
  79. localFile.created = pullerCreated
  80. continue
  81. }
  82. // Relies on the fact that sharedPullerState.Available() should always
  83. // append.
  84. newBlocks := pullerBlockIndexes[len(localFile.blockIndexes):]
  85. localFile.blockIndexes = append(localFile.blockIndexes, newBlocks...)
  86. localFile.updated = pullerBlockIndexesUpdated
  87. // If there are new blocks, send the update.
  88. if len(newBlocks) > 0 {
  89. updates = append(updates, protocol.FileDownloadProgressUpdate{
  90. Name: name,
  91. Version: localFile.version,
  92. UpdateType: protocol.UpdateTypeAppend,
  93. BlockIndexes: newBlocks,
  94. })
  95. }
  96. }
  97. // For each file that we are tracking, see if there still is a puller for it
  98. // if not, the file completed or errored out.
  99. for name, info := range s.files {
  100. _, ok := seen[name]
  101. if !ok {
  102. updates = append(updates, protocol.FileDownloadProgressUpdate{
  103. Name: name,
  104. Version: info.version,
  105. UpdateType: protocol.UpdateTypeForget,
  106. })
  107. delete(s.files, name)
  108. }
  109. }
  110. return updates
  111. }
  112. // destroy removes all stored state, and returns a set of updates we need to
  113. // dispatch to clean up the state on the remote end.
  114. func (s *sentFolderDownloadState) destroy() []protocol.FileDownloadProgressUpdate {
  115. updates := make([]protocol.FileDownloadProgressUpdate, 0, len(s.files))
  116. for name, info := range s.files {
  117. updates = append(updates, protocol.FileDownloadProgressUpdate{
  118. Name: name,
  119. Version: info.version,
  120. UpdateType: protocol.UpdateTypeForget,
  121. })
  122. delete(s.files, name)
  123. }
  124. return updates
  125. }
  126. // sentDownloadState represents a state of what we've announced as available
  127. // to some remote device. It is used from within the progress emitter
  128. // which only has one routine, hence is deemed threadsafe.
  129. type sentDownloadState struct {
  130. folderStates map[string]*sentFolderDownloadState
  131. }
  132. // update receives a folder, and a slice of pullers that are currently available
  133. // for the given folder, and according to the state of what we've seen before
  134. // returns a set of updates which we should send to the remote device to make
  135. // it aware of everything that we currently have available.
  136. func (s *sentDownloadState) update(folder string, pullers []*sharedPullerState) []protocol.FileDownloadProgressUpdate {
  137. fs, ok := s.folderStates[folder]
  138. if !ok {
  139. fs = &sentFolderDownloadState{
  140. files: make(map[string]*sentFolderFileDownloadState),
  141. }
  142. s.folderStates[folder] = fs
  143. }
  144. return fs.update(pullers)
  145. }
  146. // folders returns a set of folders this state is currently aware off.
  147. func (s *sentDownloadState) folders() []string {
  148. folders := make([]string, 0, len(s.folderStates))
  149. for key := range s.folderStates {
  150. folders = append(folders, key)
  151. }
  152. return folders
  153. }
  154. // cleanup cleans up all state related to a folder, and returns a set of updates
  155. // which would clean up the state on the remote device.
  156. func (s *sentDownloadState) cleanup(folder string) []protocol.FileDownloadProgressUpdate {
  157. fs, ok := s.folderStates[folder]
  158. if ok {
  159. updates := fs.destroy()
  160. delete(s.folderStates, folder)
  161. return updates
  162. }
  163. return nil
  164. }