indexhandler.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696
  1. // Copyright (C) 2020 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "context"
  9. "fmt"
  10. "sync"
  11. "time"
  12. "github.com/syncthing/syncthing/lib/config"
  13. "github.com/syncthing/syncthing/lib/db"
  14. "github.com/syncthing/syncthing/lib/events"
  15. "github.com/syncthing/syncthing/lib/protocol"
  16. "github.com/syncthing/syncthing/lib/svcutil"
  17. "github.com/syncthing/syncthing/lib/ur"
  18. )
  19. type indexHandler struct {
  20. conn protocol.Connection
  21. downloads *deviceDownloadState
  22. folder string
  23. folderIsReceiveEncrypted bool
  24. evLogger events.Logger
  25. // We track the latest / highest sequence number in two ways for two
  26. // different reasons. Initially they are the same -- the highest seen
  27. // sequence number reported by the other side (or zero).
  28. //
  29. // One is the highest number we've seen when iterating the database,
  30. // which we track for database iteration purposes. When we loop, we
  31. // start looking at that number plus one in the next loop. Our index
  32. // numbering may have holes which this will skip over.
  33. //
  34. // The other is the highest sequence we previously sent to the other
  35. // side, used by them for correctness checks. This one must not skip
  36. // holes. That is, if we iterate and find a hole, this is not
  37. // incremented because nothing was sent to the other side.
  38. localPrevSequence int64 // the highest sequence number we've seen in our FileInfos
  39. sentPrevSequence int64 // the highest sequence number we've sent to the peer
  40. cond *sync.Cond
  41. paused bool
  42. fset *db.FileSet
  43. runner service
  44. }
  45. func newIndexHandler(conn protocol.Connection, downloads *deviceDownloadState, folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo, evLogger events.Logger) *indexHandler {
  46. myIndexID := fset.IndexID(protocol.LocalDeviceID)
  47. mySequence := fset.Sequence(protocol.LocalDeviceID)
  48. var startSequence int64
  49. // This is the other side's description of what it knows
  50. // about us. Lets check to see if we can start sending index
  51. // updates directly or need to send the index from start...
  52. if startInfo.local.IndexID == myIndexID {
  53. // They say they've seen our index ID before, so we can
  54. // send a delta update only.
  55. if startInfo.local.MaxSequence > mySequence {
  56. // Safety check. They claim to have more or newer
  57. // index data than we have - either we have lost
  58. // index data, or reset the index without resetting
  59. // the IndexID, or something else weird has
  60. // happened. We send a full index to reset the
  61. // situation.
  62. l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", conn.DeviceID().Short(), folder.Description())
  63. startSequence = 0
  64. } else {
  65. l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", conn.DeviceID().Short(), folder.Description(), startInfo.local.MaxSequence)
  66. startSequence = startInfo.local.MaxSequence
  67. }
  68. } else if startInfo.local.IndexID != 0 {
  69. // They say they've seen an index ID from us, but it's
  70. // not the right one. Either they are confused or we
  71. // must have reset our database since last talking to
  72. // them. We'll start with a full index transfer.
  73. l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", conn.DeviceID().Short(), folder.Description(), startInfo.local.IndexID, myIndexID)
  74. startSequence = 0
  75. } else {
  76. l.Debugf("Device %v folder %s has no index ID for us", conn.DeviceID().Short(), folder.Description())
  77. }
  78. // This is the other side's description of themselves. We
  79. // check to see that it matches the IndexID we have on file,
  80. // otherwise we drop our old index data and expect to get a
  81. // completely new set.
  82. theirIndexID := fset.IndexID(conn.DeviceID())
  83. if startInfo.remote.IndexID == 0 {
  84. // They're not announcing an index ID. This means they
  85. // do not support delta indexes and we should clear any
  86. // information we have from them before accepting their
  87. // index, which will presumably be a full index.
  88. l.Debugf("Device %v folder %s does not announce an index ID", conn.DeviceID().Short(), folder.Description())
  89. fset.Drop(conn.DeviceID())
  90. } else if startInfo.remote.IndexID != theirIndexID {
  91. // The index ID we have on file is not what they're
  92. // announcing. They must have reset their database and
  93. // will probably send us a full index. We drop any
  94. // information we have and remember this new index ID
  95. // instead.
  96. l.Infof("Device %v folder %s has a new index ID (%v)", conn.DeviceID().Short(), folder.Description(), startInfo.remote.IndexID)
  97. fset.Drop(conn.DeviceID())
  98. fset.SetIndexID(conn.DeviceID(), startInfo.remote.IndexID)
  99. }
  100. return &indexHandler{
  101. conn: conn,
  102. downloads: downloads,
  103. folder: folder.ID,
  104. folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted,
  105. localPrevSequence: startSequence,
  106. sentPrevSequence: startSequence,
  107. evLogger: evLogger,
  108. fset: fset,
  109. runner: runner,
  110. cond: sync.NewCond(new(sync.Mutex)),
  111. }
  112. }
  113. // waitForFileset waits for the handler to resume and fetches the current fileset.
  114. func (s *indexHandler) waitForFileset(ctx context.Context) (*db.FileSet, error) {
  115. s.cond.L.Lock()
  116. defer s.cond.L.Unlock()
  117. for s.paused {
  118. select {
  119. case <-ctx.Done():
  120. return nil, ctx.Err()
  121. default:
  122. s.cond.Wait()
  123. }
  124. }
  125. return s.fset, nil
  126. }
  127. func (s *indexHandler) Serve(ctx context.Context) (err error) {
  128. l.Debugf("Starting index handler for %s to %s at %s (localPrevSequence=%d)", s.folder, s.conn.DeviceID().Short(), s.conn, s.localPrevSequence)
  129. stop := make(chan struct{})
  130. defer func() {
  131. err = svcutil.NoRestartErr(err)
  132. l.Debugf("Exiting index handler for %s to %s at %s: %v", s.folder, s.conn.DeviceID().Short(), s.conn, err)
  133. close(stop)
  134. }()
  135. // Broadcast the pause cond when the context quits
  136. go func() {
  137. select {
  138. case <-ctx.Done():
  139. s.cond.Broadcast()
  140. case <-stop:
  141. }
  142. }()
  143. // We need to send one index, regardless of whether there is something to send or not
  144. fset, err := s.waitForFileset(ctx)
  145. if err != nil {
  146. return err
  147. }
  148. err = s.sendIndexTo(ctx, fset)
  149. // Subscribe to LocalIndexUpdated (we have new information to send) and
  150. // DeviceDisconnected (it might be us who disconnected, so we should
  151. // exit).
  152. sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected)
  153. defer sub.Unsubscribe()
  154. evChan := sub.C()
  155. ticker := time.NewTicker(time.Minute)
  156. defer ticker.Stop()
  157. for err == nil {
  158. fset, err = s.waitForFileset(ctx)
  159. if err != nil {
  160. return err
  161. }
  162. // While we have sent a sequence at least equal to the one
  163. // currently in the database, wait for the local index to update. The
  164. // local index may update for other folders than the one we are
  165. // sending for.
  166. if fset.Sequence(protocol.LocalDeviceID) <= s.localPrevSequence {
  167. select {
  168. case <-ctx.Done():
  169. return ctx.Err()
  170. case <-evChan:
  171. case <-ticker.C:
  172. }
  173. continue
  174. }
  175. err = s.sendIndexTo(ctx, fset)
  176. // Wait a short amount of time before entering the next loop. If there
  177. // are continuous changes happening to the local index, this gives us
  178. // time to batch them up a little.
  179. select {
  180. case <-ctx.Done():
  181. return ctx.Err()
  182. case <-time.After(250 * time.Millisecond):
  183. }
  184. }
  185. return err
  186. }
  187. // resume might be called because the folder was actually resumed, or just
  188. // because the folder config changed (and thus the runner and potentially fset).
  189. func (s *indexHandler) resume(fset *db.FileSet, runner service) {
  190. s.cond.L.Lock()
  191. s.paused = false
  192. s.fset = fset
  193. s.runner = runner
  194. s.cond.Broadcast()
  195. s.cond.L.Unlock()
  196. }
  197. func (s *indexHandler) pause() {
  198. s.cond.L.Lock()
  199. if s.paused {
  200. s.evLogger.Log(events.Failure, "index handler got paused while already paused")
  201. }
  202. s.paused = true
  203. s.fset = nil
  204. s.runner = nil
  205. s.cond.Broadcast()
  206. s.cond.L.Unlock()
  207. }
  208. // sendIndexTo sends file infos with a sequence number higher than prevSequence and
  209. // returns the highest sent sequence number.
  210. func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error {
  211. initial := s.localPrevSequence == 0
  212. batch := db.NewFileInfoBatch(nil)
  213. var batchError error
  214. batch.SetFlushFunc(func(fs []protocol.FileInfo) error {
  215. select {
  216. case <-ctx.Done():
  217. return ctx.Err()
  218. default:
  219. }
  220. if len(fs) == 0 {
  221. // can't happen, flush is not called with an empty batch
  222. panic("bug: flush called with empty batch (race condition?)")
  223. }
  224. if batchError != nil {
  225. // can't happen, once an error is returned the index sender exits
  226. panic(fmt.Sprintf("bug: once failed it should stay failed (%v)", batchError))
  227. }
  228. l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size())
  229. lastSequence := fs[len(fs)-1].Sequence
  230. var err error
  231. if initial {
  232. initial = false
  233. err = s.conn.Index(ctx, &protocol.Index{
  234. Folder: s.folder,
  235. Files: fs,
  236. LastSequence: lastSequence,
  237. })
  238. } else {
  239. err = s.conn.IndexUpdate(ctx, &protocol.IndexUpdate{
  240. Folder: s.folder,
  241. Files: fs,
  242. PrevSequence: s.sentPrevSequence,
  243. LastSequence: lastSequence,
  244. })
  245. }
  246. if err != nil {
  247. batchError = err
  248. return err
  249. }
  250. s.sentPrevSequence = lastSequence
  251. return nil
  252. })
  253. var err error
  254. var f protocol.FileInfo
  255. snap, err := fset.Snapshot()
  256. if err != nil {
  257. return svcutil.AsFatalErr(err, svcutil.ExitError)
  258. }
  259. defer snap.Release()
  260. previousWasDelete := false
  261. snap.WithHaveSequence(s.localPrevSequence+1, func(fi protocol.FileIntf) bool {
  262. // This is to make sure that renames (which is an add followed by a delete) land in the same batch.
  263. // Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
  264. // the batch ends with a non-delete, or that the last item in the batch is already a delete
  265. if batch.Full() && (!fi.IsDeleted() || previousWasDelete) {
  266. if err = batch.Flush(); err != nil {
  267. return false
  268. }
  269. }
  270. if fi.SequenceNo() < s.localPrevSequence+1 {
  271. s.logSequenceAnomaly("database returned sequence lower than requested", map[string]any{
  272. "sequence": fi.SequenceNo(),
  273. "start": s.localPrevSequence + 1,
  274. })
  275. }
  276. if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
  277. s.logSequenceAnomaly("database returned non-increasing sequence", map[string]any{
  278. "sequence": fi.SequenceNo(),
  279. "start": s.localPrevSequence + 1,
  280. "previous": f.Sequence,
  281. })
  282. // Abort this round of index sending - the next one will pick
  283. // up from the last successful one with the repeaired db.
  284. defer func() {
  285. if fixed, dbErr := fset.RepairSequence(); dbErr != nil {
  286. l.Warnln("Failed repairing sequence entries:", dbErr)
  287. panic("Failed repairing sequence entries")
  288. } else {
  289. s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence")
  290. l.Infof("Repaired %v sequence entries in database", fixed)
  291. }
  292. }()
  293. return false
  294. }
  295. f = fi.(protocol.FileInfo)
  296. // If this is a folder receiving encrypted files only, we
  297. // mustn't ever send locally changed file infos. Those aren't
  298. // encrypted and thus would be a protocol error at the remote.
  299. if s.folderIsReceiveEncrypted && fi.IsReceiveOnlyChanged() {
  300. return true
  301. }
  302. f = prepareFileInfoForIndex(f)
  303. previousWasDelete = f.IsDeleted()
  304. batch.Append(f)
  305. return true
  306. })
  307. if err != nil {
  308. return err
  309. }
  310. if err := batch.Flush(); err != nil {
  311. return err
  312. }
  313. // Use the sequence of the snapshot we iterated as a starting point for the
  314. // next run. Previously we used the sequence of the last file we sent,
  315. // however it's possible that a higher sequence exists, just doesn't need to
  316. // be sent (e.g. in a receive-only folder, when a local change was
  317. // reverted). No point trying to send nothing again.
  318. s.localPrevSequence = snap.Sequence(protocol.LocalDeviceID)
  319. return nil
  320. }
  321. func (s *indexHandler) receive(fs []protocol.FileInfo, update bool, op string, prevSequence, lastSequence int64) error {
  322. deviceID := s.conn.DeviceID()
  323. s.cond.L.Lock()
  324. paused := s.paused
  325. fset := s.fset
  326. runner := s.runner
  327. s.cond.L.Unlock()
  328. if paused {
  329. l.Infof("%v for paused folder %q", op, s.folder)
  330. return fmt.Errorf("%v: %w", s.folder, ErrFolderPaused)
  331. }
  332. defer runner.SchedulePull()
  333. s.downloads.Update(s.folder, makeForgetUpdate(fs))
  334. if !update {
  335. fset.Drop(deviceID)
  336. }
  337. l.Debugf("Received %d files for %s from %s, prevSeq=%d, lastSeq=%d", len(fs), s.folder, deviceID.Short(), prevSequence, lastSequence)
  338. // Verify that the previous sequence number matches what we expected
  339. if exp := fset.Sequence(deviceID); prevSequence > 0 && prevSequence != exp {
  340. s.logSequenceAnomaly("index update with unexpected sequence", map[string]any{
  341. "prevSeq": prevSequence,
  342. "lastSeq": lastSequence,
  343. "batch": len(fs),
  344. "expectedPrev": exp,
  345. })
  346. }
  347. for i := range fs {
  348. // Verify index in relation to the claimed sequence boundaries
  349. if fs[i].Sequence < prevSequence {
  350. s.logSequenceAnomaly("file with sequence before prevSequence", map[string]any{
  351. "prevSeq": prevSequence,
  352. "lastSeq": lastSequence,
  353. "batch": len(fs),
  354. "seenSeq": fs[i].Sequence,
  355. "atIndex": i,
  356. })
  357. }
  358. if lastSequence > 0 && fs[i].Sequence > lastSequence {
  359. s.logSequenceAnomaly("file with sequence after lastSequence", map[string]any{
  360. "prevSeq": prevSequence,
  361. "lastSeq": lastSequence,
  362. "batch": len(fs),
  363. "seenSeq": fs[i].Sequence,
  364. "atIndex": i,
  365. })
  366. }
  367. if i > 0 && fs[i].Sequence <= fs[i-1].Sequence {
  368. s.logSequenceAnomaly("index update with non-increasing sequence", map[string]any{
  369. "prevSeq": prevSequence,
  370. "lastSeq": lastSequence,
  371. "batch": len(fs),
  372. "seenSeq": fs[i].Sequence,
  373. "atIndex": i,
  374. "precedingSeq": fs[i-1].Sequence,
  375. })
  376. }
  377. // The local attributes should never be transmitted over the wire.
  378. // Make sure they look like they weren't.
  379. fs[i].LocalFlags = 0
  380. fs[i].VersionHash = nil
  381. }
  382. // Verify the claimed last sequence number
  383. if lastSequence > 0 && len(fs) > 0 && lastSequence != fs[len(fs)-1].Sequence {
  384. s.logSequenceAnomaly("index update with unexpected last sequence", map[string]any{
  385. "prevSeq": prevSequence,
  386. "lastSeq": lastSequence,
  387. "batch": len(fs),
  388. "seenSeq": fs[len(fs)-1].Sequence,
  389. })
  390. }
  391. fset.Update(deviceID, fs)
  392. seq := fset.Sequence(deviceID)
  393. // Check that the sequence we get back is what we put in...
  394. if lastSequence > 0 && seq != lastSequence {
  395. s.logSequenceAnomaly("unexpected sequence after update", map[string]any{
  396. "prevSeq": prevSequence,
  397. "lastSeq": lastSequence,
  398. "batch": len(fs),
  399. "seenSeq": fs[len(fs)-1].Sequence,
  400. "returnedSeq": seq,
  401. })
  402. }
  403. s.evLogger.Log(events.RemoteIndexUpdated, map[string]interface{}{
  404. "device": deviceID.String(),
  405. "folder": s.folder,
  406. "items": len(fs),
  407. "sequence": seq,
  408. "version": seq, // legacy for sequence
  409. })
  410. return nil
  411. }
  412. func (s *indexHandler) logSequenceAnomaly(msg string, extra map[string]any) {
  413. extraStrs := make(map[string]string, len(extra))
  414. for k, v := range extra {
  415. extraStrs[k] = fmt.Sprint(v)
  416. }
  417. s.evLogger.Log(events.Failure, ur.FailureData{
  418. Description: msg,
  419. Extra: extraStrs,
  420. })
  421. }
  422. func prepareFileInfoForIndex(f protocol.FileInfo) protocol.FileInfo {
  423. // Mark the file as invalid if any of the local bad stuff flags are set.
  424. f.RawInvalid = f.IsInvalid()
  425. // If the file is marked LocalReceive (i.e., changed locally on a
  426. // receive only folder) we do not want it to ever become the
  427. // globally best version, invalid or not.
  428. if f.IsReceiveOnlyChanged() {
  429. f.Version = protocol.Vector{}
  430. }
  431. // The trailer with the encrypted fileinfo is device local, don't send info
  432. // about that to remotes
  433. f.Size -= int64(f.EncryptionTrailerSize)
  434. f.EncryptionTrailerSize = 0
  435. // never sent externally
  436. f.LocalFlags = 0
  437. f.VersionHash = nil
  438. f.InodeChangeNs = 0
  439. return f
  440. }
  441. func (s *indexHandler) String() string {
  442. return fmt.Sprintf("indexHandler@%p for %s to %s at %s", s, s.folder, s.conn.DeviceID().Short(), s.conn)
  443. }
  444. type indexHandlerRegistry struct {
  445. evLogger events.Logger
  446. conn protocol.Connection
  447. downloads *deviceDownloadState
  448. indexHandlers *serviceMap[string, *indexHandler]
  449. startInfos map[string]*clusterConfigDeviceInfo
  450. folderStates map[string]*indexHandlerFolderState
  451. mut sync.Mutex
  452. }
  453. type indexHandlerFolderState struct {
  454. cfg config.FolderConfiguration
  455. fset *db.FileSet
  456. runner service
  457. }
  458. func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, evLogger events.Logger) *indexHandlerRegistry {
  459. r := &indexHandlerRegistry{
  460. evLogger: evLogger,
  461. conn: conn,
  462. downloads: downloads,
  463. indexHandlers: newServiceMap[string, *indexHandler](evLogger),
  464. startInfos: make(map[string]*clusterConfigDeviceInfo),
  465. folderStates: make(map[string]*indexHandlerFolderState),
  466. mut: sync.Mutex{},
  467. }
  468. return r
  469. }
  470. func (r *indexHandlerRegistry) String() string {
  471. return fmt.Sprintf("indexHandlerRegistry/%v", r.conn.DeviceID().Short())
  472. }
  473. func (r *indexHandlerRegistry) Serve(ctx context.Context) error {
  474. // Running the index handler registry means running the individual index
  475. // handler children.
  476. return r.indexHandlers.Serve(ctx)
  477. }
  478. func (r *indexHandlerRegistry) startLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo) {
  479. r.indexHandlers.RemoveAndWait(folder.ID, 0)
  480. delete(r.startInfos, folder.ID)
  481. is := newIndexHandler(r.conn, r.downloads, folder, fset, runner, startInfo, r.evLogger)
  482. r.indexHandlers.Add(folder.ID, is)
  483. // This new connection might help us get in sync.
  484. runner.SchedulePull()
  485. }
  486. // AddIndexInfo starts an index handler for given folder, unless it is paused.
  487. // If it is paused, the given startInfo is stored to start the sender once the
  488. // folder is resumed.
  489. // If an index handler is already running, it will be stopped first.
  490. func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterConfigDeviceInfo) {
  491. r.mut.Lock()
  492. defer r.mut.Unlock()
  493. if r.indexHandlers.RemoveAndWait(folder, 0) == nil {
  494. l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.DeviceID().Short(), folder)
  495. }
  496. folderState, ok := r.folderStates[folder]
  497. if !ok {
  498. l.Debugf("Pending index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  499. r.startInfos[folder] = startInfo
  500. return
  501. }
  502. r.startLocked(folderState.cfg, folderState.fset, folderState.runner, startInfo)
  503. }
  504. // Remove stops a running index handler or removes one pending to be started.
  505. // It is a noop if the folder isn't known.
  506. func (r *indexHandlerRegistry) Remove(folder string) {
  507. r.mut.Lock()
  508. defer r.mut.Unlock()
  509. l.Debugf("Removing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  510. r.indexHandlers.RemoveAndWait(folder, 0)
  511. delete(r.startInfos, folder)
  512. l.Debugf("Removed index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  513. }
  514. // RemoveAllExcept stops all running index handlers and removes those pending to be started,
  515. // except mentioned ones.
  516. // It is a noop if the folder isn't known.
  517. func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]remoteFolderState) {
  518. r.mut.Lock()
  519. defer r.mut.Unlock()
  520. r.indexHandlers.Each(func(folder string, is *indexHandler) error {
  521. if _, ok := except[folder]; !ok {
  522. r.indexHandlers.RemoveAndWait(folder, 0)
  523. l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder)
  524. }
  525. return nil
  526. })
  527. for folder := range r.startInfos {
  528. if _, ok := except[folder]; !ok {
  529. delete(r.startInfos, folder)
  530. l.Debugf("Removed pending index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder)
  531. }
  532. }
  533. }
  534. // RegisterFolderState must be called whenever something about the folder
  535. // changes. The exception being if the folder is removed entirely, then call
  536. // Remove. The fset and runner arguments may be nil, if given folder is paused.
  537. func (r *indexHandlerRegistry) RegisterFolderState(folder config.FolderConfiguration, fset *db.FileSet, runner service) {
  538. if !folder.SharedWith(r.conn.DeviceID()) {
  539. r.Remove(folder.ID)
  540. return
  541. }
  542. r.mut.Lock()
  543. if folder.Paused {
  544. r.folderPausedLocked(folder.ID)
  545. } else {
  546. r.folderRunningLocked(folder, fset, runner)
  547. }
  548. r.mut.Unlock()
  549. }
  550. // folderPausedLocked stops a running index handler.
  551. // It is a noop if the folder isn't known or has not been started yet.
  552. func (r *indexHandlerRegistry) folderPausedLocked(folder string) {
  553. l.Debugf("Pausing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  554. delete(r.folderStates, folder)
  555. if is, ok := r.indexHandlers.Get(folder); ok {
  556. is.pause()
  557. l.Debugf("Paused index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  558. } else {
  559. l.Debugf("No index handler for device %v and folder %v to pause", r.conn.DeviceID().Short(), folder)
  560. }
  561. }
  562. // folderRunningLocked resumes an already running index handler or starts it, if it
  563. // was added while paused.
  564. // It is a noop if the folder isn't known.
  565. func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service) {
  566. r.folderStates[folder.ID] = &indexHandlerFolderState{
  567. cfg: folder,
  568. fset: fset,
  569. runner: runner,
  570. }
  571. is, isOk := r.indexHandlers.Get(folder.ID)
  572. if info, ok := r.startInfos[folder.ID]; ok {
  573. if isOk {
  574. r.indexHandlers.RemoveAndWait(folder.ID, 0)
  575. l.Debugf("Removed index handler for device %v and folder %v in resume", r.conn.DeviceID().Short(), folder.ID)
  576. }
  577. r.startLocked(folder, fset, runner, info)
  578. delete(r.startInfos, folder.ID)
  579. l.Debugf("Started index handler for device %v and folder %v in resume", r.conn.DeviceID().Short(), folder.ID)
  580. } else if isOk {
  581. l.Debugf("Resuming index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  582. is.resume(fset, runner)
  583. } else {
  584. l.Debugf("Not resuming index handler for device %v and folder %v as none is paused and there is no start info", r.conn.DeviceID().Short(), folder.ID)
  585. }
  586. }
  587. func (r *indexHandlerRegistry) ReceiveIndex(folder string, fs []protocol.FileInfo, update bool, op string, prevSequence, lastSequence int64) error {
  588. r.mut.Lock()
  589. defer r.mut.Unlock()
  590. is, isOk := r.indexHandlers.Get(folder)
  591. if !isOk {
  592. l.Infof("%v for nonexistent or paused folder %q", op, folder)
  593. return fmt.Errorf("%s: %w", folder, ErrFolderMissing)
  594. }
  595. return is.receive(fs, update, op, prevSequence, lastSequence)
  596. }
  597. // makeForgetUpdate takes an index update and constructs a download progress update
  598. // causing to forget any progress for files which we've just been sent.
  599. func makeForgetUpdate(files []protocol.FileInfo) []protocol.FileDownloadProgressUpdate {
  600. updates := make([]protocol.FileDownloadProgressUpdate, 0, len(files))
  601. for _, file := range files {
  602. if file.IsSymlink() || file.IsDirectory() || file.IsDeleted() {
  603. continue
  604. }
  605. updates = append(updates, protocol.FileDownloadProgressUpdate{
  606. Name: file.Name,
  607. Version: file.Version,
  608. UpdateType: protocol.FileDownloadProgressUpdateTypeForget,
  609. })
  610. }
  611. return updates
  612. }