
lib/model: Clean up index handler life cycle (fixes #9021) (#9038)

Co-authored-by: Simon Frei <[email protected]>
Jakob Borg 2 years ago
parent
commit
40b3b9ad15

+ 3 - 1
lib/model/fakeconns_test.go

@@ -14,6 +14,7 @@ import (
 
 	"github.com/syncthing/syncthing/lib/protocol"
 	protocolmocks "github.com/syncthing/syncthing/lib/protocol/mocks"
+	"github.com/syncthing/syncthing/lib/rand"
 	"github.com/syncthing/syncthing/lib/scanner"
 )
 
@@ -36,10 +37,11 @@ func newFakeConnection(id protocol.DeviceID, model Model) *fakeConnection {
 	f.CloseCalls(func(err error) {
 		f.closeOnce.Do(func() {
 			close(f.closed)
+			model.Closed(f, err)
 		})
-		model.Closed(f, err)
 		f.ClosedReturns(f.closed)
 	})
+	f.StringReturns(rand.String(8))
 	return f
 }
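
The fake connection now notifies the model from inside the closeOnce.Do callback, so Closed fires exactly once and always together with closing the channel; the added random String() result makes individual fakes distinguishable once they are tracked as services. A minimal sketch of that once-only close pattern, not part of this change and using hypothetical names:

package example

import "sync"

// closer signals closed exactly once, however many times Close is called.
type closer struct {
	once   sync.Once
	closed chan struct{}
}

// Close closes the channel and notifies the observer together, on the first
// call only; later calls are no-ops.
func (c *closer) Close(notify func(err error), err error) {
	c.once.Do(func() {
		close(c.closed)
		notify(err)
	})
}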
 

+ 19 - 43
lib/model/indexhandler.go

@@ -12,8 +12,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/thejerf/suture/v4"
-
 	"github.com/syncthing/syncthing/lib/config"
 	"github.com/syncthing/syncthing/lib/db"
 	"github.com/syncthing/syncthing/lib/events"
@@ -28,7 +26,6 @@ type indexHandler struct {
 	folderIsReceiveEncrypted bool
 	prevSequence             int64
 	evLogger                 events.Logger
-	token                    suture.ServiceToken
 
 	cond   *sync.Cond
 	paused bool
@@ -373,11 +370,10 @@ func (s *indexHandler) String() string {
 }
 
 type indexHandlerRegistry struct {
-	sup           *suture.Supervisor
 	evLogger      events.Logger
 	conn          protocol.Connection
 	downloads     *deviceDownloadState
-	indexHandlers map[string]*indexHandler
+	indexHandlers *serviceMap[string, *indexHandler]
 	startInfos    map[string]*clusterConfigDeviceInfo
 	folderStates  map[string]*indexHandlerFolderState
 	mut           sync.Mutex
@@ -389,27 +385,16 @@ type indexHandlerFolderState struct {
 	runner service
 }
 
-func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, closed chan struct{}, parentSup *suture.Supervisor, evLogger events.Logger) *indexHandlerRegistry {
+func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, evLogger events.Logger) *indexHandlerRegistry {
 	r := &indexHandlerRegistry{
+		evLogger:      evLogger,
 		conn:          conn,
 		downloads:     downloads,
-		evLogger:      evLogger,
-		indexHandlers: make(map[string]*indexHandler),
+		indexHandlers: newServiceMap[string, *indexHandler](evLogger),
 		startInfos:    make(map[string]*clusterConfigDeviceInfo),
 		folderStates:  make(map[string]*indexHandlerFolderState),
 		mut:           sync.Mutex{},
 	}
-	r.sup = suture.New(r.String(), svcutil.SpecWithDebugLogger(l))
-	ourToken := parentSup.Add(r.sup)
-	r.sup.Add(svcutil.AsService(func(ctx context.Context) error {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-closed:
-			parentSup.Remove(ourToken)
-		}
-		return nil
-	}, fmt.Sprintf("%v/waitForClosed", r)))
 	return r
 }
 
@@ -417,20 +402,18 @@ func (r *indexHandlerRegistry) String() string {
 	return fmt.Sprintf("indexHandlerRegistry/%v", r.conn.DeviceID().Short())
 }
 
-func (r *indexHandlerRegistry) GetSupervisor() *suture.Supervisor {
-	return r.sup
+func (r *indexHandlerRegistry) Serve(ctx context.Context) error {
+	// Running the index handler registry means running the individual index
+	// handler children.
+	return r.indexHandlers.Serve(ctx)
 }
 
 func (r *indexHandlerRegistry) startLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo) {
-	if is, ok := r.indexHandlers[folder.ID]; ok {
-		r.sup.RemoveAndWait(is.token, 0)
-		delete(r.indexHandlers, folder.ID)
-	}
+	r.indexHandlers.RemoveAndWait(folder.ID, 0)
 	delete(r.startInfos, folder.ID)
 
 	is := newIndexHandler(r.conn, r.downloads, folder, fset, runner, startInfo, r.evLogger)
-	is.token = r.sup.Add(is)
-	r.indexHandlers[folder.ID] = is
+	r.indexHandlers.Add(folder.ID, is)
 
 	// This new connection might help us get in sync.
 	runner.SchedulePull()
@@ -444,9 +427,7 @@ func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterCon
 	r.mut.Lock()
 	defer r.mut.Unlock()
 
-	if is, ok := r.indexHandlers[folder]; ok {
-		r.sup.RemoveAndWait(is.token, 0)
-		delete(r.indexHandlers, folder)
+	if r.indexHandlers.RemoveAndWait(folder, 0) {
 		l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.DeviceID().Short(), folder)
 	}
 	folderState, ok := r.folderStates[folder]
@@ -465,10 +446,7 @@ func (r *indexHandlerRegistry) Remove(folder string) {
 	defer r.mut.Unlock()
 
 	l.Debugf("Removing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
-	if is, ok := r.indexHandlers[folder]; ok {
-		r.sup.RemoveAndWait(is.token, 0)
-		delete(r.indexHandlers, folder)
-	}
+	r.indexHandlers.RemoveAndWait(folder, 0)
 	delete(r.startInfos, folder)
 	l.Debugf("Removed index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
 }
@@ -480,13 +458,12 @@ func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]remoteFolderSta
 	r.mut.Lock()
 	defer r.mut.Unlock()
 
-	for folder, is := range r.indexHandlers {
+	r.indexHandlers.Each(func(folder string, is *indexHandler) {
 		if _, ok := except[folder]; !ok {
-			r.sup.RemoveAndWait(is.token, 0)
-			delete(r.indexHandlers, folder)
+			r.indexHandlers.RemoveAndWait(folder, 0)
 			l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder)
 		}
-	}
+	})
 	for folder := range r.startInfos {
 		if _, ok := except[folder]; !ok {
 			delete(r.startInfos, folder)
@@ -518,7 +495,7 @@ func (r *indexHandlerRegistry) RegisterFolderState(folder config.FolderConfigura
 func (r *indexHandlerRegistry) folderPausedLocked(folder string) {
 	l.Debugf("Pausing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
 	delete(r.folderStates, folder)
-	if is, ok := r.indexHandlers[folder]; ok {
+	if is, ok := r.indexHandlers.Get(folder); ok {
 		is.pause()
 		l.Debugf("Paused index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
 	} else {
@@ -536,11 +513,10 @@ func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfigura
 		runner: runner,
 	}
 
-	is, isOk := r.indexHandlers[folder.ID]
+	is, isOk := r.indexHandlers.Get(folder.ID)
 	if info, ok := r.startInfos[folder.ID]; ok {
 		if isOk {
-			r.sup.RemoveAndWait(is.token, 0)
-			delete(r.indexHandlers, folder.ID)
+			r.indexHandlers.RemoveAndWait(folder.ID, 0)
 			l.Debugf("Removed index handler for device %v and folder %v in resume", r.conn.DeviceID().Short(), folder.ID)
 		}
 		r.startLocked(folder, fset, runner, info)
@@ -557,7 +533,7 @@ func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfigura
 func (r *indexHandlerRegistry) ReceiveIndex(folder string, fs []protocol.FileInfo, update bool, op string) error {
 	r.mut.Lock()
 	defer r.mut.Unlock()
-	is, isOk := r.indexHandlers[folder]
+	is, isOk := r.indexHandlers.Get(folder)
 	if !isOk {
 		l.Infof("%v for nonexistent or paused folder %q", op, folder)
 		return fmt.Errorf("%s: %w", folder, ErrFolderMissing)
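
With the private supervisor, the per-handler suture.ServiceToken field, and the waitForClosed helper gone, the registry is just another suture.Service: its Serve delegates to the embedded serviceMap, so starting or stopping the registry starts or stops all per-folder index handlers, and detaching from the parent supervisor is left to whoever owns the registry. A rough sketch of that delegation, not part of this change, assuming the serviceMap type added in this commit and a hypothetical parent type:

package model

import (
	"context"

	"github.com/thejerf/suture/v4"
)

// parent is a hypothetical registry-like service that owns per-key children
// (the real code keys indexHandler values by folder ID).
type parent struct {
	children *serviceMap[string, suture.Service]
}

// Serve runs the parent by running its children; cancelling ctx stops them all.
func (p *parent) Serve(ctx context.Context) error {
	return p.children.Serve(ctx)
}

// replace stops and waits for any existing child at key, then starts the new one.
func (p *parent) replace(key string, c suture.Service) {
	p.children.RemoveAndWait(key, 0)
	p.children.Add(key, c)
}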

+ 16 - 15
lib/model/model.go

@@ -165,7 +165,7 @@ type model struct {
 	helloMessages       map[protocol.DeviceID]protocol.Hello
 	deviceDownloads     map[protocol.DeviceID]*deviceDownloadState
 	remoteFolderStates  map[protocol.DeviceID]map[string]remoteFolderState // deviceID -> folders
-	indexHandlers       map[protocol.DeviceID]*indexHandlerRegistry
+	indexHandlers       *serviceMap[protocol.DeviceID, *indexHandlerRegistry]
 
 	// for testing only
 	foldersRunning atomic.Int32
@@ -248,12 +248,13 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
 		helloMessages:       make(map[protocol.DeviceID]protocol.Hello),
 		deviceDownloads:     make(map[protocol.DeviceID]*deviceDownloadState),
 		remoteFolderStates:  make(map[protocol.DeviceID]map[string]remoteFolderState),
-		indexHandlers:       make(map[protocol.DeviceID]*indexHandlerRegistry),
+		indexHandlers:       newServiceMap[protocol.DeviceID, *indexHandlerRegistry](evLogger),
 	}
 	for devID := range cfg.Devices() {
 		m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID)
 	}
 	m.Add(m.progressEmitter)
+	m.Add(m.indexHandlers)
 	m.Add(svcutil.AsService(m.serve, m.String()))
 
 	return m
@@ -487,9 +488,9 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) {
 	}
 
 	m.cleanupFolderLocked(cfg)
-	for _, r := range m.indexHandlers {
+	m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) {
 		r.Remove(cfg.ID)
-	}
+	})
 
 	m.fmut.Unlock()
 	m.pmut.RUnlock()
@@ -563,9 +564,9 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
 	// Care needs to be taken because we already hold fmut and the lock order
 	// must be the same everywhere. As fmut is acquired first, this is fine.
 	m.pmut.RLock()
-	for _, indexRegistry := range m.indexHandlers {
-		indexRegistry.RegisterFolderState(to, fset, m.folderRunners[to.ID])
-	}
+	m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) {
+		r.RegisterFolderState(to, fset, m.folderRunners[to.ID])
+	})
 	m.pmut.RUnlock()
 
 	var infoMsg string
@@ -601,9 +602,9 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool
 	// Care needs to be taken because we already hold fmut and the lock order
 	// must be the same everywhere. As fmut is acquired first, this is fine.
 	m.pmut.RLock()
-	for _, indexRegistry := range m.indexHandlers {
-		indexRegistry.RegisterFolderState(cfg, fset, m.folderRunners[cfg.ID])
-	}
+	m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) {
+		r.RegisterFolderState(cfg, fset, m.folderRunners[cfg.ID])
+	})
 	m.pmut.RUnlock()
 
 	return nil
@@ -1138,7 +1139,7 @@ func (m *model) handleIndex(conn protocol.Connection, folder string, fs []protoc
 	}
 
 	m.pmut.RLock()
-	indexHandler, ok := m.indexHandlers[deviceID]
+	indexHandler, ok := m.indexHandlers.Get(deviceID)
 	m.pmut.RUnlock()
 	if !ok {
 		// This should be impossible, as an index handler always exists for an
@@ -1170,7 +1171,7 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi
 	l.Debugf("Handling ClusterConfig from %v", deviceID.Short())
 
 	m.pmut.RLock()
-	indexHandlerRegistry, ok := m.indexHandlers[deviceID]
+	indexHandlerRegistry, ok := m.indexHandlers.Get(deviceID)
 	m.pmut.RUnlock()
 	if !ok {
 		panic("bug: ClusterConfig called on closed or nonexistent connection")
@@ -1792,7 +1793,7 @@ func (m *model) Closed(conn protocol.Connection, err error) {
 	delete(m.remoteFolderStates, device)
 	closed := m.closed[device]
 	delete(m.closed, device)
-	delete(m.indexHandlers, device)
+	m.indexHandlers.RemoveAndWait(device, 0)
 	m.pmut.Unlock()
 
 	m.progressEmitter.temporaryIndexUnsubscribe(conn)
@@ -2251,11 +2252,11 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) {
 	closed := make(chan struct{})
 	m.closed[deviceID] = closed
 	m.deviceDownloads[deviceID] = newDeviceDownloadState()
-	indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], closed, m.Supervisor, m.evLogger)
+	indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], m.evLogger)
 	for id, fcfg := range m.folderCfgs {
 		indexRegistry.RegisterFolderState(fcfg, m.folderFiles[id], m.folderRunners[id])
 	}
-	m.indexHandlers[deviceID] = indexRegistry
+	m.indexHandlers.Add(deviceID, indexRegistry)
 	m.fmut.RUnlock()
 	// 0: default, <0: no limiting
 	switch {
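
Taken together, the model now owns one serviceMap of registries added once to its service tree in NewModel, fills it per connection in AddConnection, and drains it with RemoveAndWait in Closed, so every per-folder index handler for a device is fully stopped before the connection teardown completes. A sketch of that per-connection lifecycle, not part of this change, with a hypothetical owner type standing in for the model:

package model

import (
	"github.com/syncthing/syncthing/lib/events"
	"github.com/syncthing/syncthing/lib/protocol"
)

// owner is a hypothetical stand-in for the model: one registry per device.
type owner struct {
	registries *serviceMap[protocol.DeviceID, *indexHandlerRegistry]
}

// connected starts a registry for a new connection.
func (o *owner) connected(conn protocol.Connection, downloads *deviceDownloadState, evLogger events.Logger) {
	o.registries.Add(conn.DeviceID(), newIndexHandlerRegistry(conn, downloads, evLogger))
}

// closed stops the registry and, through its serviceMap, every per-folder
// index handler for that device before returning.
func (o *owner) closed(device protocol.DeviceID) {
	o.registries.RemoveAndWait(device, 0)
}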

+ 3 - 2
lib/model/model_test.go

@@ -1337,8 +1337,9 @@ func TestAutoAcceptEnc(t *testing.T) {
 	// Earlier tests might cause the connection to get closed, thus ClusterConfig
 	// would panic.
 	clusterConfig := func(deviceID protocol.DeviceID, cm protocol.ClusterConfig) {
-		m.AddConnection(newFakeConnection(deviceID, m), protocol.Hello{})
-		m.ClusterConfig(&protocolmocks.Connection{DeviceIDStub: func() protocol.DeviceID { return deviceID }}, cm)
+		conn := newFakeConnection(deviceID, m)
+		m.AddConnection(conn, protocol.Hello{})
+		m.ClusterConfig(conn, cm)
 	}
 
 	clusterConfig(device1, basicCC())

+ 103 - 0
lib/model/service_map.go

@@ -0,0 +1,103 @@
+// Copyright (C) 2023 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at https://mozilla.org/MPL/2.0/.
+
+package model
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/syncthing/syncthing/lib/events"
+	"github.com/syncthing/syncthing/lib/svcutil"
+	"github.com/thejerf/suture/v4"
+)
+
+// A serviceMap is a utility map of arbitrary keys to a suture.Service of
+// some kind, where adding and removing services ensures they are properly
+// started and stopped on the given Supervisor. The serviceMap is itself a
+// suture.Service and should be added to a Supervisor.
+// Not safe for concurrent use.
+type serviceMap[K comparable, S suture.Service] struct {
+	services    map[K]S
+	tokens      map[K]suture.ServiceToken
+	supervisor  *suture.Supervisor
+	eventLogger events.Logger
+}
+
+func newServiceMap[K comparable, S suture.Service](eventLogger events.Logger) *serviceMap[K, S] {
+	m := &serviceMap[K, S]{
+		services:    make(map[K]S),
+		tokens:      make(map[K]suture.ServiceToken),
+		eventLogger: eventLogger,
+	}
+	m.supervisor = suture.New(m.String(), svcutil.SpecWithDebugLogger(l))
+	return m
+}
+
+// Add adds a service to the map, starting it on the supervisor. If there is
+// already a service at the given key, it is removed first.
+func (s *serviceMap[K, S]) Add(k K, v S) {
+	if tok, ok := s.tokens[k]; ok {
+		// There is already a service at this key, remove it first.
+		s.supervisor.Remove(tok)
+		s.eventLogger.Log(events.Failure, fmt.Sprintf("%s replaced service at key %v", s, k))
+	}
+	s.services[k] = v
+	s.tokens[k] = s.supervisor.Add(v)
+}
+
+// Get returns the service at the given key, or the empty value and false if
+// there is no service at that key.
+func (s *serviceMap[K, S]) Get(k K) (v S, ok bool) {
+	v, ok = s.services[k]
+	return
+}
+
+// Remove removes the service at the given key, stopping it on the supervisor.
+// If there is no service at the given key, nothing happens. The return value
+// indicates whether a service was removed.
+func (s *serviceMap[K, S]) Remove(k K) (found bool) {
+	if tok, ok := s.tokens[k]; ok {
+		found = true
+		s.supervisor.Remove(tok)
+	}
+	delete(s.services, k)
+	delete(s.tokens, k)
+	return
+}
+
+// RemoveAndWait removes the service at the given key, stopping it on the
+// supervisor. If there is no service at the given key, nothing happens. The
+// return value indicates whether a service was removed.
+func (s *serviceMap[K, S]) RemoveAndWait(k K, timeout time.Duration) (found bool) {
+	if tok, ok := s.tokens[k]; ok {
+		found = true
+		s.supervisor.RemoveAndWait(tok, timeout)
+	}
+	delete(s.services, k)
+	delete(s.tokens, k)
+	return found
+}
+
+// Each calls the given function for each service in the map.
+func (s *serviceMap[K, S]) Each(fn func(K, S)) {
+	for key, svc := range s.services {
+		fn(key, svc)
+	}
+}
+
+// Suture implementation
+
+func (s *serviceMap[K, S]) Serve(ctx context.Context) error {
+	return s.supervisor.Serve(ctx)
+}
+
+func (s *serviceMap[K, S]) String() string {
+	var kv K
+	var sv S
+	return fmt.Sprintf("serviceMap[%T, %T]@%p", kv, sv, s)
+}
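
Usage follows the usual suture pattern: construct the map, add it to a running supervisor so its internal supervisor serves, then Add, Get, and Remove entries while it runs. A rough usage sketch, not part of this change, reusing the dummyService defined in the tests below:

package model

import (
	"context"

	"github.com/syncthing/syncthing/lib/events"
	"github.com/thejerf/suture/v4"
)

// serviceMapUsageSketch is a hypothetical illustration of the API.
func serviceMapUsageSketch(ctx context.Context) {
	sup := suture.NewSimple("example")
	sup.ServeBackground(ctx)

	sm := newServiceMap[string, *dummyService](events.NoopLogger)
	sup.Add(sm) // the map's own supervisor now serves; entries start as they are added

	sm.Add("a", newDummyService()) // started on sm's internal supervisor
	if d, ok := sm.Get("a"); ok {
		<-d.started // the entry is running
	}
	sm.RemoveAndWait("a", 0) // stopped, and waited for, before returning
}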

+ 156 - 0
lib/model/service_map_test.go

@@ -0,0 +1,156 @@
+// Copyright (C) 2023 The Syncthing Authors.
+//
+// This Source Code Form is subject to the terms of the Mozilla Public
+// License, v. 2.0. If a copy of the MPL was not distributed with this file,
+// You can obtain one at https://mozilla.org/MPL/2.0/.
+
+package model
+
+import (
+	"context"
+	"strings"
+	"testing"
+
+	"github.com/syncthing/syncthing/lib/events"
+	"github.com/thejerf/suture/v4"
+)
+
+func TestServiceMap(t *testing.T) {
+	t.Parallel()
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+	sup := suture.NewSimple("TestServiceMap")
+	sup.ServeBackground(ctx)
+
+	t.Run("SimpleAddRemove", func(t *testing.T) {
+		t.Parallel()
+
+		sm := newServiceMap[string, *dummyService](events.NoopLogger)
+		sup.Add(sm)
+
+		// Add two services. They should start.
+
+		d1 := newDummyService()
+		d2 := newDummyService()
+
+		sm.Add("d1", d1)
+		sm.Add("d2", d2)
+
+		<-d1.started
+		<-d2.started
+
+		// Remove them. They should stop.
+
+		if !sm.Remove("d1") {
+			t.Errorf("Remove failed")
+		}
+		if !sm.Remove("d2") {
+			t.Errorf("Remove failed")
+		}
+
+		<-d1.stopped
+		<-d2.stopped
+	})
+
+	t.Run("OverwriteImpliesRemove", func(t *testing.T) {
+		t.Parallel()
+
+		sm := newServiceMap[string, *dummyService](events.NoopLogger)
+		sup.Add(sm)
+
+		d1 := newDummyService()
+		d2 := newDummyService()
+
+		// Add d1, it should start.
+
+		sm.Add("k", d1)
+		<-d1.started
+
+		// Add d2, with the same key. The previous one should stop as we're
+		// doing a replace.
+
+		sm.Add("k", d2)
+		<-d1.stopped
+		<-d2.started
+
+		if !sm.Remove("k") {
+			t.Errorf("Remove failed")
+		}
+
+		<-d2.stopped
+	})
+
+	t.Run("IterateWithRemoveAndWait", func(t *testing.T) {
+		t.Parallel()
+
+		sm := newServiceMap[string, *dummyService](events.NoopLogger)
+		sup.Add(sm)
+
+		// Add four services.
+
+		d1 := newDummyService()
+		d2 := newDummyService()
+		d3 := newDummyService()
+		d4 := newDummyService()
+
+		sm.Add("keep1", d1)
+		sm.Add("remove2", d2)
+		sm.Add("keep3", d3)
+		sm.Add("remove4", d4)
+
+		<-d1.started
+		<-d2.started
+		<-d3.started
+		<-d4.started
+
+		// Remove two of them from within the iterator.
+
+		sm.Each(func(k string, v *dummyService) {
+			if strings.HasPrefix(k, "remove") {
+				sm.RemoveAndWait(k, 0)
+			}
+		})
+
+		// They should have stopped.
+
+		<-d2.stopped
+		<-d4.stopped
+
+		// They should not be in the map anymore.
+
+		if _, ok := sm.Get("remove2"); ok {
+			t.Errorf("Service still in map")
+		}
+		if _, ok := sm.Get("remove4"); ok {
+			t.Errorf("Service still in map")
+		}
+
+		// The other two should still be running.
+
+		if _, ok := sm.Get("keep1"); !ok {
+			t.Errorf("Service not in map")
+		}
+		if _, ok := sm.Get("keep3"); !ok {
+			t.Errorf("Service not in map")
+		}
+	})
+}
+
+type dummyService struct {
+	started chan struct{}
+	stopped chan struct{}
+}
+
+func newDummyService() *dummyService {
+	return &dummyService{
+		started: make(chan struct{}),
+		stopped: make(chan struct{}),
+	}
+}
+
+func (d *dummyService) Serve(ctx context.Context) error {
+	close(d.started)
+	defer close(d.stopped)
+	<-ctx.Done()
+	return nil
+}