folder_recvonly.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "sort"
  9. "time"
  10. "github.com/syncthing/syncthing/lib/config"
  11. "github.com/syncthing/syncthing/lib/db"
  12. "github.com/syncthing/syncthing/lib/fs"
  13. "github.com/syncthing/syncthing/lib/ignore"
  14. "github.com/syncthing/syncthing/lib/protocol"
  15. "github.com/syncthing/syncthing/lib/versioner"
  16. )
  17. func init() {
  18. folderFactories[config.FolderTypeReceiveOnly] = newReceiveOnlyFolder
  19. }
  20. /*
  21. receiveOnlyFolder is a folder that does not propagate local changes outward.
  22. It does this by the following general mechanism (not all of which is
  23. implemted in this file):
  24. - Local changes are scanned and versioned as usual, but get the
  25. FlagLocalReceiveOnly bit set.
  26. - When changes are sent to the cluster this bit gets converted to the
  27. Invalid bit (like all other local flags, currently) and also the Version
  28. gets set to the empty version. The reason for clearing the Version is to
  29. ensure that other devices will not consider themselves out of date due to
  30. our change.
  31. - The database layer accounts sizes per flag bit, so we can know how many
  32. files have been changed locally. We use this to trigger a "Revert" option
  33. on the folder when the amount of locally changed data is nonzero.
  34. - To revert we take the files which have changed and reset their version
  35. counter down to zero. The next pull will replace our changed version with
  36. the globally latest. As this is a user-initiated operation we do not cause
  37. conflict copies when reverting.
  38. - When pulling normally (i.e., not in the revert case) with local changes,
  39. normal conflict resolution will apply. Conflict copies will be created,
  40. but not propagated outwards (because receive only, right).
  41. Implementation wise a receiveOnlyFolder is just a sendReceiveFolder that
  42. sets an extra bit on local changes and has a Revert method.
  43. */
  44. type receiveOnlyFolder struct {
  45. *sendReceiveFolder
  46. }
  47. func newReceiveOnlyFolder(model *Model, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem) service {
  48. sr := newSendReceiveFolder(model, cfg, ver, fs).(*sendReceiveFolder)
  49. sr.localFlags = protocol.FlagLocalReceiveOnly // gets propagated to the scanner, and set on locally changed files
  50. return &receiveOnlyFolder{sr}
  51. }
  52. func (f *receiveOnlyFolder) Revert(fs *db.FileSet, updateFn func([]protocol.FileInfo)) {
  53. f.setState(FolderScanning)
  54. defer f.setState(FolderIdle)
  55. // XXX: This *really* should be given to us in the constructor...
  56. f.model.fmut.RLock()
  57. ignores := f.model.folderIgnores[f.folderID]
  58. f.model.fmut.RUnlock()
  59. scanChan := make(chan string)
  60. go f.pullScannerRoutine(scanChan)
  61. defer close(scanChan)
  62. delQueue := &deleteQueue{
  63. handler: f, // for the deleteFile and deleteDir methods
  64. ignores: ignores,
  65. scanChan: scanChan,
  66. }
  67. batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
  68. batchSizeBytes := 0
  69. fs.WithHave(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
  70. fi := intf.(protocol.FileInfo)
  71. if !fi.IsReceiveOnlyChanged() {
  72. // We're only interested in files that have changed locally in
  73. // receive only mode.
  74. return true
  75. }
  76. if len(fi.Version.Counters) == 1 && fi.Version.Counters[0].ID == f.shortID {
  77. // We are the only device mentioned in the version vector so the
  78. // file must originate here. A revert then means to delete it.
  79. // We'll delete files directly, directories get queued and
  80. // handled below.
  81. handled, err := delQueue.handle(fi)
  82. if err != nil {
  83. l.Infof("Revert: deleting %s: %v\n", fi.Name, err)
  84. return true // continue
  85. }
  86. if !handled {
  87. return true // continue
  88. }
  89. fi = protocol.FileInfo{
  90. Name: fi.Name,
  91. Type: fi.Type,
  92. ModifiedS: fi.ModifiedS,
  93. ModifiedNs: fi.ModifiedNs,
  94. ModifiedBy: f.shortID,
  95. Deleted: true,
  96. Version: protocol.Vector{}, // if this file ever resurfaces anywhere we want our delete to be strictly older
  97. }
  98. } else {
  99. // Revert means to throw away our local changes. We reset the
  100. // version to the empty vector, which is strictly older than any
  101. // other existing version. It is not in conflict with anything,
  102. // either, so we will not create a conflict copy of our local
  103. // changes.
  104. fi.Version = protocol.Vector{}
  105. fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
  106. }
  107. batch = append(batch, fi)
  108. batchSizeBytes += fi.ProtoSize()
  109. if len(batch) >= maxBatchSizeFiles || batchSizeBytes >= maxBatchSizeBytes {
  110. updateFn(batch)
  111. batch = batch[:0]
  112. batchSizeBytes = 0
  113. }
  114. return true
  115. })
  116. if len(batch) > 0 {
  117. updateFn(batch)
  118. }
  119. batch = batch[:0]
  120. batchSizeBytes = 0
  121. // Handle any queued directories
  122. deleted, err := delQueue.flush()
  123. if err != nil {
  124. l.Infoln("Revert:", err)
  125. }
  126. now := time.Now()
  127. for _, dir := range deleted {
  128. batch = append(batch, protocol.FileInfo{
  129. Name: dir,
  130. Type: protocol.FileInfoTypeDirectory,
  131. ModifiedS: now.Unix(),
  132. ModifiedBy: f.shortID,
  133. Deleted: true,
  134. Version: protocol.Vector{},
  135. })
  136. }
  137. if len(batch) > 0 {
  138. updateFn(batch)
  139. }
  140. // We will likely have changed our local index, but that won't trigger a
  141. // pull by itself. Make sure we schedule one so that we start
  142. // downloading files.
  143. f.SchedulePull()
  144. }
  145. // deleteQueue handles deletes by delegating to a handler and queuing
  146. // directories for last.
  147. type deleteQueue struct {
  148. handler interface {
  149. deleteFile(file protocol.FileInfo, scanChan chan<- string) (dbUpdateJob, error)
  150. deleteDir(dir string, ignores *ignore.Matcher, scanChan chan<- string) error
  151. }
  152. ignores *ignore.Matcher
  153. dirs []string
  154. scanChan chan<- string
  155. }
  156. func (q *deleteQueue) handle(fi protocol.FileInfo) (bool, error) {
  157. // Things that are ignored but not marked deletable are not processed.
  158. ign := q.ignores.Match(fi.Name)
  159. if ign.IsIgnored() && !ign.IsDeletable() {
  160. return false, nil
  161. }
  162. // Directories are queued for later processing.
  163. if fi.IsDirectory() {
  164. q.dirs = append(q.dirs, fi.Name)
  165. return false, nil
  166. }
  167. // Kill it.
  168. _, err := q.handler.deleteFile(fi, q.scanChan)
  169. return true, err
  170. }
  171. func (q *deleteQueue) flush() ([]string, error) {
  172. // Process directories from the leaves inward.
  173. sort.Sort(sort.Reverse(sort.StringSlice(q.dirs)))
  174. var firstError error
  175. var deleted []string
  176. for _, dir := range q.dirs {
  177. if err := q.handler.deleteDir(dir, q.ignores, q.scanChan); err == nil {
  178. deleted = append(deleted, dir)
  179. } else if err != nil && firstError == nil {
  180. firstError = err
  181. }
  182. }
  183. return deleted, firstError
  184. }