Pārlūkot izejas kodu

Basic events interface

Jakob Borg 11 gadi atpakaļ
vecāks
revīzija
b0f46beffb
7 mainītis faili ar 487 papildinājumiem un 0 dzēšanām
  1. 54 0
      cmd/stevents/main.go
  2. 13 0
      cmd/syncthing/gui.go
  3. 16 0
      cmd/syncthing/main.go
  4. 203 0
      events/events.go
  5. 174 0
      events/events_test.go
  6. 16 0
      model/model.go
  7. 11 0
      model/puller.go

+ 54 - 0
cmd/stevents/main.go

@@ -0,0 +1,54 @@
+package main
+
+import (
+	"encoding/json"
+	"flag"
+	"fmt"
+	"log"
+	"net/http"
+	"time"
+)
+
+type event struct {
+	ID   int
+	Type string
+	Time time.Time
+	Data map[string]interface{}
+}
+
+func main() {
+	target := flag.String("target", "localhost:8080", "Target Syncthing instance")
+	apikey := flag.String("apikey", "", "Syncthing API key")
+	flag.Parse()
+
+	if *apikey == "" {
+		log.Fatal("Must give -apikey argument")
+	}
+
+	since := 0
+	for {
+		req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/rest/events?since=%d", *target, since), nil)
+		if err != nil {
+			log.Fatal(err)
+		}
+		req.Header.Set("X-API-Key", *apikey)
+		res, err := http.DefaultClient.Do(req)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		var events []event
+		err = json.NewDecoder(res.Body).Decode(&events)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		for _, event := range events {
+			log.Printf("%d: %v", event.ID, event.Type)
+			for k, v := range event.Data {
+				log.Printf("\t%s: %v", k, v)
+			}
+			since = event.ID
+		}
+	}
+}

+ 13 - 0
cmd/syncthing/gui.go

@@ -18,6 +18,7 @@ import (
 	"path/filepath"
 	"reflect"
 	"runtime"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -26,6 +27,7 @@ import (
 	"code.google.com/p/go.crypto/bcrypt"
 	"github.com/calmh/syncthing/auto"
 	"github.com/calmh/syncthing/config"
+	"github.com/calmh/syncthing/events"
 	"github.com/calmh/syncthing/logger"
 	"github.com/calmh/syncthing/model"
 	"github.com/vitrun/qart/qr"
@@ -43,6 +45,7 @@ var (
 	static       func(http.ResponseWriter, *http.Request, *log.Logger)
 	apiKey       string
 	modt         = time.Now().UTC().Format(http.TimeFormat)
+	eventSub     = events.NewBufferedSubscription(events.Default.Subscribe(events.AllEvents), 1000)
 )
 
 const (
@@ -98,6 +101,7 @@ func startGUI(cfg config.GUIConfiguration, assetDir string, m *model.Model) erro
 	getRestMux.HandleFunc("/rest/errors", restGetErrors)
 	getRestMux.HandleFunc("/rest/discovery", restGetDiscovery)
 	getRestMux.HandleFunc("/rest/report", withModel(m, restGetReport))
+	getRestMux.HandleFunc("/rest/events", restGetEvents)
 
 	// The POST handlers
 	postRestMux := http.NewServeMux()
@@ -414,6 +418,15 @@ func restGetReport(m *model.Model, w http.ResponseWriter, r *http.Request) {
 	json.NewEncoder(w).Encode(reportData(m))
 }
 
+func restGetEvents(w http.ResponseWriter, r *http.Request) {
+	qs := r.URL.Query()
+	ts := qs.Get("since")
+	since, _ := strconv.Atoi(ts)
+
+	w.Header().Set("Content-Type", "application/json; charset=utf-8")
+	json.NewEncoder(w).Encode(eventSub.Since(since, nil))
+}
+
 func getQR(w http.ResponseWriter, r *http.Request) {
 	r.ParseForm()
 	text := r.FormValue("text")

+ 16 - 0
cmd/syncthing/main.go

@@ -28,6 +28,7 @@ import (
 
 	"github.com/calmh/syncthing/config"
 	"github.com/calmh/syncthing/discover"
+	"github.com/calmh/syncthing/events"
 	"github.com/calmh/syncthing/logger"
 	"github.com/calmh/syncthing/model"
 	"github.com/calmh/syncthing/osutil"
@@ -454,10 +455,21 @@ nextRepo:
 		}()
 	}
 
+	events.Default.Log(events.StartupComplete, nil)
+	go generateEvents()
+
 	<-stop
+
 	l.Okln("Exiting")
 }
 
+func generateEvents() {
+	for {
+		time.Sleep(300 * time.Second)
+		events.Default.Log(events.Ping, nil)
+	}
+}
+
 func waitForParentExit() {
 	l.Infoln("Waiting for parent to exit...")
 	// Wait for the listen address to become free, indicating that the parent has exited.
@@ -723,6 +735,10 @@ next:
 				protoConn := protocol.NewConnection(remoteID, conn, wr, m)
 
 				l.Infof("Established secure connection to %s at %v", remoteID, conn.RemoteAddr())
+				events.Default.Log(events.NodeConnected, map[string]string{
+					"id":   remoteID.String(),
+					"addr": conn.RemoteAddr().String(),
+				})
 
 				m.AddConnection(conn, protoConn)
 				continue next

+ 203 - 0
events/events.go

@@ -0,0 +1,203 @@
+// Package events provides event subscription and polling functionality.
+package events
+
+import (
+	"errors"
+	"sync"
+	"time"
+)
+
+type EventType uint64
+
+const (
+	Ping = 1 << iota
+	StartupComplete
+	NodeConnected
+	NodeDisconnected
+	LocalIndexUpdated
+	RemoteIndexUpdated
+	ItemStarted
+	ItemCompleted
+
+	AllEvents = ^EventType(0)
+)
+
+func (t EventType) String() string {
+	switch t {
+	case Ping:
+		return "Ping"
+	case StartupComplete:
+		return "StartupComplete"
+	case NodeConnected:
+		return "NodeConnected"
+	case NodeDisconnected:
+		return "NodeDisconnected"
+	case LocalIndexUpdated:
+		return "LocalIndexUpdated"
+	case RemoteIndexUpdated:
+		return "RemoteIndexUpdated"
+	case ItemStarted:
+		return "ItemStarted"
+	default:
+		return "Unknown"
+	}
+}
+
+func (t EventType) MarshalText() ([]byte, error) {
+	return []byte(t.String()), nil
+}
+
+const BufferSize = 64
+
+type Logger struct {
+	subs   map[int]*Subscription
+	nextId int
+	mutex  sync.Mutex
+}
+
+type Event struct {
+	ID   int         `json:"id"`
+	Time time.Time   `json:"time"`
+	Type EventType   `json:"type"`
+	Data interface{} `json:"data"`
+}
+
+type Subscription struct {
+	mask   EventType
+	id     int
+	events chan Event
+	mutex  sync.Mutex
+}
+
+var Default = NewLogger()
+
+var (
+	ErrTimeout = errors.New("timeout")
+	ErrClosed  = errors.New("closed")
+)
+
+func NewLogger() *Logger {
+	return &Logger{
+		subs: make(map[int]*Subscription),
+	}
+}
+
+func (l *Logger) Log(t EventType, data interface{}) {
+	l.mutex.Lock()
+	e := Event{
+		ID:   l.nextId,
+		Time: time.Now(),
+		Type: t,
+		Data: data,
+	}
+	l.nextId++
+	for _, s := range l.subs {
+		if s.mask&t != 0 {
+			select {
+			case s.events <- e:
+			default:
+				//log.Println("Dropping event:", e)
+			}
+		}
+	}
+	l.mutex.Unlock()
+}
+
+func (l *Logger) Subscribe(mask EventType) *Subscription {
+	l.mutex.Lock()
+	s := &Subscription{
+		mask:   mask,
+		id:     l.nextId,
+		events: make(chan Event, BufferSize),
+	}
+	l.nextId++
+	l.subs[s.id] = s
+	l.mutex.Unlock()
+	return s
+}
+
+func (l *Logger) Unsubscribe(s *Subscription) {
+	l.mutex.Lock()
+	delete(l.subs, s.id)
+	close(s.events)
+	l.mutex.Unlock()
+}
+
+func (s *Subscription) Poll(timeout time.Duration) (Event, error) {
+	s.mutex.Lock()
+	defer s.mutex.Unlock()
+
+	to := time.After(timeout)
+	select {
+	case e, ok := <-s.events:
+		if !ok {
+			return e, ErrClosed
+		}
+		return e, nil
+	case <-to:
+		return Event{}, ErrTimeout
+	}
+}
+
+type BufferedSubscription struct {
+	sub  *Subscription
+	buf  []Event
+	next int
+	cur  int
+	mut  sync.Mutex
+	cond *sync.Cond
+}
+
+func NewBufferedSubscription(s *Subscription, size int) *BufferedSubscription {
+	bs := &BufferedSubscription{
+		sub: s,
+		buf: make([]Event, size),
+	}
+	bs.cond = sync.NewCond(&bs.mut)
+	go bs.pollingLoop()
+	return bs
+}
+
+func (s *BufferedSubscription) pollingLoop() {
+	for {
+		ev, err := s.sub.Poll(60 * time.Second)
+		if err == ErrTimeout {
+			continue
+		}
+		if err == ErrClosed {
+			return
+		}
+		if err != nil {
+			panic("unexpected error: " + err.Error())
+		}
+
+		s.mut.Lock()
+		s.buf[s.next] = ev
+		s.next = (s.next + 1) % len(s.buf)
+		s.cur = ev.ID
+		s.cond.Broadcast()
+		s.mut.Unlock()
+	}
+}
+
+func (s *BufferedSubscription) Since(id int, into []Event) []Event {
+	s.mut.Lock()
+	defer s.mut.Unlock()
+
+	for id >= s.cur {
+		s.cond.Wait()
+	}
+
+	for i := s.next; i < len(s.buf); i++ {
+		if s.buf[i].ID > id {
+			into = append(into, s.buf[i])
+		}
+	}
+	for i := 0; i < s.next; i++ {
+		if s.buf[i].ID > id {
+			into = append(into, s.buf[i])
+		}
+	}
+
+	return into
+}

+ 174 - 0
events/events_test.go

@@ -0,0 +1,174 @@
+package events_test
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/calmh/syncthing/events"
+)
+
+var timeout = 100 * time.Millisecond
+
+func TestNewLogger(t *testing.T) {
+	l := events.NewLogger()
+	if l == nil {
+		t.Fatal("Unexpected nil Logger")
+	}
+}
+
+func TestSubscriber(t *testing.T) {
+	l := events.NewLogger()
+	s := l.Subscribe(0)
+	if s == nil {
+		t.Fatal("Unexpected nil Subscription")
+	}
+}
+
+func TestTimeout(t *testing.T) {
+	l := events.NewLogger()
+	s := l.Subscribe(0)
+	_, err := s.Poll(timeout)
+	if err != events.ErrTimeout {
+		t.Fatal("Unexpected non-Timeout error:", err)
+	}
+}
+
+func TestEventBeforeSubscribe(t *testing.T) {
+	l := events.NewLogger()
+
+	l.Log(events.NodeConnected, "foo")
+	s := l.Subscribe(0)
+
+	_, err := s.Poll(timeout)
+	if err != events.ErrTimeout {
+		t.Fatal("Unexpected non-Timeout error:", err)
+	}
+}
+
+func TestEventAfterSubscribe(t *testing.T) {
+	l := events.NewLogger()
+
+	s := l.Subscribe(events.AllEvents)
+	l.Log(events.NodeConnected, "foo")
+
+	ev, err := s.Poll(timeout)
+
+	if err != nil {
+		t.Fatal("Unexpected error:", err)
+	}
+	if ev.Type != events.NodeConnected {
+		t.Error("Incorrect event type", ev.Type)
+	}
+	switch v := ev.Data.(type) {
+	case string:
+		if v != "foo" {
+			t.Error("Incorrect Data string", v)
+		}
+	default:
+		t.Errorf("Incorrect Data type %#v", v)
+	}
+}
+
+func TestEventAfterSubscribeIgnoreMask(t *testing.T) {
+	l := events.NewLogger()
+
+	s := l.Subscribe(events.NodeDisconnected)
+	l.Log(events.NodeConnected, "foo")
+
+	_, err := s.Poll(timeout)
+	if err != events.ErrTimeout {
+		t.Fatal("Unexpected non-Timeout error:", err)
+	}
+}
+
+func TestBufferOverflow(t *testing.T) {
+	l := events.NewLogger()
+
+	_ = l.Subscribe(events.AllEvents)
+
+	t0 := time.Now()
+	for i := 0; i < events.BufferSize*2; i++ {
+		l.Log(events.NodeConnected, "foo")
+	}
+	if time.Since(t0) > timeout {
+		t.Fatalf("Logging took too long")
+	}
+}
+
+func TestUnsubscribe(t *testing.T) {
+	l := events.NewLogger()
+
+	s := l.Subscribe(events.AllEvents)
+	l.Log(events.NodeConnected, "foo")
+
+	_, err := s.Poll(timeout)
+	if err != nil {
+		t.Fatal("Unexpected error:", err)
+	}
+
+	l.Unsubscribe(s)
+	l.Log(events.NodeConnected, "foo")
+
+	_, err = s.Poll(timeout)
+	if err != events.ErrClosed {
+		t.Fatal("Unexpected non-Closed error:", err)
+	}
+}
+
+func TestIDs(t *testing.T) {
+	l := events.NewLogger()
+
+	s := l.Subscribe(events.AllEvents)
+	l.Log(events.NodeConnected, "foo")
+	l.Log(events.NodeConnected, "bar")
+
+	ev, err := s.Poll(timeout)
+	if err != nil {
+		t.Fatal("Unexpected error:", err)
+	}
+	if ev.Data.(string) != "foo" {
+		t.Fatal("Incorrect event:", ev)
+	}
+	id := ev.ID
+
+	ev, err = s.Poll(timeout)
+	if err != nil {
+		t.Fatal("Unexpected error:", err)
+	}
+	if ev.Data.(string) != "bar" {
+		t.Fatal("Incorrect event:", ev)
+	}
+	if !(ev.ID > id) {
+		t.Fatalf("ID not incremented (%d !> %d)", ev.ID, id)
+	}
+}
+
+func TestBufferedSub(t *testing.T) {
+	l := events.NewLogger()
+
+	s := l.Subscribe(events.AllEvents)
+	bs := events.NewBufferedSubscription(s, 10*events.BufferSize)
+
+	go func() {
+		for i := 0; i < 10*events.BufferSize; i++ {
+			l.Log(events.NodeConnected, fmt.Sprintf("event-%d", i))
+			if i%30 == 0 {
+				// Give the buffer routine time to pick up the events
+				time.Sleep(20 * time.Millisecond)
+			}
+		}
+	}()
+
+	recv := 0
+	for recv < 10*events.BufferSize {
+		evs := bs.Since(recv, nil)
+		for _, ev := range evs {
+			if ev.ID != recv+1 {
+				t.Fatalf("Incorrect ID; %d != %d", ev.ID, recv+1)
+			}
+			recv = ev.ID
+		}
+	}
+
+}

+ 16 - 0
model/model.go

@@ -18,6 +18,7 @@ import (
 	"time"
 
 	"github.com/calmh/syncthing/config"
+	"github.com/calmh/syncthing/events"
 	"github.com/calmh/syncthing/files"
 	"github.com/calmh/syncthing/lamport"
 	"github.com/calmh/syncthing/osutil"
@@ -315,6 +316,11 @@ func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInf
 		l.Fatalf("Index for nonexistant repo %q", repo)
 	}
 	m.rmut.RUnlock()
+
+	events.Default.Log(events.RemoteIndexUpdated, map[string]string{
+		"node": nodeID.String(),
+		"repo": repo,
+	})
 }
 
 // IndexUpdate is called for incremental updates to connected nodes' indexes.
@@ -336,6 +342,11 @@ func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.F
 		l.Fatalf("IndexUpdate for nonexistant repo %q", repo)
 	}
 	m.rmut.RUnlock()
+
+	events.Default.Log(events.RemoteIndexUpdated, map[string]string{
+		"node": nodeID.String(),
+		"repo": repo,
+	})
 }
 
 func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool {
@@ -376,6 +387,10 @@ func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterCon
 // Implements the protocol.Model interface.
 func (m *Model) Close(node protocol.NodeID, err error) {
 	l.Infof("Connection to %s closed: %v", node, err)
+	events.Default.Log(events.NodeDisconnected, map[string]string{
+		"id":    node.String(),
+		"error": err.Error(),
+	})
 
 	m.rmut.RLock()
 	for _, repo := range m.nodeRepos[node] {
@@ -541,6 +556,7 @@ func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
 	m.rmut.RLock()
 	m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f})
 	m.rmut.RUnlock()
+	events.Default.Log(events.LocalIndexUpdated, map[string]string{"repo": repo})
 }
 
 func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {

+ 11 - 0
model/puller.go

@@ -13,6 +13,7 @@ import (
 	"time"
 
 	"github.com/calmh/syncthing/config"
+	"github.com/calmh/syncthing/events"
 	"github.com/calmh/syncthing/osutil"
 	"github.com/calmh/syncthing/protocol"
 	"github.com/calmh/syncthing/scanner"
@@ -395,6 +396,11 @@ func (p *puller) handleBlock(b bqBlock) bool {
 			}
 		}
 
+		events.Default.Log(events.ItemStarted, map[string]string{
+			"repo": p.repoCfg.ID,
+			"item": f.Name,
+		})
+
 		p.model.updateLocal(p.repoCfg.ID, f)
 		return true
 	}
@@ -407,6 +413,11 @@ func (p *puller) handleBlock(b bqBlock) bool {
 			l.Debugf("pull: %q: opening file %q", p.repoCfg.ID, f.Name)
 		}
 
+		events.Default.Log(events.ItemStarted, map[string]string{
+			"repo": p.repoCfg.ID,
+			"item": f.Name,
+		})
+
 		of.availability = p.model.repoFiles[p.repoCfg.ID].Availability(f.Name)
 		of.filepath = filepath.Join(p.repoCfg.Directory, f.Name)
 		of.temp = filepath.Join(p.repoCfg.Directory, defTempNamer.TempName(f.Name))