set.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. // Package db provides a set type to track local/remote files with newness
  7. // checks. We must do a certain amount of normalization in here. We will get
  8. // fed paths with either native or wire-format separators and encodings
  9. // depending on who calls us. We transform paths to wire-format (NFC and
  10. // slashes) on the way to the database, and transform to native format
  11. // (varying separator and encoding) on the way back out.
  12. package db
  13. import (
  14. "os"
  15. "time"
  16. "github.com/syncthing/syncthing/lib/db/backend"
  17. "github.com/syncthing/syncthing/lib/fs"
  18. "github.com/syncthing/syncthing/lib/osutil"
  19. "github.com/syncthing/syncthing/lib/protocol"
  20. "github.com/syncthing/syncthing/lib/sync"
  21. )
  22. type FileSet struct {
  23. folder string
  24. fs fs.Filesystem
  25. db *Lowlevel
  26. meta *metadataTracker
  27. updateMutex sync.Mutex // protects database updates and the corresponding metadata changes
  28. }
  29. // FileIntf is the set of methods implemented by both protocol.FileInfo and
  30. // FileInfoTruncated.
  31. type FileIntf interface {
  32. FileSize() int64
  33. FileName() string
  34. FileLocalFlags() uint32
  35. IsDeleted() bool
  36. IsInvalid() bool
  37. IsIgnored() bool
  38. IsUnsupported() bool
  39. MustRescan() bool
  40. IsReceiveOnlyChanged() bool
  41. IsDirectory() bool
  42. IsSymlink() bool
  43. ShouldConflict() bool
  44. HasPermissionBits() bool
  45. SequenceNo() int64
  46. BlockSize() int
  47. FileVersion() protocol.Vector
  48. FileType() protocol.FileInfoType
  49. FilePermissions() uint32
  50. FileModifiedBy() protocol.ShortID
  51. ModTime() time.Time
  52. }
  53. // The Iterator is called with either a protocol.FileInfo or a
  54. // FileInfoTruncated (depending on the method) and returns true to
  55. // continue iteration, false to stop.
  56. type Iterator func(f FileIntf) bool
  57. var databaseRecheckInterval = 30 * 24 * time.Hour
  58. func init() {
  59. if dur, err := time.ParseDuration(os.Getenv("STRECHECKDBEVERY")); err == nil {
  60. databaseRecheckInterval = dur
  61. }
  62. }
  63. func NewFileSet(folder string, fs fs.Filesystem, db *Lowlevel) *FileSet {
  64. var s = &FileSet{
  65. folder: folder,
  66. fs: fs,
  67. db: db,
  68. meta: newMetadataTracker(),
  69. updateMutex: sync.NewMutex(),
  70. }
  71. recalc := func() *FileSet {
  72. if err := s.recalcMeta(); backend.IsClosed(err) {
  73. return nil
  74. } else if err != nil {
  75. panic(err)
  76. }
  77. return s
  78. }
  79. if err := s.meta.fromDB(db, []byte(folder)); err != nil {
  80. l.Infof("No stored folder metadata for %q; recalculating", folder)
  81. return recalc()
  82. }
  83. if metaOK := s.verifyLocalSequence(); !metaOK {
  84. l.Infof("Stored folder metadata for %q is out of date after crash; recalculating", folder)
  85. return recalc()
  86. }
  87. if age := time.Since(s.meta.Created()); age > databaseRecheckInterval {
  88. l.Infof("Stored folder metadata for %q is %v old; recalculating", folder, age)
  89. return recalc()
  90. }
  91. return s
  92. }
  93. func (s *FileSet) recalcMeta() error {
  94. s.meta = newMetadataTracker()
  95. if err := s.db.checkGlobals([]byte(s.folder), s.meta); err != nil {
  96. return err
  97. }
  98. t, err := s.db.newReadWriteTransaction()
  99. if err != nil {
  100. return err
  101. }
  102. defer t.close()
  103. var deviceID protocol.DeviceID
  104. err = t.withAllFolderTruncated([]byte(s.folder), func(device []byte, f FileInfoTruncated) bool {
  105. copy(deviceID[:], device)
  106. s.meta.addFile(deviceID, f)
  107. return true
  108. })
  109. if err != nil {
  110. return err
  111. }
  112. s.meta.SetCreated()
  113. if err := s.meta.toDB(t, []byte(s.folder)); err != nil {
  114. return err
  115. }
  116. return t.Commit()
  117. }
  118. // Verify the local sequence number from actual sequence entries. Returns
  119. // true if it was all good, or false if a fixup was necessary.
  120. func (s *FileSet) verifyLocalSequence() bool {
  121. // Walk the sequence index from the current (supposedly) highest
  122. // sequence number and raise the alarm if we get anything. This recovers
  123. // from the occasion where we have written sequence entries to disk but
  124. // not yet written new metadata to disk.
  125. //
  126. // Note that we can have the same thing happen for remote devices but
  127. // there it's not a problem -- we'll simply advertise a lower sequence
  128. // number than we've actually seen and receive some duplicate updates
  129. // and then be in sync again.
  130. curSeq := s.meta.Sequence(protocol.LocalDeviceID)
  131. snap := s.Snapshot()
  132. ok := true
  133. snap.WithHaveSequence(curSeq+1, func(fi FileIntf) bool {
  134. ok = false // we got something, which we should not have
  135. return false
  136. })
  137. snap.Release()
  138. return ok
  139. }
  140. func (s *FileSet) Drop(device protocol.DeviceID) {
  141. l.Debugf("%s Drop(%v)", s.folder, device)
  142. s.updateMutex.Lock()
  143. defer s.updateMutex.Unlock()
  144. if err := s.db.dropDeviceFolder(device[:], []byte(s.folder), s.meta); backend.IsClosed(err) {
  145. return
  146. } else if err != nil {
  147. panic(err)
  148. }
  149. if device == protocol.LocalDeviceID {
  150. s.meta.resetCounts(device)
  151. // We deliberately do not reset the sequence number here. Dropping
  152. // all files for the local device ID only happens in testing - which
  153. // expects the sequence to be retained, like an old Replace() of all
  154. // files would do. However, if we ever did it "in production" we
  155. // would anyway want to retain the sequence for delta indexes to be
  156. // happy.
  157. } else {
  158. // Here, on the other hand, we want to make sure that any file
  159. // announced from the remote is newer than our current sequence
  160. // number.
  161. s.meta.resetAll(device)
  162. }
  163. t, err := s.db.newReadWriteTransaction()
  164. if backend.IsClosed(err) {
  165. return
  166. } else if err != nil {
  167. panic(err)
  168. }
  169. defer t.close()
  170. if err := s.meta.toDB(t, []byte(s.folder)); backend.IsClosed(err) {
  171. return
  172. } else if err != nil {
  173. panic(err)
  174. }
  175. if err := t.Commit(); backend.IsClosed(err) {
  176. return
  177. } else if err != nil {
  178. panic(err)
  179. }
  180. }
  181. func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
  182. l.Debugf("%s Update(%v, [%d])", s.folder, device, len(fs))
  183. // do not modify fs in place, it is still used in outer scope
  184. fs = append([]protocol.FileInfo(nil), fs...)
  185. normalizeFilenames(fs)
  186. s.updateMutex.Lock()
  187. defer s.updateMutex.Unlock()
  188. if device == protocol.LocalDeviceID {
  189. // For the local device we have a bunch of metadata to track.
  190. if err := s.db.updateLocalFiles([]byte(s.folder), fs, s.meta); err != nil && !backend.IsClosed(err) {
  191. panic(err)
  192. }
  193. return
  194. }
  195. // Easy case, just update the files and we're done.
  196. if err := s.db.updateRemoteFiles([]byte(s.folder), device[:], fs, s.meta); err != nil && !backend.IsClosed(err) {
  197. panic(err)
  198. }
  199. }
  200. type Snapshot struct {
  201. folder string
  202. t readOnlyTransaction
  203. meta *countsMap
  204. }
  205. func (s *FileSet) Snapshot() *Snapshot {
  206. t, err := s.db.newReadOnlyTransaction()
  207. if err != nil {
  208. panic(err)
  209. }
  210. return &Snapshot{
  211. folder: s.folder,
  212. t: t,
  213. meta: s.meta.Snapshot(),
  214. }
  215. }
  216. func (s *Snapshot) Release() {
  217. s.t.close()
  218. }
  219. func (s *Snapshot) WithNeed(device protocol.DeviceID, fn Iterator) {
  220. l.Debugf("%s WithNeed(%v)", s.folder, device)
  221. if err := s.t.withNeed([]byte(s.folder), device[:], false, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  222. panic(err)
  223. }
  224. }
  225. func (s *Snapshot) WithNeedTruncated(device protocol.DeviceID, fn Iterator) {
  226. l.Debugf("%s WithNeedTruncated(%v)", s.folder, device)
  227. if err := s.t.withNeed([]byte(s.folder), device[:], true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  228. panic(err)
  229. }
  230. }
  231. func (s *Snapshot) WithHave(device protocol.DeviceID, fn Iterator) {
  232. l.Debugf("%s WithHave(%v)", s.folder, device)
  233. if err := s.t.withHave([]byte(s.folder), device[:], nil, false, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  234. panic(err)
  235. }
  236. }
  237. func (s *Snapshot) WithHaveTruncated(device protocol.DeviceID, fn Iterator) {
  238. l.Debugf("%s WithHaveTruncated(%v)", s.folder, device)
  239. if err := s.t.withHave([]byte(s.folder), device[:], nil, true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  240. panic(err)
  241. }
  242. }
  243. func (s *Snapshot) WithHaveSequence(startSeq int64, fn Iterator) {
  244. l.Debugf("%s WithHaveSequence(%v)", s.folder, startSeq)
  245. if err := s.t.withHaveSequence([]byte(s.folder), startSeq, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  246. panic(err)
  247. }
  248. }
  249. // Except for an item with a path equal to prefix, only children of prefix are iterated.
  250. // E.g. for prefix "dir", "dir/file" is iterated, but "dir.file" is not.
  251. func (s *Snapshot) WithPrefixedHaveTruncated(device protocol.DeviceID, prefix string, fn Iterator) {
  252. l.Debugf(`%s WithPrefixedHaveTruncated(%v, "%v")`, s.folder, device, prefix)
  253. if err := s.t.withHave([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(prefix)), true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  254. panic(err)
  255. }
  256. }
  257. func (s *Snapshot) WithGlobal(fn Iterator) {
  258. l.Debugf("%s WithGlobal()", s.folder)
  259. if err := s.t.withGlobal([]byte(s.folder), nil, false, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  260. panic(err)
  261. }
  262. }
  263. func (s *Snapshot) WithGlobalTruncated(fn Iterator) {
  264. l.Debugf("%s WithGlobalTruncated()", s.folder)
  265. if err := s.t.withGlobal([]byte(s.folder), nil, true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  266. panic(err)
  267. }
  268. }
  269. // Except for an item with a path equal to prefix, only children of prefix are iterated.
  270. // E.g. for prefix "dir", "dir/file" is iterated, but "dir.file" is not.
  271. func (s *Snapshot) WithPrefixedGlobalTruncated(prefix string, fn Iterator) {
  272. l.Debugf(`%s WithPrefixedGlobalTruncated("%v")`, s.folder, prefix)
  273. if err := s.t.withGlobal([]byte(s.folder), []byte(osutil.NormalizedFilename(prefix)), true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  274. panic(err)
  275. }
  276. }
  277. func (s *Snapshot) Get(device protocol.DeviceID, file string) (protocol.FileInfo, bool) {
  278. f, ok, err := s.t.getFile([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(file)))
  279. if backend.IsClosed(err) {
  280. return protocol.FileInfo{}, false
  281. } else if err != nil {
  282. panic(err)
  283. }
  284. f.Name = osutil.NativeFilename(f.Name)
  285. return f, ok
  286. }
  287. func (s *Snapshot) GetGlobal(file string) (protocol.FileInfo, bool) {
  288. _, fi, ok, err := s.t.getGlobal(nil, []byte(s.folder), []byte(osutil.NormalizedFilename(file)), false)
  289. if backend.IsClosed(err) {
  290. return protocol.FileInfo{}, false
  291. } else if err != nil {
  292. panic(err)
  293. }
  294. if !ok {
  295. return protocol.FileInfo{}, false
  296. }
  297. f := fi.(protocol.FileInfo)
  298. f.Name = osutil.NativeFilename(f.Name)
  299. return f, true
  300. }
  301. func (s *Snapshot) GetGlobalTruncated(file string) (FileInfoTruncated, bool) {
  302. _, fi, ok, err := s.t.getGlobal(nil, []byte(s.folder), []byte(osutil.NormalizedFilename(file)), true)
  303. if backend.IsClosed(err) {
  304. return FileInfoTruncated{}, false
  305. } else if err != nil {
  306. panic(err)
  307. }
  308. if !ok {
  309. return FileInfoTruncated{}, false
  310. }
  311. f := fi.(FileInfoTruncated)
  312. f.Name = osutil.NativeFilename(f.Name)
  313. return f, true
  314. }
  315. func (s *Snapshot) Availability(file string) []protocol.DeviceID {
  316. av, err := s.t.availability([]byte(s.folder), []byte(osutil.NormalizedFilename(file)))
  317. if backend.IsClosed(err) {
  318. return nil
  319. } else if err != nil {
  320. panic(err)
  321. }
  322. return av
  323. }
  324. func (s *Snapshot) Sequence(device protocol.DeviceID) int64 {
  325. return s.meta.Counts(device, 0).Sequence
  326. }
  327. // RemoteSequence returns the change version for the given folder, as
  328. // sent by remote peers. This is guaranteed to increment if the contents of
  329. // the remote or global folder has changed.
  330. func (s *Snapshot) RemoteSequence() int64 {
  331. var ver int64
  332. for _, device := range s.meta.devices() {
  333. ver += s.Sequence(device)
  334. }
  335. return ver
  336. }
  337. func (s *Snapshot) LocalSize() Counts {
  338. local := s.meta.Counts(protocol.LocalDeviceID, 0)
  339. return local.Add(s.ReceiveOnlyChangedSize())
  340. }
  341. func (s *Snapshot) ReceiveOnlyChangedSize() Counts {
  342. return s.meta.Counts(protocol.LocalDeviceID, protocol.FlagLocalReceiveOnly)
  343. }
  344. func (s *Snapshot) GlobalSize() Counts {
  345. global := s.meta.Counts(protocol.GlobalDeviceID, 0)
  346. recvOnlyChanged := s.meta.Counts(protocol.GlobalDeviceID, protocol.FlagLocalReceiveOnly)
  347. return global.Add(recvOnlyChanged)
  348. }
  349. func (s *Snapshot) NeedSize() Counts {
  350. var result Counts
  351. s.WithNeedTruncated(protocol.LocalDeviceID, func(f FileIntf) bool {
  352. switch {
  353. case f.IsDeleted():
  354. result.Deleted++
  355. case f.IsDirectory():
  356. result.Directories++
  357. case f.IsSymlink():
  358. result.Symlinks++
  359. default:
  360. result.Files++
  361. result.Bytes += f.FileSize()
  362. }
  363. return true
  364. })
  365. return result
  366. }
  367. // LocalChangedFiles returns a paginated list of currently needed files in
  368. // progress, queued, and to be queued on next puller iteration, as well as the
  369. // total number of files currently needed.
  370. func (s *Snapshot) LocalChangedFiles(page, perpage int) []FileInfoTruncated {
  371. if s.ReceiveOnlyChangedSize().TotalItems() == 0 {
  372. return nil
  373. }
  374. files := make([]FileInfoTruncated, 0, perpage)
  375. skip := (page - 1) * perpage
  376. get := perpage
  377. s.WithHaveTruncated(protocol.LocalDeviceID, func(f FileIntf) bool {
  378. if !f.IsReceiveOnlyChanged() {
  379. return true
  380. }
  381. if skip > 0 {
  382. skip--
  383. return true
  384. }
  385. ft := f.(FileInfoTruncated)
  386. files = append(files, ft)
  387. get--
  388. return get > 0
  389. })
  390. return files
  391. }
  392. // RemoteNeedFolderFiles returns paginated list of currently needed files in
  393. // progress, queued, and to be queued on next puller iteration, as well as the
  394. // total number of files currently needed.
  395. func (s *Snapshot) RemoteNeedFolderFiles(device protocol.DeviceID, page, perpage int) []FileInfoTruncated {
  396. files := make([]FileInfoTruncated, 0, perpage)
  397. skip := (page - 1) * perpage
  398. get := perpage
  399. s.WithNeedTruncated(device, func(f FileIntf) bool {
  400. if skip > 0 {
  401. skip--
  402. return true
  403. }
  404. files = append(files, f.(FileInfoTruncated))
  405. get--
  406. return get > 0
  407. })
  408. return files
  409. }
  410. func (s *FileSet) Sequence(device protocol.DeviceID) int64 {
  411. return s.meta.Sequence(device)
  412. }
  413. func (s *FileSet) IndexID(device protocol.DeviceID) protocol.IndexID {
  414. id, err := s.db.getIndexID(device[:], []byte(s.folder))
  415. if backend.IsClosed(err) {
  416. return 0
  417. } else if err != nil {
  418. panic(err)
  419. }
  420. if id == 0 && device == protocol.LocalDeviceID {
  421. // No index ID set yet. We create one now.
  422. id = protocol.NewIndexID()
  423. err := s.db.setIndexID(device[:], []byte(s.folder), id)
  424. if backend.IsClosed(err) {
  425. return 0
  426. } else if err != nil {
  427. panic(err)
  428. }
  429. }
  430. return id
  431. }
  432. func (s *FileSet) SetIndexID(device protocol.DeviceID, id protocol.IndexID) {
  433. if device == protocol.LocalDeviceID {
  434. panic("do not explicitly set index ID for local device")
  435. }
  436. if err := s.db.setIndexID(device[:], []byte(s.folder), id); err != nil && !backend.IsClosed(err) {
  437. panic(err)
  438. }
  439. }
  440. func (s *FileSet) MtimeFS() *fs.MtimeFS {
  441. prefix, err := s.db.keyer.GenerateMtimesKey(nil, []byte(s.folder))
  442. if backend.IsClosed(err) {
  443. return nil
  444. } else if err != nil {
  445. panic(err)
  446. }
  447. kv := NewNamespacedKV(s.db, string(prefix))
  448. return fs.NewMtimeFS(s.fs, kv)
  449. }
  450. func (s *FileSet) ListDevices() []protocol.DeviceID {
  451. return s.meta.devices()
  452. }
  453. // DropFolder clears out all information related to the given folder from the
  454. // database.
  455. func DropFolder(db *Lowlevel, folder string) {
  456. droppers := []func([]byte) error{
  457. db.dropFolder,
  458. db.dropMtimes,
  459. db.dropFolderMeta,
  460. db.folderIdx.Delete,
  461. }
  462. for _, drop := range droppers {
  463. if err := drop([]byte(folder)); backend.IsClosed(err) {
  464. return
  465. } else if err != nil {
  466. panic(err)
  467. }
  468. }
  469. }
  470. // DropDeltaIndexIDs removes all delta index IDs from the database.
  471. // This will cause a full index transmission on the next connection.
  472. func DropDeltaIndexIDs(db *Lowlevel) {
  473. dbi, err := db.NewPrefixIterator([]byte{KeyTypeIndexID})
  474. if backend.IsClosed(err) {
  475. return
  476. } else if err != nil {
  477. panic(err)
  478. }
  479. defer dbi.Release()
  480. for dbi.Next() {
  481. if err := db.Delete(dbi.Key()); err != nil && !backend.IsClosed(err) {
  482. panic(err)
  483. }
  484. }
  485. if err := dbi.Error(); err != nil && !backend.IsClosed(err) {
  486. panic(err)
  487. }
  488. }
  489. func normalizeFilenames(fs []protocol.FileInfo) {
  490. for i := range fs {
  491. fs[i].Name = osutil.NormalizedFilename(fs[i].Name)
  492. }
  493. }
  494. func nativeFileIterator(fn Iterator) Iterator {
  495. return func(fi FileIntf) bool {
  496. switch f := fi.(type) {
  497. case protocol.FileInfo:
  498. f.Name = osutil.NativeFilename(f.Name)
  499. return fn(f)
  500. case FileInfoTruncated:
  501. f.Name = osutil.NativeFilename(f.Name)
  502. return fn(f)
  503. default:
  504. panic("unknown interface type")
  505. }
  506. }
  507. }