|
@@ -25,9 +25,24 @@ type indexHandler struct {
|
|
|
downloads *deviceDownloadState
|
|
|
folder string
|
|
|
folderIsReceiveEncrypted bool
|
|
|
- prevSequence int64
|
|
|
evLogger events.Logger
|
|
|
|
|
|
+ // We track the latest / highest sequence number in two ways for two
|
|
|
+ // different reasons. Initially they are the same -- the highest seen
|
|
|
+ // sequence number reported by the other side (or zero).
|
|
|
+ //
|
|
|
+ // One is the highest number we've seen when iterating the database,
|
|
|
+ // which we track for database iteration purposes. When we loop, we
|
|
|
+ // start looking at that number plus one in the next loop. Our index
|
|
|
+ // numbering may have holes which this will skip over.
|
|
|
+ //
|
|
|
+ // The other is the highest sequence we previously sent to the other
|
|
|
+ // side, used by them for correctness checks. This one must not skip
|
|
|
+ // holes. That is, if we iterate and find a hole, this is not
|
|
|
+ // incremented because nothing was sent to the other side.
|
|
|
+ localPrevSequence int64 // the highest sequence number we've seen in our FileInfos
|
|
|
+ sentPrevSequence int64 // the highest sequence number we've sent to the peer
|
|
|
+
|
|
|
cond *sync.Cond
|
|
|
paused bool
|
|
|
fset *db.FileSet
|
|
@@ -100,7 +115,8 @@ func newIndexHandler(conn protocol.Connection, downloads *deviceDownloadState, f
|
|
|
downloads: downloads,
|
|
|
folder: folder.ID,
|
|
|
folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted,
|
|
|
- prevSequence: startSequence,
|
|
|
+ localPrevSequence: startSequence,
|
|
|
+ sentPrevSequence: startSequence,
|
|
|
evLogger: evLogger,
|
|
|
|
|
|
fset: fset,
|
|
@@ -127,7 +143,7 @@ func (s *indexHandler) waitForFileset(ctx context.Context) (*db.FileSet, error)
|
|
|
}
|
|
|
|
|
|
func (s *indexHandler) Serve(ctx context.Context) (err error) {
|
|
|
- l.Debugf("Starting index handler for %s to %s at %s (slv=%d)", s.folder, s.conn.DeviceID().Short(), s.conn, s.prevSequence)
|
|
|
+ l.Debugf("Starting index handler for %s to %s at %s (localPrevSequence=%d)", s.folder, s.conn.DeviceID().Short(), s.conn, s.localPrevSequence)
|
|
|
stop := make(chan struct{})
|
|
|
|
|
|
defer func() {
|
|
@@ -172,7 +188,7 @@ func (s *indexHandler) Serve(ctx context.Context) (err error) {
|
|
|
// currently in the database, wait for the local index to update. The
|
|
|
// local index may update for other folders than the one we are
|
|
|
// sending for.
|
|
|
- if fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
|
|
|
+ if fset.Sequence(protocol.LocalDeviceID) <= s.localPrevSequence {
|
|
|
select {
|
|
|
case <-ctx.Done():
|
|
|
return ctx.Err()
|
|
@@ -223,13 +239,7 @@ func (s *indexHandler) pause() {
|
|
|
// sendIndexTo sends file infos with a sequence number higher than prevSequence and
|
|
|
// returns the highest sent sequence number.
|
|
|
func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error {
|
|
|
- // Keep track of the previous sequence we sent. This is separate from
|
|
|
- // s.prevSequence because the latter will skip over holes in the
|
|
|
- // sequence numberings, while sentPrevSequence should always be
|
|
|
- // precisely the highest previously sent sequence.
|
|
|
- sentPrevSequence := s.prevSequence
|
|
|
-
|
|
|
- initial := s.prevSequence == 0
|
|
|
+ initial := s.localPrevSequence == 0
|
|
|
batch := db.NewFileInfoBatch(nil)
|
|
|
var batchError error
|
|
|
batch.SetFlushFunc(func(fs []protocol.FileInfo) error {
|
|
@@ -262,7 +272,7 @@ func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error
|
|
|
err = s.conn.IndexUpdate(ctx, &protocol.IndexUpdate{
|
|
|
Folder: s.folder,
|
|
|
Files: fs,
|
|
|
- PrevSequence: sentPrevSequence,
|
|
|
+ PrevSequence: s.sentPrevSequence,
|
|
|
LastSequence: lastSequence,
|
|
|
})
|
|
|
}
|
|
@@ -270,7 +280,7 @@ func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error
|
|
|
batchError = err
|
|
|
return err
|
|
|
}
|
|
|
- sentPrevSequence = lastSequence
|
|
|
+ s.sentPrevSequence = lastSequence
|
|
|
return nil
|
|
|
})
|
|
|
|
|
@@ -282,7 +292,7 @@ func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error
|
|
|
}
|
|
|
defer snap.Release()
|
|
|
previousWasDelete := false
|
|
|
- snap.WithHaveSequence(s.prevSequence+1, func(fi protocol.FileIntf) bool {
|
|
|
+ snap.WithHaveSequence(s.localPrevSequence+1, func(fi protocol.FileIntf) bool {
|
|
|
// This is to make sure that renames (which is an add followed by a delete) land in the same batch.
|
|
|
// Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
|
|
|
// the batch ends with a non-delete, or that the last item in the batch is already a delete
|
|
@@ -292,17 +302,17 @@ func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if fi.SequenceNo() < s.prevSequence+1 {
|
|
|
+ if fi.SequenceNo() < s.localPrevSequence+1 {
|
|
|
s.logSequenceAnomaly("database returned sequence lower than requested", map[string]any{
|
|
|
"sequence": fi.SequenceNo(),
|
|
|
- "start": s.prevSequence + 1,
|
|
|
+ "start": s.localPrevSequence + 1,
|
|
|
})
|
|
|
}
|
|
|
|
|
|
if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
|
|
|
s.logSequenceAnomaly("database returned non-increasing sequence", map[string]any{
|
|
|
"sequence": fi.SequenceNo(),
|
|
|
- "start": s.prevSequence + 1,
|
|
|
+ "start": s.localPrevSequence + 1,
|
|
|
"previous": f.Sequence,
|
|
|
})
|
|
|
// Abort this round of index sending - the next one will pick
|
|
@@ -348,7 +358,7 @@ func (s *indexHandler) sendIndexTo(ctx context.Context, fset *db.FileSet) error
|
|
|
// however it's possible that a higher sequence exists, just doesn't need to
|
|
|
// be sent (e.g. in a receive-only folder, when a local change was
|
|
|
// reverted). No point trying to send nothing again.
|
|
|
- s.prevSequence = snap.Sequence(protocol.LocalDeviceID)
|
|
|
+ s.localPrevSequence = snap.Sequence(protocol.LocalDeviceID)
|
|
|
|
|
|
return nil
|
|
|
}
|