indexsender.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. // Copyright (C) 2020 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "context"
  9. "fmt"
  10. "sync"
  11. "time"
  12. "github.com/thejerf/suture/v4"
  13. "github.com/syncthing/syncthing/lib/config"
  14. "github.com/syncthing/syncthing/lib/db"
  15. "github.com/syncthing/syncthing/lib/events"
  16. "github.com/syncthing/syncthing/lib/protocol"
  17. "github.com/syncthing/syncthing/lib/svcutil"
  18. )
  19. type indexSender struct {
  20. conn protocol.Connection
  21. folder string
  22. folderIsReceiveEncrypted bool
  23. fset *db.FileSet
  24. prevSequence int64
  25. evLogger events.Logger
  26. connClosed chan struct{}
  27. done chan struct{}
  28. token suture.ServiceToken
  29. pauseChan chan struct{}
  30. resumeChan chan *db.FileSet
  31. }
  // Serve is run by the supervisor (see indexSenderRegistry.addLocked). It
  // first sends the folder's index once, then sends index updates whenever the
  // local sequence number advances past what has already been handled.
  //
  // It returns ctx.Err() when ctx is cancelled, nil when connClosed fires, and
  // the send error when sending fails. The returned error is wrapped with
  // svcutil.NoRestartErr (presumably so the supervisor does not restart it).
  //
  // While paused no updates are sent, but the loop keeps listening on
  // pauseChan and resumeChan so that pause() and resume() never block on it.
  func (s *indexSender) Serve(ctx context.Context) (err error) {
  	l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence)
  	defer func() {
  		close(s.done)
  		err = svcutil.NoRestartErr(err)
  		l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.conn.ID(), s.conn, err)
  	}()

  	// We need to send one index, regardless of whether there is something to send or not
  	if err = s.sendIndexTo(ctx); err != nil {
  		return err
  	}

  	// Subscribe to LocalIndexUpdated (we have new information to send) and
  	// DeviceDisconnected (it might be us who disconnected, so we should
  	// exit).
  	sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected)
  	defer sub.Unsubscribe()

  	paused := false
  	evChan := sub.C()
  	ticker := time.NewTicker(time.Minute)
  	defer ticker.Stop()

  	for {
  		select {
  		case <-ctx.Done():
  			return ctx.Err()
  		case <-s.connClosed:
  			return nil
  		default:
  		}

  		// While paused, or while we have sent a sequence at least equal to
  		// the one currently in the database, wait for something to happen.
  		// The local index may update for other folders than the one we are
  		// sending for.
  		//
  		// Waiting whenever paused matters: the sequence can advance while we
  		// are paused. If we did not wait in that state, the loop would spin
  		// without ever receiving from resumeChan. resume() would then block
  		// forever while the registry holds its lock.
  		if paused || s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
  			select {
  			case <-ctx.Done():
  				return ctx.Err()
  			case <-s.connClosed:
  				return nil
  			case <-evChan:
  			case <-ticker.C:
  			case <-s.pauseChan:
  				paused = true
  			case s.fset = <-s.resumeChan:
  				paused = false
  			}
  			continue
  		}

  		if err = s.sendIndexTo(ctx); err != nil {
  			return err
  		}

  		// Wait a short amount of time before entering the next loop. If there
  		// are continuous changes happening to the local index, this gives us
  		// time to batch them up a little. Unlike a plain sleep, this wait
  		// reacts promptly to cancellation and to a closed connection.
  		select {
  		case <-ctx.Done():
  			return ctx.Err()
  		case <-s.connClosed:
  			return nil
  		case <-time.After(250 * time.Millisecond):
  		}
  	}
  }
  87. func (s *indexSender) resume(fset *db.FileSet) {
  88. select {
  89. case <-s.done:
  90. case s.resumeChan <- fset:
  91. }
  92. }
  93. func (s *indexSender) pause() {
  94. select {
  95. case <-s.done:
  96. case s.pauseChan <- struct{}{}:
  97. }
  98. }
  // sendIndexTo sends, in batches, the folder's file infos with a sequence
  // number higher than s.prevSequence. When s.prevSequence is zero, the first
  // batch is sent with conn.Index. Every other batch is sent with
  // conn.IndexUpdate.
  //
  // On success, s.prevSequence is advanced to the sequence number of the last
  // file info processed. This includes file infos that were skipped because
  // they must not be sent. s.prevSequence is left unchanged when there was
  // nothing to process or when sending a batch failed.
  func (s *indexSender) sendIndexTo(ctx context.Context) error {
  	initial := s.prevSequence == 0
  	batch := db.NewFileInfoBatch(nil)
  	batch.SetFlushFunc(func(fs []protocol.FileInfo) error {
  		l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size())
  		if initial {
  			initial = false
  			return s.conn.Index(ctx, s.folder, fs)
  		}
  		return s.conn.IndexUpdate(ctx, s.folder, fs)
  	})

  	snap, err := s.fset.Snapshot()
  	if err != nil {
  		return svcutil.AsFatalErr(err, svcutil.ExitError)
  	}
  	defer snap.Release()

  	// f is the last file info processed; f.Sequence stays zero if there was
  	// nothing to process.
  	var f protocol.FileInfo
  	previousWasDelete := false
  	snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool {
  		// This is to make sure that renames (which is an add followed by a delete) land in the same batch.
  		// Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
  		// the batch ends with a non-delete, or that the last item in the batch is already a delete
  		if batch.Full() && (!fi.IsDeleted() || previousWasDelete) {
  			if err = batch.Flush(); err != nil {
  				return false
  			}
  		}

  		if shouldDebug() {
  			if fi.SequenceNo() < s.prevSequence+1 {
  				panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1))
  			}
  		}

  		if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
  			l.Warnln("Non-increasing sequence detected: Checking and repairing the db...")
  			// Abort this round of index sending - the next one will pick
  			// up from the last successful one with the repaired db.
  			defer func() {
  				if fixed, dbErr := s.fset.RepairSequence(); dbErr != nil {
  					l.Warnln("Failed repairing sequence entries:", dbErr)
  					panic("Failed repairing sequence entries")
  				} else {
  					s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence")
  					l.Infof("Repaired %v sequence entries in database", fixed)
  				}
  			}()
  			return false
  		}

  		f = fi.(protocol.FileInfo)

  		// If this is a folder receiving encrypted files only, we
  		// mustn't ever send locally changed file infos. Those aren't
  		// encrypted and thus would be a protocol error at the remote.
  		if s.folderIsReceiveEncrypted && fi.IsReceiveOnlyChanged() {
  			return true
  		}

  		f = prepareFileInfoForIndex(f)

  		previousWasDelete = f.IsDeleted()

  		batch.Append(f)
  		return true
  	})
  	if err != nil {
  		return err
  	}

  	if err = batch.Flush(); err != nil {
  		// The last batch was not delivered, so don't record it as sent.
  		return err
  	}

  	// f.Sequence is zero if there was nothing to be sent.
  	if f.Sequence > 0 {
  		s.prevSequence = f.Sequence
  	}
  	return nil
  }
  172. func prepareFileInfoForIndex(f protocol.FileInfo) protocol.FileInfo {
  173. // Mark the file as invalid if any of the local bad stuff flags are set.
  174. f.RawInvalid = f.IsInvalid()
  175. // If the file is marked LocalReceive (i.e., changed locally on a
  176. // receive only folder) we do not want it to ever become the
  177. // globally best version, invalid or not.
  178. if f.IsReceiveOnlyChanged() {
  179. f.Version = protocol.Vector{}
  180. }
  181. // never sent externally
  182. f.LocalFlags = 0
  183. f.VersionHash = nil
  184. return f
  185. }
  186. func (s *indexSender) String() string {
  187. return fmt.Sprintf("indexSender@%p for %s to %s at %s", s, s.folder, s.conn.ID(), s.conn)
  188. }
  189. type indexSenderRegistry struct {
  190. deviceID protocol.DeviceID
  191. sup *suture.Supervisor
  192. evLogger events.Logger
  193. conn protocol.Connection
  194. closed chan struct{}
  195. indexSenders map[string]*indexSender
  196. startInfos map[string]*indexSenderStartInfo
  197. mut sync.Mutex
  198. }
  199. func newIndexSenderRegistry(conn protocol.Connection, closed chan struct{}, sup *suture.Supervisor, evLogger events.Logger) *indexSenderRegistry {
  200. return &indexSenderRegistry{
  201. deviceID: conn.ID(),
  202. conn: conn,
  203. closed: closed,
  204. sup: sup,
  205. evLogger: evLogger,
  206. indexSenders: make(map[string]*indexSender),
  207. startInfos: make(map[string]*indexSenderStartInfo),
  208. mut: sync.Mutex{},
  209. }
  210. }
  211. // add starts an index sender for given folder.
  212. // If an index sender is already running, it will be stopped first.
  213. func (r *indexSenderRegistry) add(folder config.FolderConfiguration, fset *db.FileSet, startInfo *indexSenderStartInfo) {
  214. r.mut.Lock()
  215. r.addLocked(folder, fset, startInfo)
  216. l.Debugf("Started index sender for device %v and folder %v", r.deviceID.Short(), folder.ID)
  217. r.mut.Unlock()
  218. }
  219. func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset *db.FileSet, startInfo *indexSenderStartInfo) {
  220. myIndexID := fset.IndexID(protocol.LocalDeviceID)
  221. mySequence := fset.Sequence(protocol.LocalDeviceID)
  222. var startSequence int64
  223. // This is the other side's description of what it knows
  224. // about us. Lets check to see if we can start sending index
  225. // updates directly or need to send the index from start...
  226. if startInfo.local.IndexID == myIndexID {
  227. // They say they've seen our index ID before, so we can
  228. // send a delta update only.
  229. if startInfo.local.MaxSequence > mySequence {
  230. // Safety check. They claim to have more or newer
  231. // index data than we have - either we have lost
  232. // index data, or reset the index without resetting
  233. // the IndexID, or something else weird has
  234. // happened. We send a full index to reset the
  235. // situation.
  236. l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", r.deviceID, folder.Description())
  237. startSequence = 0
  238. } else {
  239. l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", r.deviceID, folder.Description(), startInfo.local.MaxSequence)
  240. startSequence = startInfo.local.MaxSequence
  241. }
  242. } else if startInfo.local.IndexID != 0 {
  243. // They say they've seen an index ID from us, but it's
  244. // not the right one. Either they are confused or we
  245. // must have reset our database since last talking to
  246. // them. We'll start with a full index transfer.
  247. l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", r.deviceID, folder.Description(), startInfo.local.IndexID, myIndexID)
  248. startSequence = 0
  249. } else {
  250. l.Debugf("Device %v folder %s has no index ID for us", r.deviceID, folder.Description())
  251. }
  252. // This is the other side's description of themselves. We
  253. // check to see that it matches the IndexID we have on file,
  254. // otherwise we drop our old index data and expect to get a
  255. // completely new set.
  256. theirIndexID := fset.IndexID(r.deviceID)
  257. if startInfo.remote.IndexID == 0 {
  258. // They're not announcing an index ID. This means they
  259. // do not support delta indexes and we should clear any
  260. // information we have from them before accepting their
  261. // index, which will presumably be a full index.
  262. l.Debugf("Device %v folder %s does not announce an index ID", r.deviceID, folder.Description())
  263. fset.Drop(r.deviceID)
  264. } else if startInfo.remote.IndexID != theirIndexID {
  265. // The index ID we have on file is not what they're
  266. // announcing. They must have reset their database and
  267. // will probably send us a full index. We drop any
  268. // information we have and remember this new index ID
  269. // instead.
  270. l.Infof("Device %v folder %s has a new index ID (%v)", r.deviceID, folder.Description(), startInfo.remote.IndexID)
  271. fset.Drop(r.deviceID)
  272. fset.SetIndexID(r.deviceID, startInfo.remote.IndexID)
  273. }
  274. if is, ok := r.indexSenders[folder.ID]; ok {
  275. r.sup.RemoveAndWait(is.token, 0)
  276. delete(r.indexSenders, folder.ID)
  277. }
  278. delete(r.startInfos, folder.ID)
  279. is := &indexSender{
  280. conn: r.conn,
  281. connClosed: r.closed,
  282. done: make(chan struct{}),
  283. folder: folder.ID,
  284. folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted,
  285. fset: fset,
  286. prevSequence: startSequence,
  287. evLogger: r.evLogger,
  288. pauseChan: make(chan struct{}),
  289. resumeChan: make(chan *db.FileSet),
  290. }
  291. is.token = r.sup.Add(is)
  292. r.indexSenders[folder.ID] = is
  293. }
  294. // addPending stores the given info to start an index sender once resume is called
  295. // for this folder.
  296. // If an index sender is already running, it will be stopped.
  297. func (r *indexSenderRegistry) addPending(folder string, startInfo *indexSenderStartInfo) {
  298. r.mut.Lock()
  299. defer r.mut.Unlock()
  300. if is, ok := r.indexSenders[folder]; ok {
  301. r.sup.RemoveAndWait(is.token, 0)
  302. delete(r.indexSenders, folder)
  303. l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.deviceID.Short(), folder)
  304. }
  305. r.startInfos[folder] = startInfo
  306. l.Debugf("Pending index sender for device %v and folder %v", r.deviceID.Short(), folder)
  307. }
  308. // remove stops a running index sender or removes one pending to be started.
  309. // It is a noop if the folder isn't known.
  310. func (r *indexSenderRegistry) remove(folder string) {
  311. r.mut.Lock()
  312. defer r.mut.Unlock()
  313. if is, ok := r.indexSenders[folder]; ok {
  314. r.sup.RemoveAndWait(is.token, 0)
  315. delete(r.indexSenders, folder)
  316. }
  317. delete(r.startInfos, folder)
  318. l.Debugf("Removed index sender for device %v and folder %v", r.deviceID.Short(), folder)
  319. }
  320. // removeAllExcept stops all running index senders and removes those pending to be started,
  321. // except mentioned ones.
  322. // It is a noop if the folder isn't known.
  323. func (r *indexSenderRegistry) removeAllExcept(except map[string]struct{}) {
  324. r.mut.Lock()
  325. defer r.mut.Unlock()
  326. for folder, is := range r.indexSenders {
  327. if _, ok := except[folder]; !ok {
  328. r.sup.RemoveAndWait(is.token, 0)
  329. delete(r.indexSenders, folder)
  330. l.Debugf("Removed index sender for device %v and folder %v (removeAllExcept)", r.deviceID.Short(), folder)
  331. }
  332. }
  333. for folder := range r.startInfos {
  334. if _, ok := except[folder]; !ok {
  335. delete(r.startInfos, folder)
  336. l.Debugf("Removed pending index sender for device %v and folder %v (removeAllExcept)", r.deviceID.Short(), folder)
  337. }
  338. }
  339. }
  340. // pause stops a running index sender.
  341. // It is a noop if the folder isn't known or has not been started yet.
  342. func (r *indexSenderRegistry) pause(folder string) {
  343. r.mut.Lock()
  344. defer r.mut.Unlock()
  345. if is, ok := r.indexSenders[folder]; ok {
  346. is.pause()
  347. l.Debugf("Paused index sender for device %v and folder %v", r.deviceID.Short(), folder)
  348. } else {
  349. l.Debugf("No index sender for device %v and folder %v to pause", r.deviceID.Short(), folder)
  350. }
  351. }
  352. // resume unpauses an already running index sender or starts it, if it was added
  353. // while paused.
  354. // It is a noop if the folder isn't known.
  355. func (r *indexSenderRegistry) resume(folder config.FolderConfiguration, fset *db.FileSet) {
  356. r.mut.Lock()
  357. defer r.mut.Unlock()
  358. is, isOk := r.indexSenders[folder.ID]
  359. if info, ok := r.startInfos[folder.ID]; ok {
  360. if isOk {
  361. r.sup.RemoveAndWait(is.token, 0)
  362. delete(r.indexSenders, folder.ID)
  363. l.Debugf("Removed index sender for device %v and folder %v in resume", r.deviceID.Short(), folder.ID)
  364. }
  365. r.addLocked(folder, fset, info)
  366. delete(r.startInfos, folder.ID)
  367. l.Debugf("Started index sender for device %v and folder %v in resume", r.deviceID.Short(), folder.ID)
  368. } else if isOk {
  369. is.resume(fset)
  370. l.Debugf("Resume index sender for device %v and folder %v", r.deviceID.Short(), folder.ID)
  371. } else {
  372. l.Debugf("Not resuming index sender for device %v and folder %v as none is paused and there is no start info", r.deviceID.Short(), folder.ID)
  373. }
  374. }
  375. type indexSenderStartInfo struct {
  376. local, remote protocol.Device
  377. }