Explorar o código

cmd/syncthing, lib/events, lib/sync: Add timeout to REST event API, remove Ping (fixes #3933)

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3941
Antony Male hai 8 anos
pai
achega
ac510b26e2

+ 3 - 3
cmd/syncthing/auditservice_test.go

@@ -20,13 +20,13 @@ func TestAuditService(t *testing.T) {
 	service := newAuditService(buf)
 
 	// Event sent before start, will not be logged
-	events.Default.Log(events.Ping, "the first event")
+	events.Default.Log(events.ConfigSaved, "the first event")
 
 	go service.Serve()
 	service.WaitForStart()
 
 	// Event that should end up in the audit log
-	events.Default.Log(events.Ping, "the second event")
+	events.Default.Log(events.ConfigSaved, "the second event")
 
 	// We need to give the events time to arrive, since the channels are buffered etc.
 	time.Sleep(10 * time.Millisecond)
@@ -35,7 +35,7 @@ func TestAuditService(t *testing.T) {
 	service.WaitForStop()
 
 	// This event should not be logged, since we have stopped.
-	events.Default.Log(events.Ping, "the third event")
+	events.Default.Log(events.ConfigSaved, "the third event")
 
 	result := string(buf.Bytes())
 	t.Log(result)

+ 10 - 3
cmd/syncthing/gui.go

@@ -233,8 +233,8 @@ func (s *apiService) Serve() {
 	getRestMux.HandleFunc("/rest/db/need", s.getDBNeed)                          // folder [perpage] [page]
 	getRestMux.HandleFunc("/rest/db/status", s.getDBStatus)                      // folder
 	getRestMux.HandleFunc("/rest/db/browse", s.getDBBrowse)                      // folder [prefix] [dirsonly] [levels]
-	getRestMux.HandleFunc("/rest/events", s.getIndexEvents)                      // since [limit]
-	getRestMux.HandleFunc("/rest/events/disk", s.getDiskEvents)                  // since [limit]
+	getRestMux.HandleFunc("/rest/events", s.getIndexEvents)                      // since [limit] [timeout]
+	getRestMux.HandleFunc("/rest/events/disk", s.getDiskEvents)                  // since [limit] [timeout]
 	getRestMux.HandleFunc("/rest/stats/device", s.getDeviceStats)                // -
 	getRestMux.HandleFunc("/rest/stats/folder", s.getFolderStats)                // -
 	getRestMux.HandleFunc("/rest/svc/deviceid", s.getDeviceID)                   // id
@@ -1019,9 +1019,15 @@ func (s *apiService) getEvents(w http.ResponseWriter, r *http.Request, eventSub
 	qs := r.URL.Query()
 	sinceStr := qs.Get("since")
 	limitStr := qs.Get("limit")
+	timeoutStr := qs.Get("timeout")
 	since, _ := strconv.Atoi(sinceStr)
 	limit, _ := strconv.Atoi(limitStr)
 
+	timeout := defaultEventTimeout
+	if timeoutSec, timeoutErr := strconv.Atoi(timeoutStr); timeoutErr == nil && timeoutSec >= 0 { // 0 is a valid timeout
+		timeout = time.Duration(timeoutSec) * time.Second
+	}
+
 	// Flush before blocking, to indicate that we've received the request and
 	// that it should not be retried. Must set Content-Type header before
 	// flushing.
@@ -1029,7 +1035,8 @@ func (s *apiService) getEvents(w http.ResponseWriter, r *http.Request, eventSub
 	f := w.(http.Flusher)
 	f.Flush()
 
-	evs := eventSub.Since(since, nil)
+	// If there are no events available return an empty slice, as this gets serialized as `[]`
+	evs := eventSub.Since(since, []events.Event{}, timeout)
 	if 0 < limit && limit < len(evs) {
 		evs = evs[len(evs)-limit:]
 	}

+ 2 - 10
cmd/syncthing/main.go

@@ -79,7 +79,7 @@ const (
 	tlsDefaultCommonName = "syncthing"
 	httpsRSABits         = 2048
 	bepRSABits           = 0 // 384 bit ECDSA used instead
-	pingEventInterval    = time.Minute
+	defaultEventTimeout  = time.Minute
 	maxSystemErrors      = 5
 	initialSystemLog     = 10
 	maxSystemLog         = 250
@@ -594,7 +594,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
 	// events. The LocalChangeDetected event might overwhelm the event
 	// receiver in some situations so we will not subscribe to it here.
 	apiSub := events.NewBufferedSubscription(events.Default.Subscribe(events.AllEvents&^events.LocalChangeDetected&^events.RemoteChangeDetected), 1000)
-	diskSub := events.NewBufferedSubscription(events.Default.Subscribe(events.LocalChangeDetected|events.RemoteChangeDetected|events.Ping), 1000)
+	diskSub := events.NewBufferedSubscription(events.Default.Subscribe(events.LocalChangeDetected|events.RemoteChangeDetected), 1000)
 
 	if len(os.Getenv("GOMAXPROCS")) == 0 {
 		runtime.GOMAXPROCS(runtime.NumCPU())
@@ -877,7 +877,6 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
 	events.Default.Log(events.StartupComplete, map[string]string{
 		"myID": myID.String(),
 	})
-	go generatePingEvents()
 
 	cleanConfigDirectory()
 
@@ -1091,13 +1090,6 @@ func defaultConfig(myName string) config.Configuration {
 	return newCfg
 }
 
-func generatePingEvents() {
-	for {
-		time.Sleep(pingEventInterval)
-		events.Default.Log(events.Ping, nil)
-	}
-}
-
 func resetDB() error {
 	return os.RemoveAll(locations[locDatabase])
 }

+ 6 - 2
cmd/syncthing/mocked_events_test.go

@@ -6,10 +6,14 @@
 
 package main
 
-import "github.com/syncthing/syncthing/lib/events"
+import (
+	"time"
+
+	"github.com/syncthing/syncthing/lib/events"
+)
 
 type mockedEventSub struct{}
 
-func (s *mockedEventSub) Since(id int, into []events.Event) []events.Event {
+func (s *mockedEventSub) Since(id int, into []events.Event, timeout time.Duration) []events.Event {
 	select {}
 }

+ 1 - 1
cmd/syncthing/summaryservice.go

@@ -169,7 +169,7 @@ func (c *folderSummaryService) foldersToHandle() []string {
 	c.lastEventReqMut.Lock()
 	last := c.lastEventReq
 	c.lastEventReqMut.Unlock()
-	if time.Since(last) > pingEventInterval {
+	if time.Since(last) > defaultEventTimeout {
 		return nil
 	}
 

+ 1 - 1
cmd/syncthing/verboseservice.go

@@ -66,7 +66,7 @@ func (s *verboseService) WaitForStart() {
 
 func (s *verboseService) formatEvent(ev events.Event) string {
 	switch ev.Type {
-	case events.Ping, events.DownloadProgress, events.LocalIndexUpdated:
+	case events.DownloadProgress, events.LocalIndexUpdated:
 		// Skip
 		return ""
 

+ 0 - 1
gui/default/syncthing/core/eventService.js

@@ -70,7 +70,6 @@ angular.module('syncthing.core')
             ITEM_FINISHED:        'ItemFinished',   // Generated when Syncthing ends synchronizing a file to a newer version
             ITEM_STARTED:         'ItemStarted',   // Generated when Syncthing begins synchronizing a file to a newer version
             LOCAL_INDEX_UPDATED:  'LocalIndexUpdated',   // Generated when the local index information has changed, due to synchronizing one or more items from the cluster or discovering local changes during a scan
-            PING:                 'Ping',   // Generated automatically every 60 seconds
             REMOTE_INDEX_UPDATED: 'RemoteIndexUpdated',   // Generated each time new index information is received from a device
             STARTING:             'Starting',   // Emitted exactly once, when Syncthing starts, before parsing configuration etc
             STARTUP_COMPLETED:    'StartupCompleted',   // Emitted exactly once, when initialization is complete and Syncthing is ready to start exchanging data with other devices

+ 16 - 11
lib/events/events.go

@@ -10,7 +10,6 @@ package events
 import (
 	"errors"
 	"runtime"
-	stdsync "sync"
 	"time"
 
 	"github.com/syncthing/syncthing/lib/sync"
@@ -19,8 +18,7 @@ import (
 type EventType int
 
 const (
-	Ping EventType = 1 << iota
-	Starting
+	Starting EventType = 1 << iota
 	StartupComplete
 	DeviceDiscovered
 	DeviceConnected
@@ -55,8 +53,6 @@ var runningTests = false
 
 func (t EventType) String() string {
 	switch t {
-	case Ping:
-		return "Ping"
 	case Starting:
 		return "Starting"
 	case StartupComplete:
@@ -279,11 +275,11 @@ type bufferedSubscription struct {
 	next int
 	cur  int // Current SubscriptionID
 	mut  sync.Mutex
-	cond *stdsync.Cond
+	cond *sync.TimeoutCond
 }
 
 type BufferedSubscription interface {
-	Since(id int, into []Event) []Event
+	Since(id int, into []Event, timeout time.Duration) []Event
 }
 
 func NewBufferedSubscription(s *Subscription, size int) BufferedSubscription {
@@ -292,7 +288,7 @@ func NewBufferedSubscription(s *Subscription, size int) BufferedSubscription {
 		buf: make([]Event, size),
 		mut: sync.NewMutex(),
 	}
-	bs.cond = stdsync.NewCond(bs.mut)
+	bs.cond = sync.NewTimeoutCond(bs.mut)
 	go bs.pollingLoop()
 	return bs
 }
@@ -319,12 +315,21 @@ func (s *bufferedSubscription) pollingLoop() {
 	}
 }
 
-func (s *bufferedSubscription) Since(id int, into []Event) []Event {
+func (s *bufferedSubscription) Since(id int, into []Event, timeout time.Duration) []Event {
 	s.mut.Lock()
 	defer s.mut.Unlock()
 
-	for id >= s.cur {
-		s.cond.Wait()
+	// Check once first before generating the TimeoutCondWaiter
+	if id >= s.cur {
+		waiter := s.cond.SetupWait(timeout)
+		defer waiter.Stop()
+
+		for id >= s.cur {
+			if eventsAvailable := waiter.Wait(); !eventsAvailable {
+				// Timed out
+				return into
+			}
+		}
 	}
 
 	for i := s.next; i < len(s.buf); i++ {

+ 4 - 4
lib/events/events_test.go

@@ -219,7 +219,7 @@ func TestBufferedSub(t *testing.T) {
 
 	recv := 0
 	for recv < 10*BufferSize {
-		evs := bs.Since(recv, nil)
+		evs := bs.Since(recv, nil, time.Minute)
 		for _, ev := range evs {
 			if ev.GlobalID != recv+1 {
 				t.Fatalf("Incorrect ID; %d != %d", ev.GlobalID, recv+1)
@@ -252,7 +252,7 @@ func BenchmarkBufferedSub(b *testing.B) {
 		recv := 0
 		var evs []Event
 		for i := 0; i < b.N; {
-			evs = bs.Since(recv, evs[:0])
+			evs = bs.Since(recv, evs[:0], time.Minute)
 			for _, ev := range evs {
 				if ev.GlobalID != recv+1 {
 					done <- fmt.Errorf("skipped event %v %v", ev.GlobalID, recv)
@@ -299,7 +299,7 @@ func TestSinceUsesSubscriptionId(t *testing.T) {
 	// delivered to the buffered subscription when we get here.
 	t0 := time.Now()
 	for time.Since(t0) < time.Second {
-		events := bs.Since(0, nil)
+		events := bs.Since(0, nil, time.Minute)
 		if len(events) == 2 {
 			break
 		}
@@ -308,7 +308,7 @@ func TestSinceUsesSubscriptionId(t *testing.T) {
 		}
 	}
 
-	events := bs.Since(1, nil)
+	events := bs.Since(1, nil, time.Minute)
 	if len(events) != 1 {
 		t.Fatal("Incorrect number of events:", len(events))
 	}

+ 67 - 0
lib/sync/sync.go

@@ -227,3 +227,70 @@ func goid() int {
 	}
 	return id
 }
+
+// TimeoutCond is a variant on sync.Cond whose waiters can give up after a timeout. It has roughly the same
+// semantics regarding 'L': it must be held both when broadcasting and when calling TimeoutCondWaiter.Wait().
+// Call Broadcast() to wake all waiters currently blocked on the TimeoutCond. Call SetupWait to create a
+// TimeoutCondWaiter configured with the given timeout, which can then be used to listen for
+// broadcasts.
+type TimeoutCond struct {
+	L  sync.Locker
+	ch chan struct{}
+}
+
+// TimeoutCondWaiter is a type allowing a consumer to wait on a TimeoutCond with a timeout. Wait() may be called multiple times,
+// and will return true every time that the TimeoutCond is broadcast to. Once the configured timeout
+// expires, Wait() will return false. The timer is one-shot and is never reset, so callers should stop waiting after the first false.
+// Call Stop() to release resources once this TimeoutCondWaiter is no longer needed.
+type TimeoutCondWaiter struct {
+	c     *TimeoutCond
+	timer *time.Timer
+}
+
+func NewTimeoutCond(l sync.Locker) *TimeoutCond {
+	return &TimeoutCond{
+		L: l,
+	}
+}
+
+func (c *TimeoutCond) Broadcast() {
+	// c.L must be locked when calling this function. Closing the channel wakes every waiter blocked in Wait(); it is then reset to nil so that the next Wait() creates a fresh one.
+
+	if c.ch != nil {
+		close(c.ch)
+		c.ch = nil
+	}
+}
+
+func (c *TimeoutCond) SetupWait(timeout time.Duration) *TimeoutCondWaiter {
+	timer := time.NewTimer(timeout)
+
+	return &TimeoutCondWaiter{
+		c:     c,
+		timer: timer,
+	}
+}
+
+func (w *TimeoutCondWaiter) Wait() bool {
+	// w.c.L must be locked when calling this function. It is released while blocking and re-acquired before returning. Returns true on a broadcast, false on timeout.
+
+	// Lazily create the broadcast channel (Broadcast() resets it to nil after closing it), since we're going to be waiting on it
+	if w.c.ch == nil {
+		w.c.ch = make(chan struct{})
+	}
+	ch := w.c.ch
+
+	w.c.L.Unlock()
+	defer w.c.L.Lock()
+
+	select {
+	case <-w.timer.C:
+		return false
+	case <-ch:
+		return true
+	}
+}
+
+func (w *TimeoutCondWaiter) Stop() {
+	w.timer.Stop()
+}

+ 97 - 0
lib/sync/sync_test.go

@@ -226,3 +226,100 @@ func TestWaitGroup(t *testing.T) {
 	debug = false
 	l.SetDebug("sync", false)
 }
+
+func TestTimeoutCond(t *testing.T) {
+	// WARNING this test relies heavily on threads not being stalled at particular points.
+	// As such, it's pretty unstable on the build server. It has been left in as it still
+	// exercises the deadlock detector, and one of the two things it tests is still functional.
+	// See the comments in runLocks.
+
+	const (
+		// Low values to avoid being intrusive in continuous testing. Can be
+		// increased significantly for stress testing.
+		iterations = 100
+		routines   = 10
+
+		timeMult = 2
+	)
+
+	c := NewTimeoutCond(NewMutex())
+
+	// Start a routine to periodically broadcast on the cond.
+
+	go func() {
+		d := time.Duration(routines) * timeMult * time.Millisecond / 2
+		t.Log("Broadcasting every", d)
+		for i := 0; i < iterations; i++ {
+			time.Sleep(d)
+
+			c.L.Lock()
+			c.Broadcast()
+			c.L.Unlock()
+		}
+	}()
+
+	// Start several routines that wait on it with different timeouts.
+
+	var results [routines][2]int
+	var wg sync.WaitGroup
+	for i := 0; i < routines; i++ {
+		i := i
+		wg.Add(1)
+		go func() {
+			d := time.Duration(i) * timeMult * time.Millisecond
+			t.Logf("Routine %d waits for %v\n", i, d)
+			succ, fail := runLocks(t, iterations, c, d)
+			results[i][0] = succ
+			results[i][1] = fail
+			wg.Done()
+		}()
+	}
+
+	wg.Wait()
+
+	// Print a table of routine number: successes, failures.
+
+	for i, v := range results {
+		t.Logf("%4d: %4d %4d\n", i, v[0], v[1])
+	}
+}
+
+func runLocks(t *testing.T, iterations int, c *TimeoutCond, d time.Duration) (succ, fail int) {
+	for i := 0; i < iterations; i++ {
+		c.L.Lock()
+
+		// The thread may be stalled, so we can't test the 'succeeded late' case reliably.
+		// Therefore make sure that we start t0 before starting the timeout, and only test
+		// the 'failed early' case.
+
+		t0 := time.Now()
+		w := c.SetupWait(d)
+
+		res := w.Wait()
+		waited := time.Since(t0)
+
+		// Allow 20% slack for a timeout that fires early, and 10% plus five
+		// milliseconds of scheduling delay for a success that arrives late.
+		// In tweaking these it was clear that things worked like they should,
+		// so if this becomes a spurious failure kind of thing feel free to
+		// remove or give significantly more slack.
+
+		if !res && waited < d*8/10 {
+			t.Errorf("Wait failed early, %v < %v", waited, d)
+		}
+		if res && waited > d*11/10+5*time.Millisecond {
+			// Ideally this would be t.Errorf
+			t.Logf("WARNING: Wait succeeded late, %v > %v. This is probably a thread scheduling issue", waited, d)
+		}
+
+		w.Stop()
+
+		if res {
+			succ++
+		} else {
+			fail++
+		}
+		c.L.Unlock()
+	}
+	return
+}