instance.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "encoding/binary"
  10. "fmt"
  11. "github.com/syncthing/syncthing/lib/protocol"
  12. "github.com/syndtr/goleveldb/leveldb"
  13. "github.com/syndtr/goleveldb/leveldb/util"
  14. )
  15. type instance struct {
  16. *Lowlevel
  17. keyer keyer
  18. }
  19. func newInstance(ll *Lowlevel) *instance {
  20. return &instance{
  21. Lowlevel: ll,
  22. keyer: newDefaultKeyer(ll.folderIdx, ll.deviceIdx),
  23. }
  24. }
  25. // updateRemoteFiles adds a list of fileinfos to the database and updates the
  26. // global versionlist and metadata.
  27. func (db *instance) updateRemoteFiles(folder, device []byte, fs []protocol.FileInfo, meta *metadataTracker) {
  28. t := db.newReadWriteTransaction()
  29. defer t.close()
  30. var dk, gk, keyBuf []byte
  31. devID := protocol.DeviceIDFromBytes(device)
  32. for _, f := range fs {
  33. name := []byte(f.Name)
  34. dk = db.keyer.GenerateDeviceFileKey(dk, folder, device, name)
  35. ef, ok := t.getFileTrunc(dk, true)
  36. if ok && unchanged(f, ef) {
  37. continue
  38. }
  39. if ok {
  40. meta.removeFile(devID, ef)
  41. }
  42. meta.addFile(devID, f)
  43. l.Debugf("insert; folder=%q device=%v %v", folder, devID, f)
  44. t.Put(dk, mustMarshal(&f))
  45. gk = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  46. keyBuf, _ = t.updateGlobal(gk, keyBuf, folder, device, f, meta)
  47. t.checkFlush()
  48. }
  49. }
  50. // updateLocalFiles adds fileinfos to the db, and updates the global versionlist,
  51. // metadata, sequence and blockmap buckets.
  52. func (db *instance) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta *metadataTracker) {
  53. t := db.newReadWriteTransaction()
  54. defer t.close()
  55. var dk, gk, keyBuf []byte
  56. blockBuf := make([]byte, 4)
  57. for _, f := range fs {
  58. name := []byte(f.Name)
  59. dk = db.keyer.GenerateDeviceFileKey(dk, folder, protocol.LocalDeviceID[:], name)
  60. ef, ok := t.getFileByKey(dk)
  61. if ok && unchanged(f, ef) {
  62. continue
  63. }
  64. if ok {
  65. if !ef.IsDirectory() && !ef.IsDeleted() && !ef.IsInvalid() {
  66. for _, block := range ef.Blocks {
  67. keyBuf = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  68. t.Delete(keyBuf)
  69. }
  70. }
  71. keyBuf = db.keyer.GenerateSequenceKey(keyBuf, folder, ef.SequenceNo())
  72. t.Delete(keyBuf)
  73. l.Debugf("removing sequence; folder=%q sequence=%v %v", folder, ef.SequenceNo(), ef.FileName())
  74. }
  75. f.Sequence = meta.nextLocalSeq()
  76. if ok {
  77. meta.removeFile(protocol.LocalDeviceID, ef)
  78. }
  79. meta.addFile(protocol.LocalDeviceID, f)
  80. l.Debugf("insert (local); folder=%q %v", folder, f)
  81. t.Put(dk, mustMarshal(&f))
  82. gk = db.keyer.GenerateGlobalVersionKey(gk, folder, []byte(f.Name))
  83. keyBuf, _ = t.updateGlobal(gk, keyBuf, folder, protocol.LocalDeviceID[:], f, meta)
  84. keyBuf = db.keyer.GenerateSequenceKey(keyBuf, folder, f.Sequence)
  85. t.Put(keyBuf, dk)
  86. l.Debugf("adding sequence; folder=%q sequence=%v %v", folder, f.Sequence, f.Name)
  87. if !f.IsDirectory() && !f.IsDeleted() && !f.IsInvalid() {
  88. for i, block := range f.Blocks {
  89. binary.BigEndian.PutUint32(blockBuf, uint32(i))
  90. keyBuf = db.keyer.GenerateBlockMapKey(keyBuf, folder, block.Hash, name)
  91. t.Put(keyBuf, blockBuf)
  92. }
  93. }
  94. t.checkFlush()
  95. }
  96. }
  97. func (db *instance) withHave(folder, device, prefix []byte, truncate bool, fn Iterator) {
  98. t := db.newReadOnlyTransaction()
  99. defer t.close()
  100. if len(prefix) > 0 {
  101. unslashedPrefix := prefix
  102. if bytes.HasSuffix(prefix, []byte{'/'}) {
  103. unslashedPrefix = unslashedPrefix[:len(unslashedPrefix)-1]
  104. } else {
  105. prefix = append(prefix, '/')
  106. }
  107. if f, ok := t.getFileTrunc(db.keyer.GenerateDeviceFileKey(nil, folder, device, unslashedPrefix), true); ok && !fn(f) {
  108. return
  109. }
  110. }
  111. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateDeviceFileKey(nil, folder, device, prefix)), nil)
  112. defer dbi.Release()
  113. for dbi.Next() {
  114. name := db.keyer.NameFromDeviceFileKey(dbi.Key())
  115. if len(prefix) > 0 && !bytes.HasPrefix(name, prefix) {
  116. return
  117. }
  118. f, err := unmarshalTrunc(dbi.Value(), truncate)
  119. if err != nil {
  120. l.Debugln("unmarshal error:", err)
  121. continue
  122. }
  123. if !fn(f) {
  124. return
  125. }
  126. }
  127. }
  128. func (db *instance) withHaveSequence(folder []byte, startSeq int64, fn Iterator) {
  129. t := db.newReadOnlyTransaction()
  130. defer t.close()
  131. dbi := t.NewIterator(&util.Range{Start: db.keyer.GenerateSequenceKey(nil, folder, startSeq), Limit: db.keyer.GenerateSequenceKey(nil, folder, maxInt64)}, nil)
  132. defer dbi.Release()
  133. for dbi.Next() {
  134. f, ok := t.getFileByKey(dbi.Value())
  135. if !ok {
  136. l.Debugln("missing file for sequence number", db.keyer.SequenceFromSequenceKey(dbi.Key()))
  137. continue
  138. }
  139. if shouldDebug() {
  140. if seq := db.keyer.SequenceFromSequenceKey(dbi.Key()); f.Sequence != seq {
  141. l.Warnf("Sequence index corruption (folder %v, file %v): sequence %d != expected %d", string(folder), f.Name, f.Sequence, seq)
  142. panic("sequence index corruption")
  143. }
  144. }
  145. if !fn(f) {
  146. return
  147. }
  148. }
  149. }
  150. func (db *instance) withAllFolderTruncated(folder []byte, fn func(device []byte, f FileInfoTruncated) bool) {
  151. t := db.newReadWriteTransaction()
  152. defer t.close()
  153. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil).WithoutNameAndDevice()), nil)
  154. defer dbi.Release()
  155. var gk, keyBuf []byte
  156. for dbi.Next() {
  157. device, ok := db.keyer.DeviceFromDeviceFileKey(dbi.Key())
  158. if !ok {
  159. // Not having the device in the index is bad. Clear it.
  160. t.Delete(dbi.Key())
  161. t.checkFlush()
  162. continue
  163. }
  164. var f FileInfoTruncated
  165. // The iterator function may keep a reference to the unmarshalled
  166. // struct, which in turn references the buffer it was unmarshalled
  167. // from. dbi.Value() just returns an internal slice that it reuses, so
  168. // we need to copy it.
  169. err := f.Unmarshal(append([]byte{}, dbi.Value()...))
  170. if err != nil {
  171. l.Debugln("unmarshal error:", err)
  172. continue
  173. }
  174. switch f.Name {
  175. case "", ".", "..", "/": // A few obviously invalid filenames
  176. l.Infof("Dropping invalid filename %q from database", f.Name)
  177. name := []byte(f.Name)
  178. gk = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  179. keyBuf = t.removeFromGlobal(gk, keyBuf, folder, device, name, nil)
  180. t.Delete(dbi.Key())
  181. t.checkFlush()
  182. continue
  183. }
  184. if !fn(device, f) {
  185. return
  186. }
  187. }
  188. }
  189. func (db *instance) getFileDirty(folder, device, file []byte) (protocol.FileInfo, bool) {
  190. t := db.newReadOnlyTransaction()
  191. defer t.close()
  192. return t.getFile(folder, device, file)
  193. }
  194. func (db *instance) getGlobalDirty(folder, file []byte, truncate bool) (FileIntf, bool) {
  195. t := db.newReadOnlyTransaction()
  196. defer t.close()
  197. _, f, ok := t.getGlobal(nil, folder, file, truncate)
  198. return f, ok
  199. }
  200. func (db *instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator) {
  201. t := db.newReadOnlyTransaction()
  202. defer t.close()
  203. if len(prefix) > 0 {
  204. unslashedPrefix := prefix
  205. if bytes.HasSuffix(prefix, []byte{'/'}) {
  206. unslashedPrefix = unslashedPrefix[:len(unslashedPrefix)-1]
  207. } else {
  208. prefix = append(prefix, '/')
  209. }
  210. if _, f, ok := t.getGlobal(nil, folder, unslashedPrefix, truncate); ok && !fn(f) {
  211. return
  212. }
  213. }
  214. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateGlobalVersionKey(nil, folder, prefix)), nil)
  215. defer dbi.Release()
  216. var dk []byte
  217. for dbi.Next() {
  218. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  219. if len(prefix) > 0 && !bytes.HasPrefix(name, prefix) {
  220. return
  221. }
  222. vl, ok := unmarshalVersionList(dbi.Value())
  223. if !ok {
  224. continue
  225. }
  226. dk = db.keyer.GenerateDeviceFileKey(dk, folder, vl.Versions[0].Device, name)
  227. f, ok := t.getFileTrunc(dk, truncate)
  228. if !ok {
  229. continue
  230. }
  231. if !fn(f) {
  232. return
  233. }
  234. }
  235. }
  236. func (db *instance) availability(folder, file []byte) []protocol.DeviceID {
  237. k := db.keyer.GenerateGlobalVersionKey(nil, folder, file)
  238. bs, err := db.Get(k, nil)
  239. if err == leveldb.ErrNotFound {
  240. return nil
  241. }
  242. if err != nil {
  243. l.Debugln("surprise error:", err)
  244. return nil
  245. }
  246. vl, ok := unmarshalVersionList(bs)
  247. if !ok {
  248. return nil
  249. }
  250. var devices []protocol.DeviceID
  251. for _, v := range vl.Versions {
  252. if !v.Version.Equal(vl.Versions[0].Version) {
  253. break
  254. }
  255. if v.Invalid {
  256. continue
  257. }
  258. n := protocol.DeviceIDFromBytes(v.Device)
  259. devices = append(devices, n)
  260. }
  261. return devices
  262. }
  263. func (db *instance) withNeed(folder, device []byte, truncate bool, fn Iterator) {
  264. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  265. db.withNeedLocal(folder, truncate, fn)
  266. return
  267. }
  268. t := db.newReadOnlyTransaction()
  269. defer t.close()
  270. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateGlobalVersionKey(nil, folder, nil).WithoutName()), nil)
  271. defer dbi.Release()
  272. var dk []byte
  273. devID := protocol.DeviceIDFromBytes(device)
  274. for dbi.Next() {
  275. vl, ok := unmarshalVersionList(dbi.Value())
  276. if !ok {
  277. continue
  278. }
  279. haveFV, have := vl.Get(device)
  280. // XXX: This marks Concurrent (i.e. conflicting) changes as
  281. // needs. Maybe we should do that, but it needs special
  282. // handling in the puller.
  283. if have && haveFV.Version.GreaterEqual(vl.Versions[0].Version) {
  284. continue
  285. }
  286. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  287. needVersion := vl.Versions[0].Version
  288. needDevice := protocol.DeviceIDFromBytes(vl.Versions[0].Device)
  289. for i := range vl.Versions {
  290. if !vl.Versions[i].Version.Equal(needVersion) {
  291. // We haven't found a valid copy of the file with the needed version.
  292. break
  293. }
  294. if vl.Versions[i].Invalid {
  295. // The file is marked invalid, don't use it.
  296. continue
  297. }
  298. dk = db.keyer.GenerateDeviceFileKey(dk, folder, vl.Versions[i].Device, name)
  299. gf, ok := t.getFileTrunc(dk, truncate)
  300. if !ok {
  301. continue
  302. }
  303. if gf.IsDeleted() && !have {
  304. // We don't need deleted files that we don't have
  305. break
  306. }
  307. l.Debugf("need folder=%q device=%v name=%q have=%v invalid=%v haveV=%v globalV=%v globalDev=%v", folder, devID, name, have, haveFV.Invalid, haveFV.Version, needVersion, needDevice)
  308. if !fn(gf) {
  309. return
  310. }
  311. // This file is handled, no need to look further in the version list
  312. break
  313. }
  314. }
  315. }
  316. func (db *instance) withNeedLocal(folder []byte, truncate bool, fn Iterator) {
  317. t := db.newReadOnlyTransaction()
  318. defer t.close()
  319. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateNeedFileKey(nil, folder, nil).WithoutName()), nil)
  320. defer dbi.Release()
  321. var keyBuf []byte
  322. var f FileIntf
  323. var ok bool
  324. for dbi.Next() {
  325. keyBuf, f, ok = t.getGlobal(keyBuf, folder, db.keyer.NameFromGlobalVersionKey(dbi.Key()), truncate)
  326. if !ok {
  327. continue
  328. }
  329. if !fn(f) {
  330. return
  331. }
  332. }
  333. }
  334. func (db *instance) dropFolder(folder []byte) {
  335. t := db.newReadWriteTransaction()
  336. defer t.close()
  337. for _, key := range [][]byte{
  338. // Remove all items related to the given folder from the device->file bucket
  339. db.keyer.GenerateDeviceFileKey(nil, folder, nil, nil).WithoutNameAndDevice(),
  340. // Remove all sequences related to the folder
  341. db.keyer.GenerateSequenceKey(nil, []byte(folder), 0).WithoutSequence(),
  342. // Remove all items related to the given folder from the global bucket
  343. db.keyer.GenerateGlobalVersionKey(nil, folder, nil).WithoutName(),
  344. // Remove all needs related to the folder
  345. db.keyer.GenerateNeedFileKey(nil, folder, nil).WithoutName(),
  346. // Remove the blockmap of the folder
  347. db.keyer.GenerateBlockMapKey(nil, folder, nil, nil).WithoutHashAndName(),
  348. } {
  349. t.deleteKeyPrefix(key)
  350. }
  351. }
  352. func (db *instance) dropDeviceFolder(device, folder []byte, meta *metadataTracker) {
  353. t := db.newReadWriteTransaction()
  354. defer t.close()
  355. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateDeviceFileKey(nil, folder, device, nil)), nil)
  356. defer dbi.Release()
  357. var gk, keyBuf []byte
  358. for dbi.Next() {
  359. name := db.keyer.NameFromDeviceFileKey(dbi.Key())
  360. gk = db.keyer.GenerateGlobalVersionKey(gk, folder, name)
  361. keyBuf = t.removeFromGlobal(gk, keyBuf, folder, device, name, meta)
  362. t.Delete(dbi.Key())
  363. t.checkFlush()
  364. }
  365. if bytes.Equal(device, protocol.LocalDeviceID[:]) {
  366. t.deleteKeyPrefix(db.keyer.GenerateBlockMapKey(nil, folder, nil, nil).WithoutHashAndName())
  367. }
  368. }
  369. func (db *instance) checkGlobals(folder []byte, meta *metadataTracker) {
  370. t := db.newReadWriteTransaction()
  371. defer t.close()
  372. dbi := t.NewIterator(util.BytesPrefix(db.keyer.GenerateGlobalVersionKey(nil, folder, nil).WithoutName()), nil)
  373. defer dbi.Release()
  374. var dk []byte
  375. for dbi.Next() {
  376. vl, ok := unmarshalVersionList(dbi.Value())
  377. if !ok {
  378. continue
  379. }
  380. // Check the global version list for consistency. An issue in previous
  381. // versions of goleveldb could result in reordered writes so that
  382. // there are global entries pointing to no longer existing files. Here
  383. // we find those and clear them out.
  384. name := db.keyer.NameFromGlobalVersionKey(dbi.Key())
  385. var newVL VersionList
  386. for i, version := range vl.Versions {
  387. dk = db.keyer.GenerateDeviceFileKey(dk, folder, version.Device, name)
  388. _, err := t.Get(dk, nil)
  389. if err == leveldb.ErrNotFound {
  390. continue
  391. }
  392. if err != nil {
  393. l.Debugln("surprise error:", err)
  394. return
  395. }
  396. newVL.Versions = append(newVL.Versions, version)
  397. if i == 0 {
  398. if fi, ok := t.getFileByKey(dk); ok {
  399. meta.addFile(protocol.GlobalDeviceID, fi)
  400. }
  401. }
  402. }
  403. if len(newVL.Versions) != len(vl.Versions) {
  404. t.Put(dbi.Key(), mustMarshal(&newVL))
  405. t.checkFlush()
  406. }
  407. }
  408. l.Debugf("db check completed for %q", folder)
  409. }
  410. func (db *instance) getIndexID(device, folder []byte) protocol.IndexID {
  411. cur, err := db.Get(db.keyer.GenerateIndexIDKey(nil, device, folder), nil)
  412. if err != nil {
  413. return 0
  414. }
  415. var id protocol.IndexID
  416. if err := id.Unmarshal(cur); err != nil {
  417. return 0
  418. }
  419. return id
  420. }
  421. func (db *instance) setIndexID(device, folder []byte, id protocol.IndexID) {
  422. bs, _ := id.Marshal() // marshalling can't fail
  423. if err := db.Put(db.keyer.GenerateIndexIDKey(nil, device, folder), bs, nil); err != nil && err != leveldb.ErrClosed {
  424. panic("storing index ID: " + err.Error())
  425. }
  426. }
  427. func (db *instance) dropMtimes(folder []byte) {
  428. db.dropPrefix(db.keyer.GenerateMtimesKey(nil, folder))
  429. }
  430. func (db *instance) dropFolderMeta(folder []byte) {
  431. db.dropPrefix(db.keyer.GenerateFolderMetaKey(nil, folder))
  432. }
  433. func (db *instance) dropPrefix(prefix []byte) {
  434. t := db.newReadWriteTransaction()
  435. defer t.close()
  436. t.deleteKeyPrefix(prefix)
  437. }
  438. func unmarshalTrunc(bs []byte, truncate bool) (FileIntf, error) {
  439. if truncate {
  440. var tf FileInfoTruncated
  441. err := tf.Unmarshal(bs)
  442. return tf, err
  443. }
  444. var tf protocol.FileInfo
  445. err := tf.Unmarshal(bs)
  446. return tf, err
  447. }
  448. func unmarshalVersionList(data []byte) (VersionList, bool) {
  449. var vl VersionList
  450. if err := vl.Unmarshal(data); err != nil {
  451. l.Debugln("unmarshal error:", err)
  452. return VersionList{}, false
  453. }
  454. if len(vl.Versions) == 0 {
  455. l.Debugln("empty version list")
  456. return VersionList{}, false
  457. }
  458. return vl, true
  459. }
  460. type errorSuggestion struct {
  461. inner error
  462. suggestion string
  463. }
  464. func (e errorSuggestion) Error() string {
  465. return fmt.Sprintf("%s (%s)", e.inner.Error(), e.suggestion)
  466. }
  467. // unchanged checks if two files are the same and thus don't need to be updated.
  468. // Local flags or the invalid bit might change without the version
  469. // being bumped. The IsInvalid() method handles both.
  470. func unchanged(nf, ef FileIntf) bool {
  471. return ef.FileVersion().Equal(nf.FileVersion()) && ef.IsInvalid() == nf.IsInvalid()
  472. }