leveldb_transactions.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package db
  7. import (
  8. "bytes"
  9. "github.com/syncthing/syncthing/lib/protocol"
  10. "github.com/syndtr/goleveldb/leveldb"
  11. )
  12. // A readOnlyTransaction represents a database snapshot.
  13. type readOnlyTransaction struct {
  14. *leveldb.Snapshot
  15. db *Instance
  16. }
  17. func (db *Instance) newReadOnlyTransaction() readOnlyTransaction {
  18. snap, err := db.GetSnapshot()
  19. if err != nil {
  20. panic(err)
  21. }
  22. return readOnlyTransaction{
  23. Snapshot: snap,
  24. db: db,
  25. }
  26. }
  27. func (t readOnlyTransaction) close() {
  28. t.Release()
  29. }
  30. func (t readOnlyTransaction) getFile(folder, device, file []byte) (protocol.FileInfo, bool) {
  31. return getFile(t, t.db.deviceKey(folder, device, file))
  32. }
  33. // A readWriteTransaction is a readOnlyTransaction plus a batch for writes.
  34. // The batch will be committed on close() or by checkFlush() if it exceeds the
  35. // batch size.
  36. type readWriteTransaction struct {
  37. readOnlyTransaction
  38. *leveldb.Batch
  39. }
  40. func (db *Instance) newReadWriteTransaction() readWriteTransaction {
  41. t := db.newReadOnlyTransaction()
  42. return readWriteTransaction{
  43. readOnlyTransaction: t,
  44. Batch: new(leveldb.Batch),
  45. }
  46. }
  47. func (t readWriteTransaction) close() {
  48. if err := t.db.Write(t.Batch, nil); err != nil {
  49. panic(err)
  50. }
  51. t.readOnlyTransaction.close()
  52. }
  53. func (t readWriteTransaction) checkFlush() {
  54. if t.Batch.Len() > batchFlushSize {
  55. if err := t.db.Write(t.Batch, nil); err != nil {
  56. panic(err)
  57. }
  58. t.Batch.Reset()
  59. }
  60. }
  61. func (t readWriteTransaction) insertFile(folder, device []byte, file protocol.FileInfo) int64 {
  62. l.Debugf("insert; folder=%q device=%v %v", folder, protocol.DeviceIDFromBytes(device), file)
  63. if file.LocalVersion == 0 {
  64. file.LocalVersion = clock(0)
  65. }
  66. name := []byte(file.Name)
  67. nk := t.db.deviceKey(folder, device, name)
  68. t.Put(nk, file.MustMarshalXDR())
  69. return file.LocalVersion
  70. }
  71. // updateGlobal adds this device+version to the version list for the given
  72. // file. If the device is already present in the list, the version is updated.
  73. // If the file does not have an entry in the global list, it is created.
  74. func (t readWriteTransaction) updateGlobal(folder, device []byte, file protocol.FileInfo, globalSize *sizeTracker) bool {
  75. l.Debugf("update global; folder=%q device=%v file=%q version=%d", folder, protocol.DeviceIDFromBytes(device), file.Name, file.Version)
  76. name := []byte(file.Name)
  77. gk := t.db.globalKey(folder, name)
  78. svl, err := t.Get(gk, nil)
  79. if err != nil && err != leveldb.ErrNotFound {
  80. panic(err)
  81. }
  82. var fl versionList
  83. var oldFile protocol.FileInfo
  84. var hasOldFile bool
  85. // Remove the device from the current version list
  86. if svl != nil {
  87. err = fl.UnmarshalXDR(svl)
  88. if err != nil {
  89. panic(err)
  90. }
  91. for i := range fl.versions {
  92. if bytes.Compare(fl.versions[i].device, device) == 0 {
  93. if fl.versions[i].version.Equal(file.Version) {
  94. // No need to do anything
  95. return false
  96. }
  97. if i == 0 {
  98. // Keep the current newest file around so we can subtract it from
  99. // the globalSize if we replace it.
  100. oldFile, hasOldFile = t.getFile(folder, fl.versions[0].device, name)
  101. }
  102. fl.versions = append(fl.versions[:i], fl.versions[i+1:]...)
  103. break
  104. }
  105. }
  106. }
  107. nv := fileVersion{
  108. device: device,
  109. version: file.Version,
  110. }
  111. insertedAt := -1
  112. // Find a position in the list to insert this file. The file at the front
  113. // of the list is the newer, the "global".
  114. for i := range fl.versions {
  115. switch fl.versions[i].version.Compare(file.Version) {
  116. case protocol.Equal, protocol.Lesser:
  117. // The version at this point in the list is equal to or lesser
  118. // ("older") than us. We insert ourselves in front of it.
  119. fl.versions = insertVersion(fl.versions, i, nv)
  120. insertedAt = i
  121. goto done
  122. case protocol.ConcurrentLesser, protocol.ConcurrentGreater:
  123. // The version at this point is in conflict with us. We must pull
  124. // the actual file metadata to determine who wins. If we win, we
  125. // insert ourselves in front of the loser here. (The "Lesser" and
  126. // "Greater" in the condition above is just based on the device
  127. // IDs in the version vector, which is not the only thing we use
  128. // to determine the winner.)
  129. of, ok := t.getFile(folder, fl.versions[i].device, name)
  130. if !ok {
  131. panic("file referenced in version list does not exist")
  132. }
  133. if file.WinsConflict(of) {
  134. fl.versions = insertVersion(fl.versions, i, nv)
  135. insertedAt = i
  136. goto done
  137. }
  138. }
  139. }
  140. // We didn't find a position for an insert above, so append to the end.
  141. fl.versions = append(fl.versions, nv)
  142. insertedAt = len(fl.versions) - 1
  143. done:
  144. if insertedAt == 0 {
  145. // We just inserted a new newest version. Fixup the global size
  146. // calculation.
  147. if !file.Version.Equal(oldFile.Version) {
  148. globalSize.addFile(file)
  149. if hasOldFile {
  150. // We have the old file that was removed at the head of the list.
  151. globalSize.removeFile(oldFile)
  152. } else if len(fl.versions) > 1 {
  153. // The previous newest version is now at index 1, grab it from there.
  154. oldFile, ok := t.getFile(folder, fl.versions[1].device, name)
  155. if !ok {
  156. panic("file referenced in version list does not exist")
  157. }
  158. globalSize.removeFile(oldFile)
  159. }
  160. }
  161. }
  162. l.Debugf("new global after update: %v", fl)
  163. t.Put(gk, fl.MustMarshalXDR())
  164. return true
  165. }
  166. // removeFromGlobal removes the device from the global version list for the
  167. // given file. If the version list is empty after this, the file entry is
  168. // removed entirely.
  169. func (t readWriteTransaction) removeFromGlobal(folder, device, file []byte, globalSize *sizeTracker) {
  170. l.Debugf("remove from global; folder=%q device=%v file=%q", folder, protocol.DeviceIDFromBytes(device), file)
  171. gk := t.db.globalKey(folder, file)
  172. svl, err := t.Get(gk, nil)
  173. if err != nil {
  174. // We might be called to "remove" a global version that doesn't exist
  175. // if the first update for the file is already marked invalid.
  176. return
  177. }
  178. var fl versionList
  179. err = fl.UnmarshalXDR(svl)
  180. if err != nil {
  181. panic(err)
  182. }
  183. removed := false
  184. for i := range fl.versions {
  185. if bytes.Compare(fl.versions[i].device, device) == 0 {
  186. if i == 0 && globalSize != nil {
  187. f, ok := t.getFile(folder, device, file)
  188. if !ok {
  189. panic("removing nonexistent file")
  190. }
  191. globalSize.removeFile(f)
  192. removed = true
  193. }
  194. fl.versions = append(fl.versions[:i], fl.versions[i+1:]...)
  195. break
  196. }
  197. }
  198. if len(fl.versions) == 0 {
  199. t.Delete(gk)
  200. } else {
  201. l.Debugf("new global after remove: %v", fl)
  202. t.Put(gk, fl.MustMarshalXDR())
  203. if removed {
  204. f, ok := t.getFile(folder, fl.versions[0].device, file)
  205. if !ok {
  206. panic("new global is nonexistent file")
  207. }
  208. globalSize.addFile(f)
  209. }
  210. }
  211. }
  212. func insertVersion(vl []fileVersion, i int, v fileVersion) []fileVersion {
  213. t := append(vl, fileVersion{})
  214. copy(t[i+1:], t[i:])
  215. t[i] = v
  216. return t
  217. }