indexhandler.go 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. // Copyright (C) 2020 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "context"
  9. "fmt"
  10. "sync"
  11. "time"
  12. "github.com/thejerf/suture/v4"
  13. "github.com/syncthing/syncthing/lib/config"
  14. "github.com/syncthing/syncthing/lib/db"
  15. "github.com/syncthing/syncthing/lib/events"
  16. "github.com/syncthing/syncthing/lib/protocol"
  17. "github.com/syncthing/syncthing/lib/svcutil"
  18. )
// indexHandler sends our index for a single folder to a single connected
// device and applies the index data received from that device.
type indexHandler struct {
	// conn is the connection to the remote device; index messages are sent
	// over it.
	conn protocol.Connection
	// downloads tracks download progress announced by the remote device;
	// entries are forgotten when new index data for a file is received.
	downloads *deviceDownloadState
	// folder is the ID of the folder this handler is responsible for.
	folder string
	// folderIsReceiveEncrypted is true for receive-encrypted folders, for
	// which locally changed file infos must never be sent.
	folderIsReceiveEncrypted bool
	// prevSequence is the sequence to resume sending after. Zero means that
	// the next send starts with a full index.
	prevSequence int64
	evLogger     events.Logger
	// token identifies this handler in the supervisor it was added to.
	token suture.ServiceToken

	// cond's lock guards paused, fset and runner.
	cond   *sync.Cond
	paused bool
	// fset and runner are nil while the handler is paused.
	fset   *db.FileSet
	runner service
}
  32. func newIndexHandler(conn protocol.Connection, downloads *deviceDownloadState, folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo, evLogger events.Logger) *indexHandler {
  33. myIndexID := fset.IndexID(protocol.LocalDeviceID)
  34. mySequence := fset.Sequence(protocol.LocalDeviceID)
  35. var startSequence int64
  36. // This is the other side's description of what it knows
  37. // about us. Lets check to see if we can start sending index
  38. // updates directly or need to send the index from start...
  39. if startInfo.local.IndexID == myIndexID {
  40. // They say they've seen our index ID before, so we can
  41. // send a delta update only.
  42. if startInfo.local.MaxSequence > mySequence {
  43. // Safety check. They claim to have more or newer
  44. // index data than we have - either we have lost
  45. // index data, or reset the index without resetting
  46. // the IndexID, or something else weird has
  47. // happened. We send a full index to reset the
  48. // situation.
  49. l.Infof("Device %v folder %s is delta index compatible, but seems out of sync with reality", conn.ID().Short(), folder.Description())
  50. startSequence = 0
  51. } else {
  52. l.Debugf("Device %v folder %s is delta index compatible (mlv=%d)", conn.ID().Short(), folder.Description(), startInfo.local.MaxSequence)
  53. startSequence = startInfo.local.MaxSequence
  54. }
  55. } else if startInfo.local.IndexID != 0 {
  56. // They say they've seen an index ID from us, but it's
  57. // not the right one. Either they are confused or we
  58. // must have reset our database since last talking to
  59. // them. We'll start with a full index transfer.
  60. l.Infof("Device %v folder %s has mismatching index ID for us (%v != %v)", conn.ID().Short(), folder.Description(), startInfo.local.IndexID, myIndexID)
  61. startSequence = 0
  62. } else {
  63. l.Debugf("Device %v folder %s has no index ID for us", conn.ID().Short(), folder.Description())
  64. }
  65. // This is the other side's description of themselves. We
  66. // check to see that it matches the IndexID we have on file,
  67. // otherwise we drop our old index data and expect to get a
  68. // completely new set.
  69. theirIndexID := fset.IndexID(conn.ID())
  70. if startInfo.remote.IndexID == 0 {
  71. // They're not announcing an index ID. This means they
  72. // do not support delta indexes and we should clear any
  73. // information we have from them before accepting their
  74. // index, which will presumably be a full index.
  75. l.Debugf("Device %v folder %s does not announce an index ID", conn.ID().Short(), folder.Description())
  76. fset.Drop(conn.ID())
  77. } else if startInfo.remote.IndexID != theirIndexID {
  78. // The index ID we have on file is not what they're
  79. // announcing. They must have reset their database and
  80. // will probably send us a full index. We drop any
  81. // information we have and remember this new index ID
  82. // instead.
  83. l.Infof("Device %v folder %s has a new index ID (%v)", conn.ID().Short(), folder.Description(), startInfo.remote.IndexID)
  84. fset.Drop(conn.ID())
  85. fset.SetIndexID(conn.ID(), startInfo.remote.IndexID)
  86. }
  87. return &indexHandler{
  88. conn: conn,
  89. downloads: downloads,
  90. folder: folder.ID,
  91. folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted,
  92. prevSequence: startSequence,
  93. evLogger: evLogger,
  94. fset: fset,
  95. runner: runner,
  96. cond: sync.NewCond(new(sync.Mutex)),
  97. }
  98. }
  99. func (s *indexHandler) Serve(ctx context.Context) (err error) {
  100. l.Debugf("Starting index handler for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence)
  101. defer func() {
  102. err = svcutil.NoRestartErr(err)
  103. l.Debugf("Exiting index handler for %s to %s at %s: %v", s.folder, s.conn.ID(), s.conn, err)
  104. }()
  105. // We need to send one index, regardless of whether there is something to send or not
  106. s.cond.L.Lock()
  107. for s.paused {
  108. s.cond.Wait()
  109. }
  110. fset := s.fset
  111. s.cond.L.Unlock()
  112. err = s.sendIndexTo(ctx, fset)
  113. // Subscribe to LocalIndexUpdated (we have new information to send) and
  114. // DeviceDisconnected (it might be us who disconnected, so we should
  115. // exit).
  116. sub := s.evLogger.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected)
  117. defer sub.Unsubscribe()
  118. evChan := sub.C()
  119. ticker := time.NewTicker(time.Minute)
  120. defer ticker.Stop()
  121. for err == nil {
  122. s.cond.L.Lock()
  123. for s.paused {
  124. s.cond.Wait()
  125. }
  126. fset := s.fset
  127. s.cond.L.Unlock()
  128. // While we have sent a sequence at least equal to the one
  129. // currently in the database, wait for the local index to update. The
  130. // local index may update for other folders than the one we are
  131. // sending for.
  132. if fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
  133. select {
  134. case <-ctx.Done():
  135. return ctx.Err()
  136. case <-evChan:
  137. case <-ticker.C:
  138. }
  139. continue
  140. }
  141. err = s.sendIndexTo(ctx, fset)
  142. // Wait a short amount of time before entering the next loop. If there
  143. // are continuous changes happening to the local index, this gives us
  144. // time to batch them up a little.
  145. select {
  146. case <-ctx.Done():
  147. return ctx.Err()
  148. case <-time.After(250 * time.Millisecond):
  149. }
  150. }
  151. return err
  152. }
  153. // resume might be called because the folder was actually resumed, or just
  154. // because the folder config changed (and thus the runner and potentially fset).
  155. func (s *indexHandler) resume(fset *db.FileSet, runner service) {
  156. s.cond.L.Lock()
  157. s.paused = false
  158. s.fset = fset
  159. s.runner = runner
  160. s.cond.L.Unlock()
  161. }
  162. func (s *indexHandler) pause() {
  163. s.cond.L.Lock()
  164. if s.paused {
  165. s.evLogger.Log(events.Failure, "index handler got paused while already paused")
  166. }
  167. s.paused = true
  168. s.fset = nil
  169. s.runner = nil
  170. s.cond.L.Unlock()
  171. }
  172. // sendIndexTo sends file infos with a sequence number higher than prevSequence and
  173. // returns the highest sent sequence number.
  174. func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error {
  175. initial := s.prevSequence == 0
  176. batch := db.NewFileInfoBatch(nil)
  177. batch.SetFlushFunc(func(fs []protocol.FileInfo) error {
  178. l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size())
  179. if initial {
  180. initial = false
  181. return s.conn.Index(ctx, s.folder, fs)
  182. }
  183. return s.conn.IndexUpdate(ctx, s.folder, fs)
  184. })
  185. var err error
  186. var f protocol.FileInfo
  187. snap, err := fset.Snapshot()
  188. if err != nil {
  189. return svcutil.AsFatalErr(err, svcutil.ExitError)
  190. }
  191. defer snap.Release()
  192. previousWasDelete := false
  193. snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool {
  194. // This is to make sure that renames (which is an add followed by a delete) land in the same batch.
  195. // Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
  196. // the batch ends with a non-delete, or that the last item in the batch is already a delete
  197. if batch.Full() && (!fi.IsDeleted() || previousWasDelete) {
  198. if err = batch.Flush(); err != nil {
  199. return false
  200. }
  201. }
  202. if shouldDebug() {
  203. if fi.SequenceNo() < s.prevSequence+1 {
  204. panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1))
  205. }
  206. }
  207. if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
  208. l.Warnln("Non-increasing sequence detected: Checking and repairing the db...")
  209. // Abort this round of index sending - the next one will pick
  210. // up from the last successful one with the repeaired db.
  211. defer func() {
  212. if fixed, dbErr := fset.RepairSequence(); dbErr != nil {
  213. l.Warnln("Failed repairing sequence entries:", dbErr)
  214. panic("Failed repairing sequence entries")
  215. } else {
  216. s.evLogger.Log(events.Failure, "detected and repaired non-increasing sequence")
  217. l.Infof("Repaired %v sequence entries in database", fixed)
  218. }
  219. }()
  220. return false
  221. }
  222. f = fi.(protocol.FileInfo)
  223. // If this is a folder receiving encrypted files only, we
  224. // mustn't ever send locally changed file infos. Those aren't
  225. // encrypted and thus would be a protocol error at the remote.
  226. if s.folderIsReceiveEncrypted && fi.IsReceiveOnlyChanged() {
  227. return true
  228. }
  229. f = prepareFileInfoForIndex(f)
  230. previousWasDelete = f.IsDeleted()
  231. batch.Append(f)
  232. return true
  233. })
  234. if err != nil {
  235. return err
  236. }
  237. err = batch.Flush()
  238. // True if there was nothing to be sent
  239. if f.Sequence == 0 {
  240. return err
  241. }
  242. s.prevSequence = f.Sequence
  243. return err
  244. }
  245. func (s *indexHandler) receive(fs []protocol.FileInfo, update bool, op string) error {
  246. deviceID := s.conn.ID()
  247. s.cond.L.Lock()
  248. paused := s.paused
  249. fset := s.fset
  250. runner := s.runner
  251. s.cond.L.Unlock()
  252. if paused {
  253. l.Infof("%v for paused folder %q", op, s.folder)
  254. return fmt.Errorf("%v: %w", s.folder, ErrFolderPaused)
  255. }
  256. defer runner.SchedulePull()
  257. s.downloads.Update(s.folder, makeForgetUpdate(fs))
  258. if !update {
  259. fset.Drop(deviceID)
  260. }
  261. for i := range fs {
  262. // The local attributes should never be transmitted over the wire.
  263. // Make sure they look like they weren't.
  264. fs[i].LocalFlags = 0
  265. fs[i].VersionHash = nil
  266. }
  267. fset.Update(deviceID, fs)
  268. seq := fset.Sequence(deviceID)
  269. s.evLogger.Log(events.RemoteIndexUpdated, map[string]interface{}{
  270. "device": deviceID.String(),
  271. "folder": s.folder,
  272. "items": len(fs),
  273. "sequence": seq,
  274. "version": seq, // legacy for sequence
  275. })
  276. return nil
  277. }
  278. func prepareFileInfoForIndex(f protocol.FileInfo) protocol.FileInfo {
  279. // Mark the file as invalid if any of the local bad stuff flags are set.
  280. f.RawInvalid = f.IsInvalid()
  281. // If the file is marked LocalReceive (i.e., changed locally on a
  282. // receive only folder) we do not want it to ever become the
  283. // globally best version, invalid or not.
  284. if f.IsReceiveOnlyChanged() {
  285. f.Version = protocol.Vector{}
  286. }
  287. // never sent externally
  288. f.LocalFlags = 0
  289. f.VersionHash = nil
  290. return f
  291. }
  292. func (s *indexHandler) String() string {
  293. return fmt.Sprintf("indexHandler@%p for %s to %s at %s", s, s.folder, s.conn.ID().Short(), s.conn)
  294. }
// indexHandlerRegistry manages the index handlers of all folders for a
// single connection.
type indexHandlerRegistry struct {
	// sup is the supervisor the index handlers run in.
	sup       *suture.Supervisor
	evLogger  events.Logger
	conn      protocol.Connection
	downloads *deviceDownloadState
	// indexHandlers holds the started handlers, keyed by folder ID.
	indexHandlers map[string]*indexHandler
	// startInfos holds start infos, keyed by folder ID, for folders whose
	// handler could not be started yet because no folder state is known.
	startInfos map[string]*clusterConfigDeviceInfo
	// folderStates holds the last registered state of running (not
	// paused) folders, keyed by folder ID.
	folderStates map[string]*indexHandlerFolderState
	// mut guards the maps above.
	mut sync.Mutex
}
// indexHandlerFolderState is what the registry remembers about a running
// folder, so that an index handler can be started for it later on.
type indexHandlerFolderState struct {
	cfg    config.FolderConfiguration
	fset   *db.FileSet
	runner service
}
  310. func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, closed chan struct{}, parentSup *suture.Supervisor, evLogger events.Logger) *indexHandlerRegistry {
  311. r := &indexHandlerRegistry{
  312. conn: conn,
  313. downloads: downloads,
  314. evLogger: evLogger,
  315. indexHandlers: make(map[string]*indexHandler),
  316. startInfos: make(map[string]*clusterConfigDeviceInfo),
  317. folderStates: make(map[string]*indexHandlerFolderState),
  318. mut: sync.Mutex{},
  319. }
  320. r.sup = suture.New(r.String(), svcutil.SpecWithDebugLogger(l))
  321. ourToken := parentSup.Add(r.sup)
  322. r.sup.Add(svcutil.AsService(func(ctx context.Context) error {
  323. select {
  324. case <-ctx.Done():
  325. return ctx.Err()
  326. case <-closed:
  327. parentSup.Remove(ourToken)
  328. }
  329. return nil
  330. }, fmt.Sprintf("%v/waitForClosed", r)))
  331. return r
  332. }
  333. func (r *indexHandlerRegistry) String() string {
  334. return fmt.Sprintf("indexHandlerRegistry/%v", r.conn.ID().Short())
  335. }
// GetSupervisor returns the supervisor that the registry's index handlers
// are added to.
func (r *indexHandlerRegistry) GetSupervisor() *suture.Supervisor {
	return r.sup
}
  339. func (r *indexHandlerRegistry) startLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo) {
  340. if is, ok := r.indexHandlers[folder.ID]; ok {
  341. r.sup.RemoveAndWait(is.token, 0)
  342. delete(r.indexHandlers, folder.ID)
  343. }
  344. delete(r.startInfos, folder.ID)
  345. is := newIndexHandler(r.conn, r.downloads, folder, fset, runner, startInfo, r.evLogger)
  346. is.token = r.sup.Add(is)
  347. r.indexHandlers[folder.ID] = is
  348. }
  349. // AddIndexInfo starts an index handler for given folder, unless it is paused.
  350. // If it is paused, the given startInfo is stored to start the sender once the
  351. // folder is resumed.
  352. // If an index handler is already running, it will be stopped first.
  353. func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterConfigDeviceInfo) {
  354. r.mut.Lock()
  355. defer r.mut.Unlock()
  356. if is, ok := r.indexHandlers[folder]; ok {
  357. r.sup.RemoveAndWait(is.token, 0)
  358. delete(r.indexHandlers, folder)
  359. l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.ID().Short(), folder)
  360. }
  361. folderState, ok := r.folderStates[folder]
  362. if !ok {
  363. l.Debugf("Pending index handler for device %v and folder %v", r.conn.ID().Short(), folder)
  364. r.startInfos[folder] = startInfo
  365. return
  366. }
  367. r.startLocked(folderState.cfg, folderState.fset, folderState.runner, startInfo)
  368. }
  369. // Remove stops a running index handler or removes one pending to be started.
  370. // It is a noop if the folder isn't known.
  371. func (r *indexHandlerRegistry) Remove(folder string) {
  372. r.mut.Lock()
  373. defer r.mut.Unlock()
  374. l.Debugf("Removing index handler for device %v and folder %v", r.conn.ID().Short(), folder)
  375. if is, ok := r.indexHandlers[folder]; ok {
  376. r.sup.RemoveAndWait(is.token, 0)
  377. delete(r.indexHandlers, folder)
  378. }
  379. delete(r.startInfos, folder)
  380. l.Debugf("Removed index handler for device %v and folder %v", r.conn.ID().Short(), folder)
  381. }
  382. // RemoveAllExcept stops all running index handlers and removes those pending to be started,
  383. // except mentioned ones.
  384. // It is a noop if the folder isn't known.
  385. func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]struct{}) {
  386. r.mut.Lock()
  387. defer r.mut.Unlock()
  388. for folder, is := range r.indexHandlers {
  389. if _, ok := except[folder]; !ok {
  390. r.sup.RemoveAndWait(is.token, 0)
  391. delete(r.indexHandlers, folder)
  392. l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.ID().Short(), folder)
  393. }
  394. }
  395. for folder := range r.startInfos {
  396. if _, ok := except[folder]; !ok {
  397. delete(r.startInfos, folder)
  398. l.Debugf("Removed pending index handler for device %v and folder %v (removeAllExcept)", r.conn.ID().Short(), folder)
  399. }
  400. }
  401. }
  402. // RegisterFolderState must be called whenever something about the folder
  403. // changes. The exception being if the folder is removed entirely, then call
  404. // Remove. The fset and runner arguments may be nil, if given folder is paused.
  405. func (r *indexHandlerRegistry) RegisterFolderState(folder config.FolderConfiguration, fset *db.FileSet, runner service) {
  406. if !folder.SharedWith(r.conn.ID()) {
  407. r.Remove(folder.ID)
  408. return
  409. }
  410. r.mut.Lock()
  411. if folder.Paused {
  412. r.folderPausedLocked(folder.ID)
  413. } else {
  414. r.folderRunningLocked(folder, fset, runner)
  415. }
  416. r.mut.Unlock()
  417. }
  418. // folderPausedLocked stops a running index handler.
  419. // It is a noop if the folder isn't known or has not been started yet.
  420. func (r *indexHandlerRegistry) folderPausedLocked(folder string) {
  421. l.Debugf("Pausing index handler for device %v and folder %v", r.conn.ID().Short(), folder)
  422. delete(r.folderStates, folder)
  423. if is, ok := r.indexHandlers[folder]; ok {
  424. is.pause()
  425. l.Debugf("Paused index handler for device %v and folder %v", r.conn.ID().Short(), folder)
  426. } else {
  427. l.Debugf("No index handler for device %v and folder %v to pause", r.conn.ID().Short(), folder)
  428. }
  429. }
  430. // folderRunningLocked resumes an already running index handler or starts it, if it
  431. // was added while paused.
  432. // It is a noop if the folder isn't known.
  433. func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service) {
  434. r.folderStates[folder.ID] = &indexHandlerFolderState{
  435. cfg: folder,
  436. fset: fset,
  437. runner: runner,
  438. }
  439. is, isOk := r.indexHandlers[folder.ID]
  440. if info, ok := r.startInfos[folder.ID]; ok {
  441. if isOk {
  442. r.sup.RemoveAndWait(is.token, 0)
  443. delete(r.indexHandlers, folder.ID)
  444. l.Debugf("Removed index handler for device %v and folder %v in resume", r.conn.ID().Short(), folder.ID)
  445. }
  446. r.startLocked(folder, fset, runner, info)
  447. delete(r.startInfos, folder.ID)
  448. l.Debugf("Started index handler for device %v and folder %v in resume", r.conn.ID().Short(), folder.ID)
  449. } else if isOk {
  450. l.Debugf("Resuming index handler for device %v and folder %v", r.conn.ID().Short(), folder)
  451. is.resume(fset, runner)
  452. } else {
  453. l.Debugf("Not resuming index handler for device %v and folder %v as none is paused and there is no start info", r.conn.ID().Short(), folder.ID)
  454. }
  455. }
  456. func (r *indexHandlerRegistry) ReceiveIndex(folder string, fs []protocol.FileInfo, update bool, op string) error {
  457. r.mut.Lock()
  458. defer r.mut.Unlock()
  459. is, isOk := r.indexHandlers[folder]
  460. if !isOk {
  461. l.Infof("%v for nonexistent or paused folder %q", op, folder)
  462. return ErrFolderMissing
  463. }
  464. return is.receive(fs, update, op)
  465. }
  466. // makeForgetUpdate takes an index update and constructs a download progress update
  467. // causing to forget any progress for files which we've just been sent.
  468. func makeForgetUpdate(files []protocol.FileInfo) []protocol.FileDownloadProgressUpdate {
  469. updates := make([]protocol.FileDownloadProgressUpdate, 0, len(files))
  470. for _, file := range files {
  471. if file.IsSymlink() || file.IsDirectory() || file.IsDeleted() {
  472. continue
  473. }
  474. updates = append(updates, protocol.FileDownloadProgressUpdate{
  475. Name: file.Name,
  476. Version: file.Version,
  477. UpdateType: protocol.FileDownloadProgressUpdateTypeForget,
  478. })
  479. }
  480. return updates
  481. }