set.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. // Package db provides a set type to track local/remote files with newness
  7. // checks. We must do a certain amount of normalization in here. We will get
  8. // fed paths with either native or wire-format separators and encodings
  9. // depending on who calls us. We transform paths to wire-format (NFC and
  10. // slashes) on the way to the database, and transform to native format
  11. // (varying separator and encoding) on the way back out.
  12. package db
  13. import (
  14. "time"
  15. "github.com/syncthing/syncthing/lib/db/backend"
  16. "github.com/syncthing/syncthing/lib/fs"
  17. "github.com/syncthing/syncthing/lib/osutil"
  18. "github.com/syncthing/syncthing/lib/protocol"
  19. "github.com/syncthing/syncthing/lib/sync"
  20. )
  21. type FileSet struct {
  22. folder string
  23. fs fs.Filesystem
  24. db *Lowlevel
  25. meta *metadataTracker
  26. updateMutex sync.Mutex // protects database updates and the corresponding metadata changes
  27. }
  28. // FileIntf is the set of methods implemented by both protocol.FileInfo and
  29. // FileInfoTruncated.
  30. type FileIntf interface {
  31. FileSize() int64
  32. FileName() string
  33. FileLocalFlags() uint32
  34. IsDeleted() bool
  35. IsInvalid() bool
  36. IsIgnored() bool
  37. IsUnsupported() bool
  38. MustRescan() bool
  39. IsReceiveOnlyChanged() bool
  40. IsDirectory() bool
  41. IsSymlink() bool
  42. ShouldConflict() bool
  43. HasPermissionBits() bool
  44. SequenceNo() int64
  45. BlockSize() int
  46. FileVersion() protocol.Vector
  47. FileType() protocol.FileInfoType
  48. FilePermissions() uint32
  49. FileModifiedBy() protocol.ShortID
  50. ModTime() time.Time
  51. }
  52. // The Iterator is called with either a protocol.FileInfo or a
  53. // FileInfoTruncated (depending on the method) and returns true to
  54. // continue iteration, false to stop.
  55. type Iterator func(f FileIntf) bool
  56. func NewFileSet(folder string, fs fs.Filesystem, db *Lowlevel) *FileSet {
  57. return &FileSet{
  58. folder: folder,
  59. fs: fs,
  60. db: db,
  61. meta: db.loadMetadataTracker(folder),
  62. updateMutex: sync.NewMutex(),
  63. }
  64. }
  65. func (s *FileSet) Drop(device protocol.DeviceID) {
  66. l.Debugf("%s Drop(%v)", s.folder, device)
  67. s.updateMutex.Lock()
  68. defer s.updateMutex.Unlock()
  69. if err := s.db.dropDeviceFolder(device[:], []byte(s.folder), s.meta); backend.IsClosed(err) {
  70. return
  71. } else if err != nil {
  72. panic(err)
  73. }
  74. if device == protocol.LocalDeviceID {
  75. s.meta.resetCounts(device)
  76. // We deliberately do not reset the sequence number here. Dropping
  77. // all files for the local device ID only happens in testing - which
  78. // expects the sequence to be retained, like an old Replace() of all
  79. // files would do. However, if we ever did it "in production" we
  80. // would anyway want to retain the sequence for delta indexes to be
  81. // happy.
  82. } else {
  83. // Here, on the other hand, we want to make sure that any file
  84. // announced from the remote is newer than our current sequence
  85. // number.
  86. s.meta.resetAll(device)
  87. }
  88. t, err := s.db.newReadWriteTransaction()
  89. if backend.IsClosed(err) {
  90. return
  91. } else if err != nil {
  92. panic(err)
  93. }
  94. defer t.close()
  95. if err := s.meta.toDB(t, []byte(s.folder)); backend.IsClosed(err) {
  96. return
  97. } else if err != nil {
  98. panic(err)
  99. }
  100. if err := t.Commit(); backend.IsClosed(err) {
  101. return
  102. } else if err != nil {
  103. panic(err)
  104. }
  105. }
  106. func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
  107. l.Debugf("%s Update(%v, [%d])", s.folder, device, len(fs))
  108. // do not modify fs in place, it is still used in outer scope
  109. fs = append([]protocol.FileInfo(nil), fs...)
  110. // If one file info is present multiple times, only keep the last.
  111. // Updating the same file multiple times is problematic, because the
  112. // previous updates won't yet be represented in the db when we update it
  113. // again. Additionally even if that problem was taken care of, it would
  114. // be pointless because we remove the previously added file info again
  115. // right away.
  116. fs = normalizeFilenamesAndDropDuplicates(fs)
  117. s.updateMutex.Lock()
  118. defer s.updateMutex.Unlock()
  119. if device == protocol.LocalDeviceID {
  120. // For the local device we have a bunch of metadata to track.
  121. if err := s.db.updateLocalFiles([]byte(s.folder), fs, s.meta); err != nil && !backend.IsClosed(err) {
  122. panic(err)
  123. }
  124. return
  125. }
  126. // Easy case, just update the files and we're done.
  127. if err := s.db.updateRemoteFiles([]byte(s.folder), device[:], fs, s.meta); err != nil && !backend.IsClosed(err) {
  128. panic(err)
  129. }
  130. }
  131. type Snapshot struct {
  132. folder string
  133. t readOnlyTransaction
  134. meta *countsMap
  135. }
  136. func (s *FileSet) Snapshot() *Snapshot {
  137. t, err := s.db.newReadOnlyTransaction()
  138. if err != nil {
  139. panic(err)
  140. }
  141. return &Snapshot{
  142. folder: s.folder,
  143. t: t,
  144. meta: s.meta.Snapshot(),
  145. }
  146. }
  147. func (s *Snapshot) Release() {
  148. s.t.close()
  149. }
  150. func (s *Snapshot) WithNeed(device protocol.DeviceID, fn Iterator) {
  151. l.Debugf("%s WithNeed(%v)", s.folder, device)
  152. if err := s.t.withNeed([]byte(s.folder), device[:], false, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  153. panic(err)
  154. }
  155. }
  156. func (s *Snapshot) WithNeedTruncated(device protocol.DeviceID, fn Iterator) {
  157. l.Debugf("%s WithNeedTruncated(%v)", s.folder, device)
  158. if err := s.t.withNeed([]byte(s.folder), device[:], true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  159. panic(err)
  160. }
  161. }
  162. func (s *Snapshot) WithHave(device protocol.DeviceID, fn Iterator) {
  163. l.Debugf("%s WithHave(%v)", s.folder, device)
  164. if err := s.t.withHave([]byte(s.folder), device[:], nil, false, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  165. panic(err)
  166. }
  167. }
  168. func (s *Snapshot) WithHaveTruncated(device protocol.DeviceID, fn Iterator) {
  169. l.Debugf("%s WithHaveTruncated(%v)", s.folder, device)
  170. if err := s.t.withHave([]byte(s.folder), device[:], nil, true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  171. panic(err)
  172. }
  173. }
  174. func (s *Snapshot) WithHaveSequence(startSeq int64, fn Iterator) {
  175. l.Debugf("%s WithHaveSequence(%v)", s.folder, startSeq)
  176. if err := s.t.withHaveSequence([]byte(s.folder), startSeq, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  177. panic(err)
  178. }
  179. }
  180. // Except for an item with a path equal to prefix, only children of prefix are iterated.
  181. // E.g. for prefix "dir", "dir/file" is iterated, but "dir.file" is not.
  182. func (s *Snapshot) WithPrefixedHaveTruncated(device protocol.DeviceID, prefix string, fn Iterator) {
  183. l.Debugf(`%s WithPrefixedHaveTruncated(%v, "%v")`, s.folder, device, prefix)
  184. if err := s.t.withHave([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(prefix)), true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  185. panic(err)
  186. }
  187. }
  188. func (s *Snapshot) WithGlobal(fn Iterator) {
  189. l.Debugf("%s WithGlobal()", s.folder)
  190. if err := s.t.withGlobal([]byte(s.folder), nil, false, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  191. panic(err)
  192. }
  193. }
  194. func (s *Snapshot) WithGlobalTruncated(fn Iterator) {
  195. l.Debugf("%s WithGlobalTruncated()", s.folder)
  196. if err := s.t.withGlobal([]byte(s.folder), nil, true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  197. panic(err)
  198. }
  199. }
  200. // Except for an item with a path equal to prefix, only children of prefix are iterated.
  201. // E.g. for prefix "dir", "dir/file" is iterated, but "dir.file" is not.
  202. func (s *Snapshot) WithPrefixedGlobalTruncated(prefix string, fn Iterator) {
  203. l.Debugf(`%s WithPrefixedGlobalTruncated("%v")`, s.folder, prefix)
  204. if err := s.t.withGlobal([]byte(s.folder), []byte(osutil.NormalizedFilename(prefix)), true, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  205. panic(err)
  206. }
  207. }
  208. func (s *Snapshot) Get(device protocol.DeviceID, file string) (protocol.FileInfo, bool) {
  209. f, ok, err := s.t.getFile([]byte(s.folder), device[:], []byte(osutil.NormalizedFilename(file)))
  210. if backend.IsClosed(err) {
  211. return protocol.FileInfo{}, false
  212. } else if err != nil {
  213. panic(err)
  214. }
  215. f.Name = osutil.NativeFilename(f.Name)
  216. return f, ok
  217. }
  218. func (s *Snapshot) GetGlobal(file string) (protocol.FileInfo, bool) {
  219. _, fi, ok, err := s.t.getGlobal(nil, []byte(s.folder), []byte(osutil.NormalizedFilename(file)), false)
  220. if backend.IsClosed(err) {
  221. return protocol.FileInfo{}, false
  222. } else if err != nil {
  223. panic(err)
  224. }
  225. if !ok {
  226. return protocol.FileInfo{}, false
  227. }
  228. f := fi.(protocol.FileInfo)
  229. f.Name = osutil.NativeFilename(f.Name)
  230. return f, true
  231. }
  232. func (s *Snapshot) GetGlobalTruncated(file string) (FileInfoTruncated, bool) {
  233. _, fi, ok, err := s.t.getGlobal(nil, []byte(s.folder), []byte(osutil.NormalizedFilename(file)), true)
  234. if backend.IsClosed(err) {
  235. return FileInfoTruncated{}, false
  236. } else if err != nil {
  237. panic(err)
  238. }
  239. if !ok {
  240. return FileInfoTruncated{}, false
  241. }
  242. f := fi.(FileInfoTruncated)
  243. f.Name = osutil.NativeFilename(f.Name)
  244. return f, true
  245. }
  246. func (s *Snapshot) Availability(file string) []protocol.DeviceID {
  247. av, err := s.t.availability([]byte(s.folder), []byte(osutil.NormalizedFilename(file)))
  248. if backend.IsClosed(err) {
  249. return nil
  250. } else if err != nil {
  251. panic(err)
  252. }
  253. return av
  254. }
  255. func (s *Snapshot) Sequence(device protocol.DeviceID) int64 {
  256. return s.meta.Counts(device, 0).Sequence
  257. }
  258. // RemoteSequence returns the change version for the given folder, as
  259. // sent by remote peers. This is guaranteed to increment if the contents of
  260. // the remote or global folder has changed.
  261. func (s *Snapshot) RemoteSequence() int64 {
  262. var ver int64
  263. for _, device := range s.meta.devices() {
  264. ver += s.Sequence(device)
  265. }
  266. return ver
  267. }
  268. func (s *Snapshot) LocalSize() Counts {
  269. local := s.meta.Counts(protocol.LocalDeviceID, 0)
  270. return local.Add(s.ReceiveOnlyChangedSize())
  271. }
  272. func (s *Snapshot) ReceiveOnlyChangedSize() Counts {
  273. return s.meta.Counts(protocol.LocalDeviceID, protocol.FlagLocalReceiveOnly)
  274. }
  275. func (s *Snapshot) GlobalSize() Counts {
  276. global := s.meta.Counts(protocol.GlobalDeviceID, 0)
  277. recvOnlyChanged := s.meta.Counts(protocol.GlobalDeviceID, protocol.FlagLocalReceiveOnly)
  278. return global.Add(recvOnlyChanged)
  279. }
  280. func (s *Snapshot) NeedSize(device protocol.DeviceID) Counts {
  281. return s.meta.Counts(device, needFlag)
  282. }
  283. // LocalChangedFiles returns a paginated list of files that were changed locally.
  284. func (s *Snapshot) LocalChangedFiles(page, perpage int) []FileInfoTruncated {
  285. if s.ReceiveOnlyChangedSize().TotalItems() == 0 {
  286. return nil
  287. }
  288. files := make([]FileInfoTruncated, 0, perpage)
  289. skip := (page - 1) * perpage
  290. get := perpage
  291. s.WithHaveTruncated(protocol.LocalDeviceID, func(f FileIntf) bool {
  292. if !f.IsReceiveOnlyChanged() {
  293. return true
  294. }
  295. if skip > 0 {
  296. skip--
  297. return true
  298. }
  299. ft := f.(FileInfoTruncated)
  300. files = append(files, ft)
  301. get--
  302. return get > 0
  303. })
  304. return files
  305. }
  306. // RemoteNeedFolderFiles returns paginated list of currently needed files in
  307. // progress, queued, and to be queued on next puller iteration, as well as the
  308. // total number of files currently needed.
  309. func (s *Snapshot) RemoteNeedFolderFiles(device protocol.DeviceID, page, perpage int) []FileInfoTruncated {
  310. files := make([]FileInfoTruncated, 0, perpage)
  311. skip := (page - 1) * perpage
  312. get := perpage
  313. s.WithNeedTruncated(device, func(f FileIntf) bool {
  314. if skip > 0 {
  315. skip--
  316. return true
  317. }
  318. files = append(files, f.(FileInfoTruncated))
  319. get--
  320. return get > 0
  321. })
  322. return files
  323. }
  324. func (s *Snapshot) WithBlocksHash(hash []byte, fn Iterator) {
  325. l.Debugf(`%s WithBlocksHash("%x")`, s.folder, hash)
  326. if err := s.t.withBlocksHash([]byte(s.folder), hash, nativeFileIterator(fn)); err != nil && !backend.IsClosed(err) {
  327. panic(err)
  328. }
  329. }
  330. func (s *FileSet) Sequence(device protocol.DeviceID) int64 {
  331. return s.meta.Sequence(device)
  332. }
  333. func (s *FileSet) IndexID(device protocol.DeviceID) protocol.IndexID {
  334. id, err := s.db.getIndexID(device[:], []byte(s.folder))
  335. if backend.IsClosed(err) {
  336. return 0
  337. } else if err != nil {
  338. panic(err)
  339. }
  340. if id == 0 && device == protocol.LocalDeviceID {
  341. // No index ID set yet. We create one now.
  342. id = protocol.NewIndexID()
  343. err := s.db.setIndexID(device[:], []byte(s.folder), id)
  344. if backend.IsClosed(err) {
  345. return 0
  346. } else if err != nil {
  347. panic(err)
  348. }
  349. }
  350. return id
  351. }
  352. func (s *FileSet) SetIndexID(device protocol.DeviceID, id protocol.IndexID) {
  353. if device == protocol.LocalDeviceID {
  354. panic("do not explicitly set index ID for local device")
  355. }
  356. if err := s.db.setIndexID(device[:], []byte(s.folder), id); err != nil && !backend.IsClosed(err) {
  357. panic(err)
  358. }
  359. }
  360. func (s *FileSet) MtimeFS() *fs.MtimeFS {
  361. prefix, err := s.db.keyer.GenerateMtimesKey(nil, []byte(s.folder))
  362. if backend.IsClosed(err) {
  363. return nil
  364. } else if err != nil {
  365. panic(err)
  366. }
  367. kv := NewNamespacedKV(s.db, string(prefix))
  368. return fs.NewMtimeFS(s.fs, kv)
  369. }
  370. func (s *FileSet) ListDevices() []protocol.DeviceID {
  371. return s.meta.devices()
  372. }
  373. func (s *FileSet) RepairSequence() (int, error) {
  374. s.updateAndGCMutexLock() // Ensures consistent locking order
  375. defer s.updateMutex.Unlock()
  376. defer s.db.gcMut.RUnlock()
  377. return s.db.repairSequenceGCLocked(s.folder, s.meta)
  378. }
  379. func (s *FileSet) updateAndGCMutexLock() {
  380. s.updateMutex.Lock()
  381. s.db.gcMut.RLock()
  382. }
  383. // DropFolder clears out all information related to the given folder from the
  384. // database.
  385. func DropFolder(db *Lowlevel, folder string) {
  386. droppers := []func([]byte) error{
  387. db.dropFolder,
  388. db.dropMtimes,
  389. db.dropFolderMeta,
  390. db.folderIdx.Delete,
  391. }
  392. for _, drop := range droppers {
  393. if err := drop([]byte(folder)); backend.IsClosed(err) {
  394. return
  395. } else if err != nil {
  396. panic(err)
  397. }
  398. }
  399. }
  400. // DropDeltaIndexIDs removes all delta index IDs from the database.
  401. // This will cause a full index transmission on the next connection.
  402. func DropDeltaIndexIDs(db *Lowlevel) {
  403. dbi, err := db.NewPrefixIterator([]byte{KeyTypeIndexID})
  404. if backend.IsClosed(err) {
  405. return
  406. } else if err != nil {
  407. panic(err)
  408. }
  409. defer dbi.Release()
  410. for dbi.Next() {
  411. if err := db.Delete(dbi.Key()); err != nil && !backend.IsClosed(err) {
  412. panic(err)
  413. }
  414. }
  415. if err := dbi.Error(); err != nil && !backend.IsClosed(err) {
  416. panic(err)
  417. }
  418. }
  419. func normalizeFilenamesAndDropDuplicates(fs []protocol.FileInfo) []protocol.FileInfo {
  420. positions := make(map[string]int, len(fs))
  421. for i, f := range fs {
  422. norm := osutil.NormalizedFilename(f.Name)
  423. if pos, ok := positions[norm]; ok {
  424. fs[pos] = protocol.FileInfo{}
  425. }
  426. positions[norm] = i
  427. fs[i].Name = norm
  428. }
  429. for i := 0; i < len(fs); {
  430. if fs[i].Name == "" {
  431. fs = append(fs[:i], fs[i+1:]...)
  432. continue
  433. }
  434. i++
  435. }
  436. return fs
  437. }
  438. func nativeFileIterator(fn Iterator) Iterator {
  439. return func(fi FileIntf) bool {
  440. switch f := fi.(type) {
  441. case protocol.FileInfo:
  442. f.Name = osutil.NativeFilename(f.Name)
  443. return fn(f)
  444. case FileInfoTruncated:
  445. f.Name = osutil.NativeFilename(f.Name)
  446. return fn(f)
  447. default:
  448. panic("unknown interface type")
  449. }
  450. }
  451. }