indexhandler.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696
  1. // Copyright (C) 2020 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "context"
  9. "errors"
  10. "fmt"
  11. "sync"
  12. "time"
  13. "github.com/syncthing/syncthing/internal/db"
  14. "github.com/syncthing/syncthing/internal/itererr"
  15. "github.com/syncthing/syncthing/lib/config"
  16. "github.com/syncthing/syncthing/lib/events"
  17. "github.com/syncthing/syncthing/lib/protocol"
  18. "github.com/syncthing/syncthing/lib/svcutil"
  19. "github.com/syncthing/syncthing/lib/ur"
  20. )
  21. type indexHandler struct {
  22. conn protocol.Connection
  23. downloads *deviceDownloadState
  24. folder string
  25. folderIsReceiveEncrypted bool
  26. evLogger events.Logger
  27. // We track the latest / highest sequence number in two ways for two
  28. // different reasons. Initially they are the same -- the highest seen
  29. // sequence number reported by the other side (or zero).
  30. //
  31. // One is the highest number we've seen when iterating the database,
  32. // which we track for database iteration purposes. When we loop, we
  33. // start looking at that number plus one in the next loop. Our index
  34. // numbering may have holes which this will skip over.
  35. //
  36. // The other is the highest sequence we previously sent to the other
  37. // side, used by them for correctness checks. This one must not skip
  38. // holes. That is, if we iterate and find a hole, this is not
  39. // incremented because nothing was sent to the other side.
  40. localPrevSequence int64 // the highest sequence number we've seen in our FileInfos
  41. sentPrevSequence int64 // the highest sequence number we've sent to the peer
  42. cond *sync.Cond
  43. paused bool
  44. sdb db.DB
  45. runner service
  46. }
  47. func newIndexHandler(conn protocol.Connection, downloads *deviceDownloadState, folder config.FolderConfiguration, sdb db.DB, runner service, startInfo *clusterConfigDeviceInfo, evLogger events.Logger) (*indexHandler, error) {
  48. myIndexID, err := sdb.GetIndexID(folder.ID, protocol.LocalDeviceID)
  49. if err != nil {
  50. return nil, err
  51. }
  52. mySequence, err := sdb.GetDeviceSequence(folder.ID, protocol.LocalDeviceID)
  53. if err != nil {
  54. return nil, err
  55. }
  56. var startSequence int64
  57. // This is the other side's description of what it knows
  58. // about us. Lets check to see if we can start sending index
  59. // updates directly or need to send the index from start...
  60. if startInfo.local.IndexID == myIndexID {
  61. // They say they've seen our index ID before, so we can
  62. // send a delta update only.
  63. if startInfo.local.MaxSequence > mySequence {
  64. // Safety check. They claim to have more or newer
  65. // index data than we have - either we have lost
  66. // index data, or reset the index without resetting
  67. // the IndexID, or something else weird has
  68. // happened. We send a full index to reset the
  69. // situation.
  70. l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", conn.DeviceID().Short(), folder.Description())
  71. startSequence = 0
  72. } else {
  73. l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", conn.DeviceID().Short(), folder.Description(), startInfo.local.MaxSequence)
  74. startSequence = startInfo.local.MaxSequence
  75. }
  76. } else if startInfo.local.IndexID != 0 {
  77. // They say they've seen an index ID from us, but it's
  78. // not the right one. Either they are confused or we
  79. // must have reset our database since last talking to
  80. // them. We'll start with a full index transfer.
  81. l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", conn.DeviceID().Short(), folder.Description(), startInfo.local.IndexID, myIndexID)
  82. startSequence = 0
  83. } else {
  84. l.Debugf("Device %v folder %s has no index ID for us", conn.DeviceID().Short(), folder.Description())
  85. }
  86. // This is the other side's description of themselves. We
  87. // check to see that it matches the IndexID we have on file,
  88. // otherwise we drop our old index data and expect to get a
  89. // completely new set.
  90. theirIndexID, _ := sdb.GetIndexID(folder.ID, conn.DeviceID())
  91. if startInfo.remote.IndexID == 0 {
  92. // They're not announcing an index ID. This means they
  93. // do not support delta indexes and we should clear any
  94. // information we have from them before accepting their
  95. // index, which will presumably be a full index.
  96. l.Debugf("Device %v folder %s does not announce an index ID", conn.DeviceID().Short(), folder.Description())
  97. if err := sdb.DropAllFiles(folder.ID, conn.DeviceID()); err != nil {
  98. return nil, err
  99. }
  100. } else if startInfo.remote.IndexID != theirIndexID {
  101. // The index ID we have on file is not what they're
  102. // announcing. They must have reset their database and
  103. // will probably send us a full index. We drop any
  104. // information we have and remember this new index ID
  105. // instead.
  106. l.Infof("Device %v folder %s has a new index ID (%v)", conn.DeviceID().Short(), folder.Description(), startInfo.remote.IndexID)
  107. if err := sdb.DropAllFiles(folder.ID, conn.DeviceID()); err != nil {
  108. return nil, err
  109. }
  110. if err := sdb.SetIndexID(folder.ID, conn.DeviceID(), startInfo.remote.IndexID); err != nil {
  111. return nil, err
  112. }
  113. }
  114. return &indexHandler{
  115. conn: conn,
  116. downloads: downloads,
  117. folder: folder.ID,
  118. folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted,
  119. localPrevSequence: startSequence,
  120. sentPrevSequence: startSequence,
  121. evLogger: evLogger,
  122. sdb: sdb,
  123. runner: runner,
  124. cond: sync.NewCond(new(sync.Mutex)),
  125. }, nil
  126. }
  127. // waitWhilePaused waits for the handler to resume.
  128. func (s *indexHandler) waitWhilePaused(ctx context.Context) error {
  129. s.cond.L.Lock()
  130. defer s.cond.L.Unlock()
  131. for s.paused {
  132. select {
  133. case <-ctx.Done():
  134. return ctx.Err()
  135. default:
  136. s.cond.Wait()
  137. }
  138. }
  139. return nil
  140. }
  141. func (s *indexHandler) Serve(ctx context.Context) (err error) {
  142. l.Debugf("Starting index handler for %s to %s at %s (localPrevSequence=%d)", s.folder, s.conn.DeviceID().Short(), s.conn, s.localPrevSequence)
  143. stop := make(chan struct{})
  144. defer func() {
  145. err = svcutil.NoRestartErr(err)
  146. l.Debugf("Exiting index handler for %s to %s at %s: %v", s.folder, s.conn.DeviceID().Short(), s.conn, err)
  147. close(stop)
  148. }()
  149. // Broadcast the pause cond when the context quits
  150. go func() {
  151. select {
  152. case <-ctx.Done():
  153. s.cond.Broadcast()
  154. case <-stop:
  155. }
  156. }()
  157. // We need to send one index, regardless of whether there is something to send or not
  158. if err := s.waitWhilePaused(ctx); err != nil {
  159. return err
  160. }
  161. err = s.sendIndexTo(ctx)
  162. // Subscribe to LocalIndexUpdated (we have new information to send) and
  163. // DeviceDisconnected (it might be us who disconnected, so we should
  164. // exit).
  165. sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected)
  166. defer sub.Unsubscribe()
  167. evChan := sub.C()
  168. ticker := time.NewTicker(time.Minute)
  169. defer ticker.Stop()
  170. for err == nil {
  171. if err := s.waitWhilePaused(ctx); err != nil {
  172. return err
  173. }
  174. // While we have sent a sequence at least equal to the one
  175. // currently in the database, wait for the local index to update. The
  176. // local index may update for other folders than the one we are
  177. // sending for.
  178. var seq int64
  179. seq, err = s.sdb.GetDeviceSequence(s.folder, protocol.LocalDeviceID)
  180. if err != nil {
  181. return err
  182. }
  183. if seq <= s.localPrevSequence {
  184. select {
  185. case <-ctx.Done():
  186. return ctx.Err()
  187. case <-evChan:
  188. case <-ticker.C:
  189. }
  190. continue
  191. }
  192. err = s.sendIndexTo(ctx)
  193. // Wait a short amount of time before entering the next loop. If there
  194. // are continuous changes happening to the local index, this gives us
  195. // time to batch them up a little.
  196. select {
  197. case <-ctx.Done():
  198. return ctx.Err()
  199. case <-time.After(250 * time.Millisecond):
  200. }
  201. }
  202. return err
  203. }
  204. // resume might be called because the folder was actually resumed, or just
  205. // because the folder config changed (and thus the runner and potentially fset).
  206. func (s *indexHandler) resume(runner service) {
  207. s.cond.L.Lock()
  208. s.paused = false
  209. s.runner = runner
  210. s.cond.Broadcast()
  211. s.cond.L.Unlock()
  212. }
  213. func (s *indexHandler) pause() {
  214. s.cond.L.Lock()
  215. if s.paused {
  216. s.evLogger.Log(events.Failure, "index handler got paused while already paused")
  217. }
  218. s.paused = true
  219. s.runner = nil
  220. s.cond.Broadcast()
  221. s.cond.L.Unlock()
  222. }
  223. // sendIndexTo sends file infos with a sequence number higher than prevSequence and
  224. // returns the highest sent sequence number.
  225. func (s *indexHandler) sendIndexTo(ctx context.Context) error {
  226. initial := s.localPrevSequence == 0
  227. batch := NewFileInfoBatch(nil)
  228. var batchError error
  229. batch.SetFlushFunc(func(fs []protocol.FileInfo) error {
  230. select {
  231. case <-ctx.Done():
  232. return ctx.Err()
  233. default:
  234. }
  235. if len(fs) == 0 {
  236. // can't happen, flush is not called with an empty batch
  237. panic("bug: flush called with empty batch (race condition?)")
  238. }
  239. if batchError != nil {
  240. // can't happen, once an error is returned the index sender exits
  241. panic(fmt.Sprintf("bug: once failed it should stay failed (%v)", batchError))
  242. }
  243. l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size())
  244. lastSequence := fs[len(fs)-1].Sequence
  245. var err error
  246. if initial {
  247. initial = false
  248. err = s.conn.Index(ctx, &protocol.Index{
  249. Folder: s.folder,
  250. Files: fs,
  251. LastSequence: lastSequence,
  252. })
  253. } else {
  254. err = s.conn.IndexUpdate(ctx, &protocol.IndexUpdate{
  255. Folder: s.folder,
  256. Files: fs,
  257. PrevSequence: s.sentPrevSequence,
  258. LastSequence: lastSequence,
  259. })
  260. }
  261. if err != nil {
  262. batchError = err
  263. return err
  264. }
  265. s.sentPrevSequence = lastSequence
  266. return nil
  267. })
  268. var f protocol.FileInfo
  269. previousWasDelete := false
  270. for fi, err := range itererr.Zip(s.sdb.AllLocalFilesBySequence(s.folder, protocol.LocalDeviceID, s.localPrevSequence+1, MaxBatchSizeFiles+1)) {
  271. if err != nil {
  272. return err
  273. }
  274. // This is to make sure that renames (which is an add followed by a delete) land in the same batch.
  275. // Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
  276. // the batch ends with a non-delete, or that the last item in the batch is already a delete
  277. if batch.Full() && (!fi.IsDeleted() || previousWasDelete) {
  278. break
  279. }
  280. if fi.SequenceNo() < s.localPrevSequence+1 {
  281. s.logSequenceAnomaly("database returned sequence lower than requested", map[string]any{
  282. "sequence": fi.SequenceNo(),
  283. "start": s.localPrevSequence + 1,
  284. })
  285. return errors.New("database misbehaved")
  286. }
  287. if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
  288. s.logSequenceAnomaly("database returned non-increasing sequence", map[string]any{
  289. "sequence": fi.SequenceNo(),
  290. "start": s.localPrevSequence + 1,
  291. "previous": f.Sequence,
  292. })
  293. return errors.New("database misbehaved")
  294. }
  295. f = fi
  296. s.localPrevSequence = f.Sequence
  297. // If this is a folder receiving encrypted files only, we
  298. // mustn't ever send locally changed file infos. Those aren't
  299. // encrypted and thus would be a protocol error at the remote.
  300. if s.folderIsReceiveEncrypted && fi.IsReceiveOnlyChanged() {
  301. continue
  302. }
  303. f = prepareFileInfoForIndex(f)
  304. previousWasDelete = f.IsDeleted()
  305. batch.Append(f)
  306. }
  307. return batch.Flush()
  308. }
  309. func (s *indexHandler) receive(fs []protocol.FileInfo, update bool, op string, prevSequence, lastSequence int64) error {
  310. deviceID := s.conn.DeviceID()
  311. s.cond.L.Lock()
  312. paused := s.paused
  313. runner := s.runner
  314. s.cond.L.Unlock()
  315. if paused {
  316. l.Infof("%v for paused folder %q", op, s.folder)
  317. return fmt.Errorf("%v: %w", s.folder, ErrFolderPaused)
  318. }
  319. defer runner.SchedulePull()
  320. s.downloads.Update(s.folder, makeForgetUpdate(fs))
  321. if !update {
  322. if err := s.sdb.DropAllFiles(s.folder, deviceID); err != nil {
  323. return err
  324. }
  325. }
  326. l.Debugf("Received %d files for %s from %s, prevSeq=%d, lastSeq=%d", len(fs), s.folder, deviceID.Short(), prevSequence, lastSequence)
  327. // Verify that the previous sequence number matches what we expected
  328. exp, err := s.sdb.GetDeviceSequence(s.folder, deviceID)
  329. if err != nil {
  330. return err
  331. }
  332. if prevSequence > 0 && prevSequence != exp {
  333. s.logSequenceAnomaly("index update with unexpected sequence", map[string]any{
  334. "prevSeq": prevSequence,
  335. "lastSeq": lastSequence,
  336. "batch": len(fs),
  337. "expectedPrev": exp,
  338. })
  339. }
  340. for i := range fs {
  341. // Verify index in relation to the claimed sequence boundaries
  342. if fs[i].Sequence < prevSequence {
  343. s.logSequenceAnomaly("file with sequence before prevSequence", map[string]any{
  344. "prevSeq": prevSequence,
  345. "lastSeq": lastSequence,
  346. "batch": len(fs),
  347. "seenSeq": fs[i].Sequence,
  348. "atIndex": i,
  349. })
  350. }
  351. if lastSequence > 0 && fs[i].Sequence > lastSequence {
  352. s.logSequenceAnomaly("file with sequence after lastSequence", map[string]any{
  353. "prevSeq": prevSequence,
  354. "lastSeq": lastSequence,
  355. "batch": len(fs),
  356. "seenSeq": fs[i].Sequence,
  357. "atIndex": i,
  358. })
  359. }
  360. if i > 0 && fs[i].Sequence <= fs[i-1].Sequence {
  361. s.logSequenceAnomaly("index update with non-increasing sequence", map[string]any{
  362. "prevSeq": prevSequence,
  363. "lastSeq": lastSequence,
  364. "batch": len(fs),
  365. "seenSeq": fs[i].Sequence,
  366. "atIndex": i,
  367. "precedingSeq": fs[i-1].Sequence,
  368. })
  369. }
  370. // The local attributes should never be transmitted over the wire.
  371. // Make sure they look like they weren't.
  372. fs[i].LocalFlags = 0
  373. fs[i].VersionHash = nil
  374. }
  375. // Verify the claimed last sequence number
  376. if lastSequence > 0 && len(fs) > 0 && lastSequence != fs[len(fs)-1].Sequence {
  377. s.logSequenceAnomaly("index update with unexpected last sequence", map[string]any{
  378. "prevSeq": prevSequence,
  379. "lastSeq": lastSequence,
  380. "batch": len(fs),
  381. "seenSeq": fs[len(fs)-1].Sequence,
  382. })
  383. }
  384. if err := s.sdb.Update(s.folder, deviceID, fs); err != nil {
  385. return err
  386. }
  387. seq, err := s.sdb.GetDeviceSequence(s.folder, deviceID)
  388. if err != nil {
  389. return err
  390. }
  391. // Check that the sequence we get back is what we put in...
  392. if lastSequence > 0 && len(fs) > 0 && seq != lastSequence {
  393. s.logSequenceAnomaly("unexpected sequence after update", map[string]any{
  394. "prevSeq": prevSequence,
  395. "lastSeq": lastSequence,
  396. "batch": len(fs),
  397. "seenSeq": fs[len(fs)-1].Sequence,
  398. "returnedSeq": seq,
  399. })
  400. }
  401. s.evLogger.Log(events.RemoteIndexUpdated, map[string]interface{}{
  402. "device": deviceID.String(),
  403. "folder": s.folder,
  404. "items": len(fs),
  405. "sequence": seq,
  406. "version": seq, // legacy for sequence
  407. })
  408. return nil
  409. }
  410. func (s *indexHandler) logSequenceAnomaly(msg string, extra map[string]any) {
  411. extraStrs := make(map[string]string, len(extra))
  412. for k, v := range extra {
  413. extraStrs[k] = fmt.Sprint(v)
  414. }
  415. s.evLogger.Log(events.Failure, ur.FailureData{
  416. Description: msg,
  417. Extra: extraStrs,
  418. })
  419. }
  420. func prepareFileInfoForIndex(f protocol.FileInfo) protocol.FileInfo {
  421. // Mark the file as invalid if any of the local bad stuff flags are set.
  422. f.RawInvalid = f.IsInvalid()
  423. // If the file is marked LocalReceive (i.e., changed locally on a
  424. // receive only folder) we do not want it to ever become the
  425. // globally best version, invalid or not.
  426. if f.IsReceiveOnlyChanged() {
  427. f.Version = protocol.Vector{}
  428. }
  429. // The trailer with the encrypted fileinfo is device local, don't send info
  430. // about that to remotes
  431. f.Size -= int64(f.EncryptionTrailerSize)
  432. f.EncryptionTrailerSize = 0
  433. // never sent externally
  434. f.LocalFlags = 0
  435. f.VersionHash = nil
  436. f.InodeChangeNs = 0
  437. return f
  438. }
  439. func (s *indexHandler) String() string {
  440. return fmt.Sprintf("indexHandler@%p for %s to %s at %s", s, s.folder, s.conn.DeviceID().Short(), s.conn)
  441. }
  442. type indexHandlerRegistry struct {
  443. evLogger events.Logger
  444. conn protocol.Connection
  445. sdb db.DB
  446. downloads *deviceDownloadState
  447. indexHandlers *serviceMap[string, *indexHandler]
  448. startInfos map[string]*clusterConfigDeviceInfo
  449. folderStates map[string]*indexHandlerFolderState
  450. mut sync.Mutex
  451. }
  452. type indexHandlerFolderState struct {
  453. cfg config.FolderConfiguration
  454. runner service
  455. }
  456. func newIndexHandlerRegistry(conn protocol.Connection, sdb db.DB, downloads *deviceDownloadState, evLogger events.Logger) *indexHandlerRegistry {
  457. r := &indexHandlerRegistry{
  458. evLogger: evLogger,
  459. conn: conn,
  460. sdb: sdb,
  461. downloads: downloads,
  462. indexHandlers: newServiceMap[string, *indexHandler](evLogger),
  463. startInfos: make(map[string]*clusterConfigDeviceInfo),
  464. folderStates: make(map[string]*indexHandlerFolderState),
  465. mut: sync.Mutex{},
  466. }
  467. return r
  468. }
  469. func (r *indexHandlerRegistry) String() string {
  470. return fmt.Sprintf("indexHandlerRegistry/%v", r.conn.DeviceID().Short())
  471. }
  472. func (r *indexHandlerRegistry) Serve(ctx context.Context) error {
  473. // Running the index handler registry means running the individual index
  474. // handler children.
  475. return r.indexHandlers.Serve(ctx)
  476. }
  477. func (r *indexHandlerRegistry) startLocked(folder config.FolderConfiguration, runner service, startInfo *clusterConfigDeviceInfo) error {
  478. r.indexHandlers.RemoveAndWait(folder.ID, 0)
  479. delete(r.startInfos, folder.ID)
  480. is, err := newIndexHandler(r.conn, r.downloads, folder, r.sdb, runner, startInfo, r.evLogger)
  481. if err != nil {
  482. return err
  483. }
  484. r.indexHandlers.Add(folder.ID, is)
  485. // This new connection might help us get in sync.
  486. runner.SchedulePull()
  487. return nil
  488. }
  489. // AddIndexInfo starts an index handler for given folder, unless it is paused.
  490. // If it is paused, the given startInfo is stored to start the sender once the
  491. // folder is resumed.
  492. // If an index handler is already running, it will be stopped first.
  493. func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterConfigDeviceInfo) {
  494. r.mut.Lock()
  495. defer r.mut.Unlock()
  496. if r.indexHandlers.RemoveAndWait(folder, 0) == nil {
  497. l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.DeviceID().Short(), folder)
  498. }
  499. folderState, ok := r.folderStates[folder]
  500. if !ok {
  501. l.Debugf("Pending index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  502. r.startInfos[folder] = startInfo
  503. return
  504. }
  505. _ = r.startLocked(folderState.cfg, folderState.runner, startInfo) // XXX error handling...
  506. }
  507. // Remove stops a running index handler or removes one pending to be started.
  508. // It is a noop if the folder isn't known.
  509. func (r *indexHandlerRegistry) Remove(folder string) {
  510. r.mut.Lock()
  511. defer r.mut.Unlock()
  512. l.Debugf("Removing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  513. r.indexHandlers.RemoveAndWait(folder, 0)
  514. delete(r.startInfos, folder)
  515. l.Debugf("Removed index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  516. }
  517. // RemoveAllExcept stops all running index handlers and removes those pending to be started,
  518. // except mentioned ones.
  519. // It is a noop if the folder isn't known.
  520. func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]remoteFolderState) {
  521. r.mut.Lock()
  522. defer r.mut.Unlock()
  523. r.indexHandlers.Each(func(folder string, is *indexHandler) error {
  524. if _, ok := except[folder]; !ok {
  525. r.indexHandlers.RemoveAndWait(folder, 0)
  526. l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder)
  527. }
  528. return nil
  529. })
  530. for folder := range r.startInfos {
  531. if _, ok := except[folder]; !ok {
  532. delete(r.startInfos, folder)
  533. l.Debugf("Removed pending index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder)
  534. }
  535. }
  536. }
  537. // RegisterFolderState must be called whenever something about the folder
  538. // changes. The exception being if the folder is removed entirely, then call
  539. // Remove. The fset and runner arguments may be nil, if given folder is paused.
  540. func (r *indexHandlerRegistry) RegisterFolderState(folder config.FolderConfiguration, runner service) {
  541. if !folder.SharedWith(r.conn.DeviceID()) {
  542. r.Remove(folder.ID)
  543. return
  544. }
  545. r.mut.Lock()
  546. if folder.Paused {
  547. r.folderPausedLocked(folder.ID)
  548. } else {
  549. r.folderRunningLocked(folder, runner)
  550. }
  551. r.mut.Unlock()
  552. }
  553. // folderPausedLocked stops a running index handler.
  554. // It is a noop if the folder isn't known or has not been started yet.
  555. func (r *indexHandlerRegistry) folderPausedLocked(folder string) {
  556. l.Debugf("Pausing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  557. delete(r.folderStates, folder)
  558. if is, ok := r.indexHandlers.Get(folder); ok {
  559. is.pause()
  560. l.Debugf("Paused index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  561. } else {
  562. l.Debugf("No index handler for device %v and folder %v to pause", r.conn.DeviceID().Short(), folder)
  563. }
  564. }
  565. // folderRunningLocked resumes an already running index handler or starts it, if it
  566. // was added while paused.
  567. // It is a noop if the folder isn't known.
  568. func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfiguration, runner service) {
  569. r.folderStates[folder.ID] = &indexHandlerFolderState{
  570. cfg: folder,
  571. runner: runner,
  572. }
  573. is, isOk := r.indexHandlers.Get(folder.ID)
  574. if info, ok := r.startInfos[folder.ID]; ok {
  575. if isOk {
  576. r.indexHandlers.RemoveAndWait(folder.ID, 0)
  577. l.Debugf("Removed index handler for device %v and folder %v in resume", r.conn.DeviceID().Short(), folder.ID)
  578. }
  579. _ = r.startLocked(folder, runner, info) // XXX error handling...
  580. delete(r.startInfos, folder.ID)
  581. l.Debugf("Started index handler for device %v and folder %v in resume", r.conn.DeviceID().Short(), folder.ID)
  582. } else if isOk {
  583. l.Debugf("Resuming index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
  584. is.resume(runner)
  585. } else {
  586. l.Debugf("Not resuming index handler for device %v and folder %v as none is paused and there is no start info", r.conn.DeviceID().Short(), folder.ID)
  587. }
  588. }
  589. func (r *indexHandlerRegistry) ReceiveIndex(folder string, fs []protocol.FileInfo, update bool, op string, prevSequence, lastSequence int64) error {
  590. r.mut.Lock()
  591. defer r.mut.Unlock()
  592. is, isOk := r.indexHandlers.Get(folder)
  593. if !isOk {
  594. l.Infof("%v for nonexistent or paused folder %q", op, folder)
  595. return fmt.Errorf("%s: %w", folder, ErrFolderMissing)
  596. }
  597. return is.receive(fs, update, op, prevSequence, lastSequence)
  598. }
  599. // makeForgetUpdate takes an index update and constructs a download progress update
  600. // causing to forget any progress for files which we've just been sent.
  601. func makeForgetUpdate(files []protocol.FileInfo) []protocol.FileDownloadProgressUpdate {
  602. updates := make([]protocol.FileDownloadProgressUpdate, 0, len(files))
  603. for _, file := range files {
  604. if file.IsSymlink() || file.IsDirectory() || file.IsDeleted() {
  605. continue
  606. }
  607. updates = append(updates, protocol.FileDownloadProgressUpdate{
  608. Name: file.Name,
  609. Version: file.Version,
  610. UpdateType: protocol.FileDownloadProgressUpdateTypeForget,
  611. })
  612. }
  613. return updates
  614. }