|
|
@@ -30,6 +30,7 @@ type indexSender struct {
|
|
|
prevSequence int64
|
|
|
evLogger events.Logger
|
|
|
connClosed chan struct{}
|
|
|
+ done chan struct{}
|
|
|
token suture.ServiceToken
|
|
|
pauseChan chan struct{}
|
|
|
resumeChan chan *db.FileSet
|
|
|
@@ -38,6 +39,7 @@ type indexSender struct {
|
|
|
func (s *indexSender) Serve(ctx context.Context) (err error) {
|
|
|
l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence)
|
|
|
defer func() {
|
|
|
+ close(s.done)
|
|
|
err = svcutil.NoRestartErr(err)
|
|
|
l.Debugf("Exiting indexSender for %s to %s at %s: %v", s.folder, s.conn.ID(), s.conn, err)
|
|
|
}()
|
|
|
@@ -101,14 +103,14 @@ func (s *indexSender) Serve(ctx context.Context) (err error) {
|
|
|
|
|
|
func (s *indexSender) resume(fset *db.FileSet) {
|
|
|
select {
|
|
|
- case <-s.connClosed:
|
|
|
+ case <-s.done:
|
|
|
case s.resumeChan <- fset:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
func (s *indexSender) pause() {
|
|
|
select {
|
|
|
- case <-s.connClosed:
|
|
|
+ case <-s.done:
|
|
|
case s.pauseChan <- struct{}{}:
|
|
|
}
|
|
|
}
|
|
|
@@ -314,6 +316,7 @@ func (r *indexSenderRegistry) addLocked(folder config.FolderConfiguration, fset
|
|
|
is := &indexSender{
|
|
|
conn: r.conn,
|
|
|
connClosed: r.closed,
|
|
|
+ done: make(chan struct{}),
|
|
|
folder: folder.ID,
|
|
|
folderIsReceiveEncrypted: folder.Type == config.FolderTypeReceiveEncrypted,
|
|
|
fset: fset,
|
|
|
@@ -366,7 +369,7 @@ func (r *indexSenderRegistry) removeAllExcept(except map[string]struct{}) {
|
|
|
delete(r.indexSenders, folder)
|
|
|
}
|
|
|
}
|
|
|
- for folder := range r.indexSenders {
|
|
|
+ for folder := range r.startInfos {
|
|
|
if _, ok := except[folder]; !ok {
|
|
|
delete(r.startInfos, folder)
|
|
|
}
|