|
@@ -111,20 +111,47 @@ func newIndexHandler(conn protocol.Connection, downloads *deviceDownloadState, f
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+// waitForFileset waits for the handler to resume and fetches the current fileset.
|
|
|
+func (s *indexHandler) waitForFileset(ctx context.Context) (*db.FileSet, error) {
|
|
|
+ s.cond.L.Lock()
|
|
|
+ defer s.cond.L.Unlock()
|
|
|
+
|
|
|
+ for s.paused {
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ return nil, ctx.Err()
|
|
|
+ default:
|
|
|
+ s.cond.Wait()
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return s.fset, nil
|
|
|
+}
|
|
|
+
|
|
|
func (s *indexHandler) Serve(ctx context.Context) (err error) {
|
|
|
l.Debugf("Starting index handler for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence)
|
|
|
+ stop := make(chan struct{})
|
|
|
+
|
|
|
defer func() {
|
|
|
err = svcutil.NoRestartErr(err)
|
|
|
l.Debugf("Exiting index handler for %s to %s at %s: %v", s.folder, s.conn.ID(), s.conn, err)
|
|
|
+ close(stop)
|
|
|
+ }()
|
|
|
+
|
|
|
+ // Broadcast the pause cond when the context quits
|
|
|
+ go func() {
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
+ s.cond.Broadcast()
|
|
|
+ case <-stop:
|
|
|
+ }
|
|
|
}()
|
|
|
|
|
|
// We need to send one index, regardless of whether there is something to send or not
|
|
|
- s.cond.L.Lock()
|
|
|
- for s.paused {
|
|
|
- s.cond.Wait()
|
|
|
+ fset, err := s.waitForFileset(ctx)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
}
|
|
|
- fset := s.fset
|
|
|
- s.cond.L.Unlock()
|
|
|
err = s.sendIndexTo(ctx, fset)
|
|
|
|
|
|
// Subscribe to LocalIndexUpdated (we have new information to send) and
|
|
@@ -138,12 +165,10 @@ func (s *indexHandler) Serve(ctx context.Context) (err error) {
|
|
|
defer ticker.Stop()
|
|
|
|
|
|
for err == nil {
|
|
|
- s.cond.L.Lock()
|
|
|
- for s.paused {
|
|
|
- s.cond.Wait()
|
|
|
+ fset, err = s.waitForFileset(ctx)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
}
|
|
|
- fset := s.fset
|
|
|
- s.cond.L.Unlock()
|
|
|
|
|
|
// While we have sent a sequence at least equal to the one
|
|
|
// currently in the database, wait for the local index to update. The
|
|
@@ -181,6 +206,7 @@ func (s *indexHandler) resume(fset *db.FileSet, runner service) {
|
|
|
s.paused = false
|
|
|
s.fset = fset
|
|
|
s.runner = runner
|
|
|
+ s.cond.Broadcast()
|
|
|
s.cond.L.Unlock()
|
|
|
}
|
|
|
|
|
@@ -192,6 +218,7 @@ func (s *indexHandler) pause() {
|
|
|
s.paused = true
|
|
|
s.fset = nil
|
|
|
s.runner = nil
|
|
|
+ s.cond.Broadcast()
|
|
|
s.cond.L.Unlock()
|
|
|
}
|
|
|
|