瀏覽代碼

lib/db: Keep metadata better in sync (ref #6335) (#6337)

This adds metadata updates to the same write batch as the underlying
file change. The odds of a metadata update going missing is greatly
reduced.

Bonus change: actually commit the transaction in recalcMeta.
Jakob Borg 5 年之前
父節點
當前提交
6a840a040b
共有 5 個文件被更改,包括 51 次插入17 次删除
  1. 6 1
      lib/db/backend/backend.go
  2. 8 3
      lib/db/backend/leveldb_backend.go
  3. 14 2
      lib/db/lowlevel.go
  4. 3 3
      lib/db/meta.go
  5. 20 8
      lib/db/set.go

+ 6 - 1
lib/db/backend/backend.go

@@ -48,10 +48,15 @@ type ReadTransaction interface {
 // purposes of saving memory when transactions are in-RAM. Note that
 // transactions may be checkpointed *anyway* even if this is not called, due to
 // resource constraints, but this gives you a chance to decide when.
+//
+// Functions can be passed to Checkpoint. These are run if and only if the
+// checkpoint will result in a flush, and will run before the flush. The
+// transaction can be accessed via a closure. If an error is returned from
+// these functions the flush will be aborted and the error bubbled.
 type WriteTransaction interface {
 	ReadTransaction
 	Writer
-	Checkpoint() error
+	Checkpoint(...func() error) error
 	Commit() error
 }
 

+ 8 - 3
lib/db/backend/leveldb_backend.go

@@ -150,8 +150,8 @@ func (t *leveldbTransaction) Put(key, val []byte) error {
 	return t.checkFlush(dbFlushBatchMax)
 }
 
-func (t *leveldbTransaction) Checkpoint() error {
-	return t.checkFlush(dbFlushBatchMin)
+func (t *leveldbTransaction) Checkpoint(preFlush ...func() error) error {
+	return t.checkFlush(dbFlushBatchMin, preFlush...)
 }
 
 func (t *leveldbTransaction) Commit() error {
@@ -167,10 +167,15 @@ func (t *leveldbTransaction) Release() {
 }
 
 // checkFlush flushes and resets the batch if its size exceeds the given size.
-func (t *leveldbTransaction) checkFlush(size int) error {
+func (t *leveldbTransaction) checkFlush(size int, preFlush ...func() error) error {
 	if len(t.batch.Dump()) < size {
 		return nil
 	}
+	for _, hook := range preFlush {
+		if err := hook(); err != nil {
+			return err
+		}
+	}
 	return t.flush()
 }
 

+ 14 - 2
lib/db/lowlevel.go

@@ -124,11 +124,17 @@ func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileI
 			return err
 		}
 
-		if err := t.Checkpoint(); err != nil {
+		if err := t.Checkpoint(func() error {
+			return meta.toDB(t, folder)
+		}); err != nil {
 			return err
 		}
 	}
 
+	if err := meta.toDB(t, folder); err != nil {
+		return err
+	}
+
 	return t.Commit()
 }
 
@@ -227,11 +233,17 @@ func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta
 			}
 		}
 
-		if err := t.Checkpoint(); err != nil {
+		if err := t.Checkpoint(func() error {
+			return meta.toDB(t, folder)
+		}); err != nil {
 			return err
 		}
 	}
 
+	if err := meta.toDB(t, folder); err != nil {
+		return err
+	}
+
 	return t.Commit()
 }
 

+ 3 - 3
lib/db/meta.go

@@ -62,8 +62,8 @@ func (m *metadataTracker) Marshal() ([]byte, error) {
 
 // toDB saves the marshalled metadataTracker to the given db, under the key
 // corresponding to the given folder
-func (m *metadataTracker) toDB(db *Lowlevel, folder []byte) error {
-	key, err := db.keyer.GenerateFolderMetaKey(nil, folder)
+func (m *metadataTracker) toDB(t readWriteTransaction, folder []byte) error {
+	key, err := t.keyer.GenerateFolderMetaKey(nil, folder)
 	if err != nil {
 		return err
 	}
@@ -79,7 +79,7 @@ func (m *metadataTracker) toDB(db *Lowlevel, folder []byte) error {
 	if err != nil {
 		return err
 	}
-	err = db.Put(key, bs)
+	err = t.Put(key, bs)
 	if err == nil {
 		m.dirty = false
 	}

+ 20 - 8
lib/db/set.go

@@ -117,6 +117,8 @@ func (s *FileSet) recalcMeta() error {
 	if err != nil {
 		return err
 	}
+	defer t.close()
+
 	var deviceID protocol.DeviceID
 	err = t.withAllFolderTruncated([]byte(s.folder), func(device []byte, f FileInfoTruncated) bool {
 		copy(deviceID[:], device)
@@ -128,7 +130,10 @@ func (s *FileSet) recalcMeta() error {
 	}
 
 	s.meta.SetCreated()
-	return s.meta.toDB(s.db, []byte(s.folder))
+	if err := s.meta.toDB(t, []byte(s.folder)); err != nil {
+		return err
+	}
+	return t.Commit()
 }
 
 // Verify the local sequence number from actual sequence entries. Returns
@@ -184,7 +189,20 @@ func (s *FileSet) Drop(device protocol.DeviceID) {
 		s.meta.resetAll(device)
 	}
 
-	if err := s.meta.toDB(s.db, []byte(s.folder)); backend.IsClosed(err) {
+	t, err := s.db.newReadWriteTransaction()
+	if backend.IsClosed(err) {
+		return
+	} else if err != nil {
+		panic(err)
+	}
+	defer t.close()
+
+	if err := s.meta.toDB(t, []byte(s.folder)); backend.IsClosed(err) {
+		return
+	} else if err != nil {
+		panic(err)
+	}
+	if err := t.Commit(); backend.IsClosed(err) {
 		return
 	} else if err != nil {
 		panic(err)
@@ -202,12 +220,6 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) {
 	s.updateMutex.Lock()
 	defer s.updateMutex.Unlock()
 
-	defer func() {
-		if err := s.meta.toDB(s.db, []byte(s.folder)); err != nil && !backend.IsClosed(err) {
-			panic(err)
-		}
-	}()
-
 	if device == protocol.LocalDeviceID {
 		// For the local device we have a bunch of metadata to track.
 		if err := s.db.updateLocalFiles([]byte(s.folder), fs, s.meta); err != nil && !backend.IsClosed(err) {