indexhandler.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561
  1. // Copyright (C) 2020 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "context"
  9. "fmt"
  10. "sync"
  11. "time"
  12. "github.com/syncthing/syncthing/lib/config"
  13. "github.com/syncthing/syncthing/lib/db"
  14. "github.com/syncthing/syncthing/lib/events"
  15. "github.com/syncthing/syncthing/lib/protocol"
  16. "github.com/syncthing/syncthing/lib/svcutil"
  17. )
  18. type indexHandler struct {
  19. conn protocol.Connection
  20. downloads *deviceDownloadState
  21. folder string
  22. folderIsReceiveEncrypted bool
  23. prevSequence int64
  24. evLogger events.Logger
  25. cond *sync.Cond
  26. paused bool
  27. fset *db.FileSet
  28. runner service
  29. }
  30. func newIndexHandler(conn protocol.Connection, downloads *deviceDownloadState, folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo, evLogger events.Logger) *indexHandler {
  31. myIndexID := fset.IndexID(protocol.LocalDeviceID)
  32. mySequence := fset.Sequence(protocol.LocalDeviceID)
  33. var startSequence int64
  34. // This is the other side's description of what it knows
  35. // about us. Lets check to see if we can start sending index
  36. // updates directly or need to send the index from start...
  37. if startInfo.local.IndexID == myIndexID {
  38. // They say they've seen our index ID before, so we can
  39. // send a delta update only.
  40. if startInfo.local.MaxSequence > mySequence {
  41. // Safety check. They claim to have more or newer
  42. // index data than we have - either we have lost
  43. // index data, or reset the index without resetting
  44. // the IndexID, or something else weird has
  45. // happened. We send a full index to reset the
  46. // situation.
  47. l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", conn.DeviceID().Short(), folder.Description())
  48. startSequence = 0
  49. } else {
  50. l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", conn.DeviceID().Short(), folder.Description(), startInfo.local.MaxSequence)
  51. startSequence = startInfo.local.MaxSequence
  52. }
  53. } else if startInfo.local.IndexID != 0 {
  54. // They say they've seen an index ID from us, but it's
  55. // not the right one. Either they are confused or we
  56. // must have reset our database since last talking to
  57. // them. We'll start with a full index transfer.
  58. l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", conn.DeviceID().Short(), folder.Description(), startInfo.local.IndexID, myIndexID)
  59. startSequence = 0
  60. } else {
  61. l.Debugf("Device %v folder %s has no index ID for us", conn.DeviceID().Short(), folder.Description())
  62. }
  63. // This is the other side's description of themselves. We
  64. // check to see that it matches the IndexID we have on file,
  65. // otherwise we drop our old index data and expect to get a
  66. // completely new set.
  67. theirIndexID := fset.IndexID(conn.DeviceID())
  68. if startInfo.remote.IndexID == 0 {
  69. // They're not announcing an index ID. This means they
  70. // do not support delta indexes and we should clear any
  71. // information we have from them before accepting their
  72. // index, which will presumably be a full index.
  73. l.Debugf("Device %v folder %s does not announce an index ID", conn.DeviceID().Short(), folder.Description())
  74. fset.Drop(conn.DeviceID())
  75. } else if startInfo.remote.IndexID != theirIndexID {
  76. // The index ID we have on file is not what they're
  77. // announcing. They must have reset their database and
  78. // will probably send us a full index. We drop any
  79. // information we have and remember this new index ID
  80. // instead.
  81. l.Infof("Device %v folder %s has a new index ID (%v)", conn.DeviceID().Short(), folder.Description(), startInfo.remote.IndexID)
  82. fset.Drop(conn.DeviceID())
  83. fset.SetIndexID(conn.DeviceID(), startInfo.remote.IndexID)
  84. }
  85. return &indexHandler{
  86. conn: conn,
  87. downloads: downloads,
  88. folder: folder.ID,
  89. folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted,
  90. prevSequence: startSequence,
  91. evLogger: evLogger,
  92. fset: fset,
  93. runner: runner,
  94. cond: sync.NewCond(new(sync.Mutex)),
  95. }
  96. }
  97. // waitForFileset waits for the handler to resume and fetches the current fileset.
  98. func (s *indexHandler) waitForFileset(ctx context.Context) (*db.FileSet, error) {
  99. s.cond.L.Lock()
  100. defer s.cond.L.Unlock()
  101. for s.paused {
  102. select {
  103. case <-ctx.Done():
  104. return nil, ctx.Err()
  105. default:
  106. s.cond.Wait()
  107. }
  108. }
  109. return s.fset, nil
  110. }
  111. func (s *indexHandler) Serve(ctx context.Context) (err error) {
  112. l.Debugf("Starting index handler for %s to %s at %s (slv=%d)", s.folder, s.conn.DeviceID().Short(), s.conn, s.prevSequence)
  113. stop := make(chan struct{})
  114. defer func() {
  115. err = svcutil.NoRestartErr(err)
  116. l.Debugf("Exiting index handler for %s to %s at %s: %v", s.folder, s.conn.DeviceID().Short(), s.conn, err)
  117. close(stop)
  118. }()
  119. // Broadcast the pause cond when the context quits
  120. go func() {
  121. select {
  122. case <-ctx.Done():
  123. s.cond.Broadcast()
  124. case <-stop:
  125. }
  126. }()
  127. // We need to send one index, regardless of whether there is something to send or not
  128. fset, err := s.waitForFileset(ctx)
  129. if err != nil {
  130. return err
  131. }
  132. err = s.sendIndexTo(ctx, fset)
  133. // Subscribe to LocalIndexUpdated (we have new information to send) and
  134. // DeviceDisconnected (it might be us who disconnected, so we should
  135. // exit).
  136. sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected)
  137. defer sub.Unsubscribe()
  138. evChan := sub.C()
  139. ticker := time.NewTicker(time.Minute)
  140. defer ticker.Stop()
  141. for err == nil {
  142. fset, err = s.waitForFileset(ctx)
  143. if err != nil {
  144. return err
  145. }
  146. // While we have sent a sequence at least equal to the one
  147. // currently in the database, wait for the local index to update. The
  148. // local index may update for other folders than the one we are
  149. // sending for.
  150. if fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
  151. select {
  152. case <-ctx.Done():
  153. return ctx.Err()
  154. case <-evChan:
  155. case <-ticker.C:
  156. }
  157. continue
  158. }
  159. err = s.sendIndexTo(ctx, fset)
  160. // Wait a short amount of time before entering the next loop. If there
  161. // are continuous changes happening to the local index, this gives us
  162. // time to batch them up a little.
  163. select {
  164. case <-ctx.Done():
  165. return ctx.Err()
  166. case <-time.After(250 * time.Millisecond):
  167. }
  168. }
  169. return err
  170. }
  171. // resume might be called because the folder was actually resumed, or just
  172. // because the folder config changed (and thus the runner and potentially fset).
  173. func (s *indexHandler) resume(fset *db.FileSet, runner service) {
  174. s.cond.L.Lock()
  175. s.paused = false
  176. s.fset = fset
  177. s.runner = runner
  178. s.cond.Broadcast()
  179. s.cond.L.Unlock()
  180. }
  181. func (s *indexHandler) pause() {
  182. s.cond.L.Lock()
  183. if s.paused {
  184. s.evLogger.Log(events.Failure, "index handler got paused while already paused")
  185. }
  186. s.paused = true
  187. s.fset = nil
  188. s.runner = nil
  189. s.cond.Broadcast()
  190. s.cond.L.Unlock()
  191. }
  192. // sendIndexTo sends file infos with a sequence number higher than prevSequence and
  193. // returns the highest sent sequence number.
  194. func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error {
  195. initial := s.prevSequence == 0
  196. batch := db.NewFileInfoBatch(nil)
  197. batch.SetFlushFunc(func(fs []protocol.FileInfo) error {
  198. l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size())
  199. if initial {
  200. initial = false
  201. return s.conn.Index(ctx, s.folder, fs)
  202. }
  203. return s.conn.IndexUpdate(ctx, s.folder, fs)
  204. })
  205. var err error
  206. var f protocol.FileInfo
  207. snap, err := fset.Snapshot()
  208. if err != nil {
  209. return svcutil.AsFatalErr(err, svcutil.ExitError)
  210. }
  211. defer snap.Release()
  212. previousWasDelete := false
  213. snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool {
  214. // This is to make sure that renames (which is an add followed by a delete) land in the same batch.
  215. // Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
  216. // the batch ends with a non-delete, or that the last item in the batch is already a delete
  217. if batch.Full() && (!fi.IsDeleted() || previousWasDelete) {
  218. if err = batch.Flush(); err != nil {
  219. return false
  220. }
  221. }
  222. if shouldDebug() {
  223. if fi.SequenceNo() < s.prevSequence+1 {
  224. panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1))
  225. }
  226. }
  227. if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
  228. l.Warnln("Non-increasing sequence detected: Checking and repairing the db...")
  229. // Abort this round of index sending - the next one will pick
  230. // up from the last successful one with the repeaired db.
  231. defer func() {
  232. if fixed, dbErr := fset.RepairSequence(); dbErr != nil {
  233. l.Warnln("Failed repairing sequence entries:", dbErr)
  234. panic("Failed repairing sequence entries")
  235. } else {
  236. s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence")
  237. l.Infof("Repaired %v sequence entries in database", fixed)
  238. }
  239. }()
  240. return false
  241. }
  242. f = fi.(protocol.FileInfo)
  243. // If this is a folder receiving encrypted files only, we
  244. // mustn't ever send locally changed file infos. Those aren't
  245. // encrypted and thus would be a protocol error at the remote.
  246. if s.folderIsReceiveEncrypted && fi.IsReceiveOnlyChanged() {
  247. return true
  248. }
  249. f = prepareFileInfoForIndex(f)
  250. previousWasDelete = f.IsDeleted()
  251. batch.Append(f)
  252. return true
  253. })
  254. if err != nil {
  255. return err
  256. }
  257. err = batch.Flush()
  258. // Use the sequence of the snapshot we iterated as a starting point for the
  259. // next run. Previously we used the sequence of the last file we sent,
  260. // however it's possible that a higher sequence exists, just doesn't need to
  261. // be sent (e.g. in a receive-only folder, when a local change was
  262. // reverted). No point trying to send nothing again.
  263. s.prevSequence = snap.Sequence(protocol.LocalDeviceID)
  264. return err
  265. }
  266. func (s *indexHandler) receive(fs []protocol.FileInfo, update bool, op string) error {
  267. deviceID := s.conn.DeviceID()
  268. s.cond.L.Lock()
  269. paused := s.paused
  270. fset := s.fset
  271. runner := s.runner
  272. s.cond.L.Unlock()
  273. if paused {
  274. l.Infof("%v for paused folder %q", op, s.folder)
  275. return fmt.Errorf("%v: %w", s.folder, ErrFolderPaused)
  276. }
  277. defer runner.SchedulePull()
  278. s.downloads.Update(s.folder, makeForgetUpdate(fs))
  279. if !update {
  280. fset.Drop(deviceID)
  281. }
  282. for i := range fs {
  283. // The local attributes should never be transmitted over the wire.
  284. // Make sure they look like they weren't.
  285. fs[i].LocalFlags = 0
  286. fs[i].VersionHash = nil
  287. }
  288. fset.Update(deviceID, fs)
  289. seq := fset.Sequence(deviceID)
  290. s.evLogger.Log(events.RemoteIndexUpdated, map[string]interface{}{
  291. "device": deviceID.String(),
  292. "folder": s.folder,
  293. "items": len(fs),
  294. "sequence": seq,
  295. "version": seq, // legacy for sequence
  296. })
  297. return nil
  298. }
  299. func prepareFileInfoForIndex(f protocol.FileInfo) protocol.FileInfo {
  300. // Mark the file as invalid if any of the local bad stuff flags are set.
  301. f.RawInvalid = f.IsInvalid()
  302. // If the file is marked LocalReceive (i.e., changed locally on a
  303. // receive only folder) we do not want it to ever become the
  304. // globally best version, invalid or not.
  305. if f.IsReceiveOnlyChanged() {
  306. f.Version = protocol.Vector{}
  307. }
  308. // The trailer with the encrypted fileinfo is device local, don't send info
  309. // about that to remotes
  310. f.Size -= int64(f.EncryptionTrailerSize)
  311. f.EncryptionTrailerSize = 0
  312. // never sent externally
  313. f.LocalFlags = 0
  314. f.VersionHash = nil
  315. f.InodeChangeNs = 0
  316. return f
  317. }
  318. func (s *indexHandler) String() string {
  319. return fmt.Sprintf("indexHandler@%p for %s to %s at %s", s, s.folder, s.conn.DeviceID().Short(), s.conn)
  320. }
  321. type indexHandlerRegistry struct {
  322. evLogger events.Logger
  323. conn protocol.Connection
  324. downloads *deviceDownloadState
  325. indexHandlers *serviceMap[string, *indexHandler]
  326. startInfos map[string]*clusterConfigDeviceInfo
  327. folderStates map[string]*indexHandlerFolderState
  328. mut sync.Mutex
  329. }
  330. type indexHandlerFolderState struct {
  331. cfg config.FolderConfiguration
  332. fset *db.FileSet
  333. runner service
  334. }
  335. func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, evLogger events.Logger) *indexHandlerRegistry {
  336. r := &indexHandlerRegistry{
  337. evLogger: evLogger,
  338. conn: conn,
  339. downloads: downloads,
  340. indexHandlers: newServiceMap[string, *indexHandler](evLogger),
  341. startInfos: make(map[string]*clusterConfigDeviceInfo),
  342. folderStates: make(map[string]*indexHandlerFolderState),
  343. mut: sync.Mutex{},
  344. }
  345. return r
  346. }
  347. func (r *indexHandlerRegistry) String() string {
  348. return fmt.Sprintf("indexHandlerRegistry/%v", r.conn.DeviceID().Short())
  349. }
  350. func (r *indexHandlerRegistry) Serve(ctx context.Context) error {
  351. // Running the index handler registry means running the individual index
  352. // handler children.
  353. return r.indexHandlers.Serve(ctx)
  354. }
  355. func (r *indexHandlerRegistry) startLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo) {
  356. r.indexHandlers.RemoveAndWait(folder.ID, 0)
  357. delete(r.startInfos, folder.ID)
  358. is := newIndexHandler(r.conn, r.downloads, folder, fset, runner, startInfo, r.evLogger)
  359. r.indexHandlers.Add(folder.ID, is)
  360. // This new connection might help us get in sync.
  361. runner.SchedulePull()
  362. }
  363. // AddIndexInfo starts an index handler for given folder, unless it is paused.
  364. // If it is paused, the given startInfo is stored to start the sender once the
  365. // folder is resumed.
  366. // If an index handler is already running, it will be stopped first.
  367. func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterConfigDeviceInfo) {
  368. r.mut.Lock()
  369. defer r.mut.Unlock()
  370. if r.indexHandlers.RemoveAndWait(folder, 0) == nil {
  371. l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.DeviceID().Short(), folder)
  372. }
  373. folderState, ok := r.folderStates[folder]
  374. if !ok {
  375. l.Debugf("Pending index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  376. r.startInfos[folder] = startInfo
  377. return
  378. }
  379. r.startLocked(folderState.cfg, folderState.fset, folderState.runner, startInfo)
  380. }
  381. // Remove stops a running index handler or removes one pending to be started.
  382. // It is a noop if the folder isn't known.
  383. func (r *indexHandlerRegistry) Remove(folder string) {
  384. r.mut.Lock()
  385. defer r.mut.Unlock()
  386. l.Debugf("Removing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  387. r.indexHandlers.RemoveAndWait(folder, 0)
  388. delete(r.startInfos, folder)
  389. l.Debugf("Removed index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  390. }
  391. // RemoveAllExcept stops all running index handlers and removes those pending to be started,
  392. // except mentioned ones.
  393. // It is a noop if the folder isn't known.
  394. func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]remoteFolderState) {
  395. r.mut.Lock()
  396. defer r.mut.Unlock()
  397. r.indexHandlers.Each(func(folder string, is *indexHandler) error {
  398. if _, ok := except[folder]; !ok {
  399. r.indexHandlers.RemoveAndWait(folder, 0)
  400. l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder)
  401. }
  402. return nil
  403. })
  404. for folder := range r.startInfos {
  405. if _, ok := except[folder]; !ok {
  406. delete(r.startInfos, folder)
  407. l.Debugf("Removed pending index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder)
  408. }
  409. }
  410. }
  411. // RegisterFolderState must be called whenever something about the folder
  412. // changes. The exception being if the folder is removed entirely, then call
  413. // Remove. The fset and runner arguments may be nil, if given folder is paused.
  414. func (r *indexHandlerRegistry) RegisterFolderState(folder config.FolderConfiguration, fset *db.FileSet, runner service) {
  415. if !folder.SharedWith(r.conn.DeviceID()) {
  416. r.Remove(folder.ID)
  417. return
  418. }
  419. r.mut.Lock()
  420. if folder.Paused {
  421. r.folderPausedLocked(folder.ID)
  422. } else {
  423. r.folderRunningLocked(folder, fset, runner)
  424. }
  425. r.mut.Unlock()
  426. }
  427. // folderPausedLocked stops a running index handler.
  428. // It is a noop if the folder isn't known or has not been started yet.
  429. func (r *indexHandlerRegistry) folderPausedLocked(folder string) {
  430. l.Debugf("Pausing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  431. delete(r.folderStates, folder)
  432. if is, ok := r.indexHandlers.Get(folder); ok {
  433. is.pause()
  434. l.Debugf("Paused index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  435. } else {
  436. l.Debugf("No index handler for device %v and folder %v to pause", r.conn.DeviceID().Short(), folder)
  437. }
  438. }
  439. // folderRunningLocked resumes an already running index handler or starts it, if it
  440. // was added while paused.
  441. // It is a noop if the folder isn't known.
  442. func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service) {
  443. r.folderStates[folder.ID] = &indexHandlerFolderState{
  444. cfg: folder,
  445. fset: fset,
  446. runner: runner,
  447. }
  448. is, isOk := r.indexHandlers.Get(folder.ID)
  449. if info, ok := r.startInfos[folder.ID]; ok {
  450. if isOk {
  451. r.indexHandlers.RemoveAndWait(folder.ID, 0)
  452. l.Debugf("Removed index handler for device %v and folder %v in resume", r.conn.DeviceID().Short(), folder.ID)
  453. }
  454. r.startLocked(folder, fset, runner, info)
  455. delete(r.startInfos, folder.ID)
  456. l.Debugf("Started index handler for device %v and folder %v in resume", r.conn.DeviceID().Short(), folder.ID)
  457. } else if isOk {
  458. l.Debugf("Resuming index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  459. is.resume(fset, runner)
  460. } else {
  461. l.Debugf("Not resuming index handler for device %v and folder %v as none is paused and there is no start info", r.conn.DeviceID().Short(), folder.ID)
  462. }
  463. }
  464. func (r *indexHandlerRegistry) ReceiveIndex(folder string, fs []protocol.FileInfo, update bool, op string) error {
  465. r.mut.Lock()
  466. defer r.mut.Unlock()
  467. is, isOk := r.indexHandlers.Get(folder)
  468. if !isOk {
  469. l.Infof("%v for nonexistent or paused folder %q", op, folder)
  470. return fmt.Errorf("%s: %w", folder, ErrFolderMissing)
  471. }
  472. return is.receive(fs, update, op)
  473. }
  474. // makeForgetUpdate takes an index update and constructs a download progress update
  475. // causing to forget any progress for files which we've just been sent.
  476. func makeForgetUpdate(files []protocol.FileInfo) []protocol.FileDownloadProgressUpdate {
  477. updates := make([]protocol.FileDownloadProgressUpdate, 0, len(files))
  478. for _, file := range files {
  479. if file.IsSymlink() || file.IsDirectory() || file.IsDeleted() {
  480. continue
  481. }
  482. updates = append(updates, protocol.FileDownloadProgressUpdate{
  483. Name: file.Name,
  484. Version: file.Version,
  485. UpdateType: protocol.FileDownloadProgressUpdateTypeForget,
  486. })
  487. }
  488. return updates
  489. }