Przeglądaj źródła

chore(db): use shorter read transactions and periodic checkpoint for smaller WAL (#10027)

Also make sure our journal size limit is in effect when checkpointing.
Jakob Borg 6 miesięcy temu
rodzic
commit
82a0dd8eaa

+ 1 - 0
internal/db/sqlite/db.go

@@ -20,6 +20,7 @@ type DB struct {
 	sql            *sqlx.DB
 	sql            *sqlx.DB
 	localDeviceIdx int64
 	localDeviceIdx int64
 	updateLock     sync.Mutex
 	updateLock     sync.Mutex
+	updatePoints   int
 
 
 	statementsMut sync.RWMutex
 	statementsMut sync.RWMutex
 	statements    map[string]*sqlx.Stmt
 	statements    map[string]*sqlx.Stmt

+ 0 - 4
internal/db/sqlite/db_open.go

@@ -36,10 +36,6 @@ func Open(path string) (*DB, error) {
 		// https://www.sqlite.org/pragma.html#pragma_optimize
 		// https://www.sqlite.org/pragma.html#pragma_optimize
 		return nil, wrap(err, "PRAGMA optimize")
 		return nil, wrap(err, "PRAGMA optimize")
 	}
 	}
-	if _, err := sqlDB.Exec(`PRAGMA journal_size_limit = 67108864`); err != nil {
-		// https://www.powersync.com/blog/sqlite-optimizations-for-ultra-high-performance
-		return nil, wrap(err, "PRAGMA journal_size_limit")
-	}
 	return openCommon(sqlDB)
 	return openCommon(sqlDB)
 }
 }
 
 

+ 10 - 4
internal/db/sqlite/db_service.go

@@ -86,10 +86,16 @@ func (s *Service) periodic(ctx context.Context) error {
 		return wrap(err)
 		return wrap(err)
 	}
 	}
 
 
-	_, _ = s.sdb.sql.ExecContext(ctx, `ANALYZE`)
-	_, _ = s.sdb.sql.ExecContext(ctx, `PRAGMA optimize`)
-	_, _ = s.sdb.sql.ExecContext(ctx, `PRAGMA incremental_vacuum`)
-	_, _ = s.sdb.sql.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`)
+	conn, err := s.sdb.sql.Conn(ctx)
+	if err != nil {
+		return wrap(err)
+	}
+	defer conn.Close()
+	_, _ = conn.ExecContext(ctx, `ANALYZE`)
+	_, _ = conn.ExecContext(ctx, `PRAGMA optimize`)
+	_, _ = conn.ExecContext(ctx, `PRAGMA incremental_vacuum`)
+	_, _ = conn.ExecContext(ctx, `PRAGMA journal_size_limit = 67108864`)
+	_, _ = conn.ExecContext(ctx, `PRAGMA wal_checkpoint(TRUNCATE)`)
 
 
 	return nil
 	return nil
 }
 }

+ 40 - 1
internal/db/sqlite/db_update.go

@@ -23,6 +23,13 @@ import (
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/proto"
 )
 )
 
 
+const (
+	// Arbitrarily chosen values for checkpoint frequency....
+	updatePointsPerFile   = 100
+	updatePointsPerBlock  = 1
+	updatePointsThreshold = 250_000
+)
+
 func (s *DB) Update(folder string, device protocol.DeviceID, fs []protocol.FileInfo) error {
 func (s *DB) Update(folder string, device protocol.DeviceID, fs []protocol.FileInfo) error {
 	s.updateLock.Lock()
 	s.updateLock.Lock()
 	defer s.updateLock.Unlock()
 	defer s.updateLock.Unlock()
@@ -143,7 +150,12 @@ func (s *DB) Update(folder string, device protocol.DeviceID, fs []protocol.FileI
 		}
 		}
 	}
 	}
 
 
-	return wrap(tx.Commit())
+	if err := tx.Commit(); err != nil {
+		return wrap(err)
+	}
+
+	s.periodicCheckpointLocked(fs)
+	return nil
 }
 }
 
 
 func (s *DB) DropFolder(folder string) error {
 func (s *DB) DropFolder(folder string) error {
@@ -554,3 +566,30 @@ func (e fileRow) Compare(other fileRow) int {
 		return 0
 		return 0
 	}
 	}
 }
 }
+
+func (s *DB) periodicCheckpointLocked(fs []protocol.FileInfo) {
+	// Induce periodic checkpoints. We add points for each file and block,
+	// and checkpoint when we've written more than a threshold of points.
+	// This ensures we do not go too long without a checkpoint, while also
+	// not doing it incessantly for every update.
+	s.updatePoints += updatePointsPerFile * len(fs)
+	for _, f := range fs {
+		s.updatePoints += len(f.Blocks) * updatePointsPerBlock
+	}
+	if s.updatePoints > updatePointsThreshold {
+		l.Debugln("checkpoint at", s.updatePoints)
+		conn, err := s.sql.Conn(context.Background())
+		if err != nil {
+			l.Debugln("conn:", err)
+			return
+		}
+		defer conn.Close()
+		if _, err := conn.ExecContext(context.Background(), `PRAGMA journal_size_limit = 67108864`); err != nil {
+			l.Debugln("PRAGMA journal_size_limit(RESTART):", err)
+		}
+		if _, err := conn.ExecContext(context.Background(), `PRAGMA wal_checkpoint(RESTART)`); err != nil {
+			l.Debugln("PRAGMA wal_checkpoint(RESTART):", err)
+		}
+		s.updatePoints = 0
+	}
+}

+ 2 - 15
lib/model/indexhandler.go

@@ -295,7 +295,6 @@ func (s *indexHandler) sendIndexTo(ctx context.Context) error {
 	var f protocol.FileInfo
 	var f protocol.FileInfo
 	previousWasDelete := false
 	previousWasDelete := false
 
 
-	t0 := time.Now()
 	for fi, err := range itererr.Zip(s.sdb.AllLocalFilesBySequence(s.folder, protocol.LocalDeviceID, s.localPrevSequence+1, 5000)) {
 	for fi, err := range itererr.Zip(s.sdb.AllLocalFilesBySequence(s.folder, protocol.LocalDeviceID, s.localPrevSequence+1, 5000)) {
 		if err != nil {
 		if err != nil {
 			return err
 			return err
@@ -304,15 +303,7 @@ func (s *indexHandler) sendIndexTo(ctx context.Context) error {
 		// Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
 		// Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
 		// the batch ends with a non-delete, or that the last item in the batch is already a delete
 		// the batch ends with a non-delete, or that the last item in the batch is already a delete
 		if batch.Full() && (!fi.IsDeleted() || previousWasDelete) {
 		if batch.Full() && (!fi.IsDeleted() || previousWasDelete) {
-			if err := batch.Flush(); err != nil {
-				return err
-			}
-			if time.Since(t0) > 5*time.Second {
-				// minor hack -- avoid very long running read transactions
-				// during index transmission, to help prevent excessive
-				// growth of database WAL file
-				break
-			}
+			break
 		}
 		}
 
 
 		if fi.SequenceNo() < s.localPrevSequence+1 {
 		if fi.SequenceNo() < s.localPrevSequence+1 {
@@ -348,11 +339,7 @@ func (s *indexHandler) sendIndexTo(ctx context.Context) error {
 
 
 		batch.Append(f)
 		batch.Append(f)
 	}
 	}
-	if err := batch.Flush(); err != nil {
-		return err
-	}
-
-	return nil
+	return batch.Flush()
 }
 }
 
 
 func (s *indexHandler) receive(fs []protocol.FileInfo, update bool, op string, prevSequence, lastSequence int64) error {
 func (s *indexHandler) receive(fs []protocol.FileInfo, update bool, op string, prevSequence, lastSequence int64) error {