Browse Source

cmd/syncthing: Allow custom event subscriptions (fixes #1879)

This adds a parameter "events" to the /rest/events endpoint. It should
be a comma separated list of the events the consumer is interested in.
When not given it defaults to the current set of events, so it's
backwards compatible.

The API service then manages subscriptions, creating them as required
for each requested event mask. Old subscriptions are not "garbage
collected" - it's assumed that in normal usage the set of event
subscriptions will be small enough. Possibly lower than before, as we
will not set up the disk event subscription unless it's actually used.

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4092
Jakob Borg 8 years ago
parent
commit
d48e46a29c
4 changed files with 149 additions and 19 deletions
  1. 51 14
      cmd/syncthing/gui.go
  2. 32 0
      cmd/syncthing/gui_test.go
  3. 5 5
      cmd/syncthing/main.go
  4. 61 0
      lib/events/events.go

+ 51 - 14
cmd/syncthing/gui.go

@@ -45,6 +45,12 @@ var (
 	startTime = time.Now()
 )
 
+const (
+	defaultEventMask   = events.AllEvents &^ events.LocalChangeDetected &^ events.RemoteChangeDetected
+	diskEventMask      = events.LocalChangeDetected | events.RemoteChangeDetected
+	eventSubBufferSize = 1000
+)
+
 type apiService struct {
 	id                 protocol.DeviceID
 	cfg                configIntf
@@ -52,8 +58,8 @@ type apiService struct {
 	httpsKeyFile       string
 	statics            *staticsServer
 	model              modelIntf
-	eventSub           events.BufferedSubscription
-	diskEventSub       events.BufferedSubscription
+	eventSubs          map[events.EventType]events.BufferedSubscription
+	eventSubsMut       sync.Mutex
 	discoverer         discover.CachingMux
 	connectionsService connectionsIntf
 	fss                *folderSummaryService
@@ -114,16 +120,19 @@ type connectionsIntf interface {
 	Status() map[string]interface{}
 }
 
-func newAPIService(id protocol.DeviceID, cfg configIntf, httpsCertFile, httpsKeyFile, assetDir string, m modelIntf, eventSub events.BufferedSubscription, diskEventSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connectionsIntf, errors, systemLog logger.Recorder) *apiService {
+func newAPIService(id protocol.DeviceID, cfg configIntf, httpsCertFile, httpsKeyFile, assetDir string, m modelIntf, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connectionsIntf, errors, systemLog logger.Recorder) *apiService {
 	service := &apiService{
-		id:                 id,
-		cfg:                cfg,
-		httpsCertFile:      httpsCertFile,
-		httpsKeyFile:       httpsKeyFile,
-		statics:            newStaticsServer(cfg.GUI().Theme, assetDir),
-		model:              m,
-		eventSub:           eventSub,
-		diskEventSub:       diskEventSub,
+		id:            id,
+		cfg:           cfg,
+		httpsCertFile: httpsCertFile,
+		httpsKeyFile:  httpsKeyFile,
+		statics:       newStaticsServer(cfg.GUI().Theme, assetDir),
+		model:         m,
+		eventSubs: map[events.EventType]events.BufferedSubscription{
+			defaultEventMask: defaultSub,
+			diskEventMask:    diskSub,
+		},
+		eventSubsMut:       sync.NewMutex(),
 		discoverer:         discoverer,
 		connectionsService: connectionsService,
 		systemConfigMut:    sync.NewMutex(),
@@ -234,7 +243,7 @@ func (s *apiService) Serve() {
 	getRestMux.HandleFunc("/rest/db/need", s.getDBNeed)                          // folder [perpage] [page]
 	getRestMux.HandleFunc("/rest/db/status", s.getDBStatus)                      // folder
 	getRestMux.HandleFunc("/rest/db/browse", s.getDBBrowse)                      // folder [prefix] [dirsonly] [levels]
-	getRestMux.HandleFunc("/rest/events", s.getIndexEvents)                      // [since] [limit] [timeout]
+	getRestMux.HandleFunc("/rest/events", s.getIndexEvents)                      // [since] [limit] [timeout] [events]
 	getRestMux.HandleFunc("/rest/events/disk", s.getDiskEvents)                  // [since] [limit] [timeout]
 	getRestMux.HandleFunc("/rest/stats/device", s.getDeviceStats)                // -
 	getRestMux.HandleFunc("/rest/stats/folder", s.getFolderStats)                // -
@@ -1011,11 +1020,14 @@ func (s *apiService) postDBIgnores(w http.ResponseWriter, r *http.Request) {
 
 func (s *apiService) getIndexEvents(w http.ResponseWriter, r *http.Request) {
 	s.fss.gotEventRequest()
-	s.getEvents(w, r, s.eventSub)
+	mask := s.getEventMask(r.URL.Query().Get("events"))
+	sub := s.getEventSub(mask)
+	s.getEvents(w, r, sub)
 }
 
 func (s *apiService) getDiskEvents(w http.ResponseWriter, r *http.Request) {
-	s.getEvents(w, r, s.diskEventSub)
+	sub := s.getEventSub(diskEventMask)
+	s.getEvents(w, r, sub)
 }
 
 func (s *apiService) getEvents(w http.ResponseWriter, r *http.Request, eventSub events.BufferedSubscription) {
@@ -1047,6 +1059,31 @@ func (s *apiService) getEvents(w http.ResponseWriter, r *http.Request, eventSub
 	sendJSON(w, evs)
 }
 
+func (s *apiService) getEventMask(evs string) events.EventType {
+	eventMask := defaultEventMask
+	if evs != "" {
+		eventList := strings.Split(evs, ",")
+		eventMask = 0
+		for _, ev := range eventList {
+			eventMask |= events.UnmarshalEventType(strings.TrimSpace(ev))
+		}
+	}
+	return eventMask
+}
+
+func (s *apiService) getEventSub(mask events.EventType) events.BufferedSubscription {
+	s.eventSubsMut.Lock()
+	bufsub, ok := s.eventSubs[mask]
+	if !ok {
+		evsub := events.Default.Subscribe(mask)
+		bufsub = events.NewBufferedSubscription(evsub, eventSubBufferSize)
+		s.eventSubs[mask] = bufsub
+	}
+	s.eventSubsMut.Unlock()
+
+	return bufsub
+}
+
 func (s *apiService) getSystemUpgrade(w http.ResponseWriter, r *http.Request) {
 	if noUpgradeFromEnv {
 		http.Error(w, upgrade.ErrUpgradeUnsupported.Error(), 500)

+ 32 - 0
cmd/syncthing/gui_test.go

@@ -23,6 +23,7 @@ import (
 
 	"github.com/d4l3k/messagediff"
 	"github.com/syncthing/syncthing/lib/config"
+	"github.com/syncthing/syncthing/lib/events"
 	"github.com/syncthing/syncthing/lib/protocol"
 	"github.com/syncthing/syncthing/lib/sync"
 	"github.com/thejerf/suture"
@@ -924,3 +925,34 @@ func TestOptionsRequest(t *testing.T) {
 		t.Fatal("OPTIONS on /rest/system/status should return a 'Access-Control-Allow-Headers: Content-Type, X-API-KEY' header")
 	}
 }
+
+func TestEventMasks(t *testing.T) {
+	cfg := new(mockedConfig)
+	defSub := new(mockedEventSub)
+	diskSub := new(mockedEventSub)
+	svc := newAPIService(protocol.LocalDeviceID, cfg, "", "", "", nil, defSub, diskSub, nil, nil, nil, nil)
+
+	if mask := svc.getEventMask(""); mask != defaultEventMask {
+		t.Errorf("incorrect default mask %x != %x", int64(mask), int64(defaultEventMask))
+	}
+
+	expected := events.FolderSummary | events.LocalChangeDetected
+	if mask := svc.getEventMask("FolderSummary,LocalChangeDetected"); mask != expected {
+		t.Errorf("incorrect parsed mask %x != %x", int64(mask), int64(expected))
+	}
+
+	expected = 0
+	if mask := svc.getEventMask("WeirdEvent,something else that doens't exist"); mask != expected {
+		t.Errorf("incorrect parsed mask %x != %x", int64(mask), int64(expected))
+	}
+
+	if res := svc.getEventSub(defaultEventMask); res != defSub {
+		t.Errorf("should have returned the given default event sub")
+	}
+	if res := svc.getEventSub(diskEventMask); res != diskSub {
+		t.Errorf("should have returned the given disk event sub")
+	}
+	if res := svc.getEventSub(events.LocalIndexUpdated); res == nil || res == defSub || res == diskSub {
+		t.Errorf("should have returned a valid, non-default event sub")
+	}
+}

+ 5 - 5
cmd/syncthing/main.go

@@ -637,8 +637,8 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
 	// Event subscription for the API; must start early to catch the early
 	// events. The LocalChangeDetected event might overwhelm the event
 	// receiver in some situations so we will not subscribe to it here.
-	apiSub := events.NewBufferedSubscription(events.Default.Subscribe(events.AllEvents&^events.LocalChangeDetected&^events.RemoteChangeDetected), 1000)
-	diskSub := events.NewBufferedSubscription(events.Default.Subscribe(events.LocalChangeDetected|events.RemoteChangeDetected), 1000)
+	defaultSub := events.NewBufferedSubscription(events.Default.Subscribe(defaultEventMask), eventSubBufferSize)
+	diskSub := events.NewBufferedSubscription(events.Default.Subscribe(diskEventMask), eventSubBufferSize)
 
 	if len(os.Getenv("GOMAXPROCS")) == 0 {
 		runtime.GOMAXPROCS(runtime.NumCPU())
@@ -868,7 +868,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
 
 	// GUI
 
-	setupGUI(mainService, cfg, m, apiSub, diskSub, cachedDiscovery, connectionsService, errors, systemLog, runtimeOptions)
+	setupGUI(mainService, cfg, m, defaultSub, diskSub, cachedDiscovery, connectionsService, errors, systemLog, runtimeOptions)
 
 	if runtimeOptions.cpuProfile {
 		f, err := os.Create(fmt.Sprintf("cpu-%d.pprof", os.Getpid()))
@@ -1086,7 +1086,7 @@ func startAuditing(mainService *suture.Supervisor, auditFile string) {
 	l.Infoln("Audit log in", auditDest)
 }
 
-func setupGUI(mainService *suture.Supervisor, cfg *config.Wrapper, m *model.Model, apiSub events.BufferedSubscription, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService *connections.Service, errors, systemLog logger.Recorder, runtimeOptions RuntimeOptions) {
+func setupGUI(mainService *suture.Supervisor, cfg *config.Wrapper, m *model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService *connections.Service, errors, systemLog logger.Recorder, runtimeOptions RuntimeOptions) {
 	guiCfg := cfg.GUI()
 
 	if !guiCfg.Enabled {
@@ -1097,7 +1097,7 @@ func setupGUI(mainService *suture.Supervisor, cfg *config.Wrapper, m *model.Mode
 		l.Warnln("Insecure admin access is enabled.")
 	}
 
-	api := newAPIService(myID, cfg, locations[locHTTPSCertFile], locations[locHTTPSKeyFile], runtimeOptions.assetDir, m, apiSub, diskSub, discoverer, connectionsService, errors, systemLog)
+	api := newAPIService(myID, cfg, locations[locHTTPSCertFile], locations[locHTTPSKeyFile], runtimeOptions.assetDir, m, defaultSub, diskSub, discoverer, connectionsService, errors, systemLog)
 	cfg.Subscribe(api)
 	mainService.Add(api)
 

+ 61 - 0
lib/events/events.go

@@ -118,6 +118,67 @@ func (t EventType) MarshalText() ([]byte, error) {
 	return []byte(t.String()), nil
 }
 
+func UnmarshalEventType(s string) EventType {
+	switch s {
+	case "Starting":
+		return Starting
+	case "StartupComplete":
+		return StartupComplete
+	case "DeviceDiscovered":
+		return DeviceDiscovered
+	case "DeviceConnected":
+		return DeviceConnected
+	case "DeviceDisconnected":
+		return DeviceDisconnected
+	case "DeviceRejected":
+		return DeviceRejected
+	case "LocalChangeDetected":
+		return LocalChangeDetected
+	case "RemoteChangeDetected":
+		return RemoteChangeDetected
+	case "LocalIndexUpdated":
+		return LocalIndexUpdated
+	case "RemoteIndexUpdated":
+		return RemoteIndexUpdated
+	case "ItemStarted":
+		return ItemStarted
+	case "ItemFinished":
+		return ItemFinished
+	case "StateChanged":
+		return StateChanged
+	case "FolderRejected":
+		return FolderRejected
+	case "ConfigSaved":
+		return ConfigSaved
+	case "DownloadProgress":
+		return DownloadProgress
+	case "RemoteDownloadProgress":
+		return RemoteDownloadProgress
+	case "FolderSummary":
+		return FolderSummary
+	case "FolderCompletion":
+		return FolderCompletion
+	case "FolderErrors":
+		return FolderErrors
+	case "DevicePaused":
+		return DevicePaused
+	case "DeviceResumed":
+		return DeviceResumed
+	case "FolderScanProgress":
+		return FolderScanProgress
+	case "FolderPaused":
+		return FolderPaused
+	case "FolderResumed":
+		return FolderResumed
+	case "ListenAddressesChanged":
+		return ListenAddressesChanged
+	case "LoginAttempt":
+		return LoginAttempt
+	default:
+		return 0
+	}
+}
+
 const BufferSize = 64
 
 type Logger struct {