Browse Source

lib/model: Refactor sending indexes as suture service (#5757)

Simon Frei 6 years ago
parent
commit
1054ce9354
1 changed files with 77 additions and 24 deletions
  1. 77 24
      lib/model/model.go

+ 77 - 24
lib/model/model.go

@@ -1041,6 +1041,7 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
 
 	m.pmut.RLock()
 	conn, ok := m.conn[deviceID]
+	closed := m.closed[deviceID]
 	hello := m.helloMessages[deviceID]
 	m.pmut.RUnlock()
 	if !ok {
@@ -1172,7 +1173,19 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
 			}
 		}
 
-		go sendIndexes(conn, folder.ID, fs, startSequence, dropSymlinks)
+		// The token isn't tracked as the service stops when the connection
+		// terminates and is automatically removed from supervisor (by
+		// implementing suture.IsCompletable).
+		m.Add(&indexSender{
+			conn:         conn,
+			connClosed:   closed,
+			folder:       folder.ID,
+			fset:         fs,
+			prevSequence: startSequence,
+			dropSymlinks: dropSymlinks,
+			stop:         make(chan struct{}),
+			stopped:      make(chan struct{}),
+		})
 	}
 
 	m.pmut.Lock()
@@ -1886,15 +1899,28 @@ func (m *model) deviceWasSeen(deviceID protocol.DeviceID) {
 	m.deviceStatRef(deviceID).WasSeen()
 }
 
-func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, prevSequence int64, dropSymlinks bool) {
-	deviceID := conn.ID()
+type indexSender struct {
+	conn         protocol.Connection
+	folder       string
+	dev          string
+	fset         *db.FileSet
+	prevSequence int64
+	dropSymlinks bool
+	connClosed   chan struct{}
+	stop         chan struct{}
+	stopped      chan struct{}
+}
+
+func (s *indexSender) Serve() {
+	defer close(s.stopped)
+
 	var err error
 
-	l.Debugf("Starting sendIndexes for %s to %s at %s (slv=%d)", folder, deviceID, conn, prevSequence)
-	defer l.Debugf("Exiting sendIndexes for %s to %s at %s: %v", folder, deviceID, conn, err)
+	l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.dev, s.conn, s.prevSequence)
+	defer l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.dev, s.conn, err)
 
 	// We need to send one index, regardless of whether there is something to send or not
-	prevSequence, err = sendIndexTo(prevSequence, conn, folder, fs, dropSymlinks)
+	err = s.sendIndexTo()
 
 	// Subscribe to LocalIndexUpdated (we have new information to send) and
 	// DeviceDisconnected (it might be us who disconnected, so we should
@@ -1902,22 +1928,37 @@ func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, prevSe
 	sub := events.Default.Subscribe(events.LocalIndexUpdated | events.DeviceDisconnected)
 	defer events.Default.Unsubscribe(sub)
 
+	evChan := sub.C()
+	ticker := time.NewTicker(time.Minute)
+	defer ticker.Stop()
+
 	for err == nil {
-		if conn.Closed() {
-			// Our work is done.
+		select {
+		case <-s.stop:
 			return
+		case <-s.connClosed:
+			return
+		default:
 		}
 
 		// While we have sent a sequence at least equal to the one
 		// currently in the database, wait for the local index to update. The
 		// local index may update for other folders than the one we are
 		// sending for.
-		if fs.Sequence(protocol.LocalDeviceID) <= prevSequence {
-			sub.Poll(time.Minute)
+		if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
+			select {
+			case <-s.stop:
+				return
+			case <-s.connClosed:
+				return
+			case <-evChan:
+			case <-ticker.C:
+			}
+
 			continue
 		}
 
-		prevSequence, err = sendIndexTo(prevSequence, conn, folder, fs, dropSymlinks)
+		err = s.sendIndexTo()
 
 		// Wait a short amount of time before entering the next loop. If there
 		// are continuous changes happening to the local index, this gives us
@@ -1926,31 +1967,42 @@ func sendIndexes(conn protocol.Connection, folder string, fs *db.FileSet, prevSe
 	}
 }
 
+func (s *indexSender) Stop() {
+	close(s.stop)
+	<-s.stopped
+}
+
+// Complete implements the suture.IsCompletable interface. When Serve terminates
+// before Stop is called, the supervisor will check for this method and if it
+// returns true removes the service instead of restarting it. Here it always
+// returns true, as indexSender only terminates when a connection is
+// closed/has failed, in which case retrying doesn't help.
+func (s *indexSender) Complete() bool { return true }
+
 // sendIndexTo sends file infos with a sequence number higher than prevSequence and
 // returns the highest sent sequence number.
-func sendIndexTo(prevSequence int64, conn protocol.Connection, folder string, fs *db.FileSet, dropSymlinks bool) (int64, error) {
-	deviceID := conn.ID()
-	initial := prevSequence == 0
+func (s *indexSender) sendIndexTo() error {
+	initial := s.prevSequence == 0
 	batch := newFileInfoBatch(nil)
 	batch.flushFn = func(fs []protocol.FileInfo) error {
-		l.Debugf("Sending indexes for %s to %s at %s: %d files (<%d bytes)", folder, deviceID, conn, len(batch.infos), batch.size)
+		l.Debugf("Sending indexes for %s to %s at %s: %d files (<%d bytes)", s.folder, s.dev, s.conn, len(batch.infos), batch.size)
 		if initial {
 			initial = false
-			return conn.Index(folder, fs)
+			return s.conn.Index(s.folder, fs)
 		}
-		return conn.IndexUpdate(folder, fs)
+		return s.conn.IndexUpdate(s.folder, fs)
 	}
 
 	var err error
 	var f protocol.FileInfo
-	fs.WithHaveSequence(prevSequence+1, func(fi db.FileIntf) bool {
+	s.fset.WithHaveSequence(s.prevSequence+1, func(fi db.FileIntf) bool {
 		if err = batch.flushIfFull(); err != nil {
 			return false
 		}
 
 		if shouldDebug() {
-			if fi.SequenceNo() < prevSequence+1 {
-				panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", prevSequence+1))
+			if fi.SequenceNo() < s.prevSequence+1 {
+				panic(fmt.Sprintln("sequence lower than requested, got:", fi.SequenceNo(), ", asked to start at:", s.prevSequence+1))
 			}
 			if f.Sequence > 0 && fi.SequenceNo() <= f.Sequence {
 				panic(fmt.Sprintln("non-increasing sequence, current:", fi.SequenceNo(), "<= previous:", f.Sequence))
@@ -1969,7 +2021,7 @@ func sendIndexTo(prevSequence int64, conn protocol.Connection, folder string, fs
 		}
 		f.LocalFlags = 0 // never sent externally
 
-		if dropSymlinks && f.IsSymlink() {
+		if s.dropSymlinks && f.IsSymlink() {
 			// Do not send index entries with symlinks to clients that can't
 			// handle it. Fixes issue #3802. Once both sides are upgraded, a
 			// rescan (i.e., change) of the symlink is required for it to
@@ -1981,17 +2033,18 @@ func sendIndexTo(prevSequence int64, conn protocol.Connection, folder string, fs
 		return true
 	})
 	if err != nil {
-		return prevSequence, err
+		return err
 	}
 
 	err = batch.flush()
 
 	// True if there was nothing to be sent
 	if f.Sequence == 0 {
-		return prevSequence, err
+		return err
 	}
 
-	return f.Sequence, err
+	s.prevSequence = f.Sequence
+	return err
 }
 
 func (m *model) requestGlobal(deviceID protocol.DeviceID, folder, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {