Browse Source

Report transfer stats

Jakob Borg 12 years ago
parent
commit
746d52930d
4 changed files with 81 additions and 23 deletions
  1. 0 1
      discover/discover.go
  2. 2 2
      main.go
  3. 31 0
      model.go
  4. 48 20
      protocol/protocol.go

+ 0 - 1
discover/discover.go

@@ -229,7 +229,6 @@ func (d *Discoverer) externalLookup(node string) (string, bool) {
 		log.Printf("discover/external: %v; no external lookup", err)
 		return "", false
 	}
-	log.Println("query", extIP)
 
 	go func() {
 		var buf = make([]byte, 1024)

+ 2 - 2
main.go

@@ -162,7 +162,7 @@ func main() {
 	// XXX: Should use some fsnotify mechanism.
 	go func() {
 		for {
-			time.Sleep(time.Duration(opts.ScanInterval) * time.Second)
+			time.Sleep(opts.ScanInterval)
 			updateLocalModel(m)
 		}
 	}()
@@ -291,7 +291,7 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model,
 			}
 		}
 
-		time.Sleep(time.Duration(opts.ConnInterval) * time.Second)
+		time.Sleep(opts.ConnInterval)
 	}
 }
 

+ 31 - 0
model.go

@@ -14,6 +14,7 @@ TODO(jb): Keep global and per node transfer and performance statistics.
 */
 
 import (
+	"fmt"
 	"os"
 	"path"
 	"sync"
@@ -49,6 +50,7 @@ func NewModel(dir string) *Model {
 		nodes:  make(map[string]*protocol.Connection),
 	}
 
+	go m.printStats()
 	return m
 }
 
@@ -56,6 +58,35 @@ func (m *Model) Start() {
 	go m.puller()
 }
 
+func (m *Model) printStats() {
+	for {
+		time.Sleep(15 * time.Second)
+		m.RLock()
+		for node, conn := range m.nodes {
+			stats := conn.Statistics()
+			if (stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0) && stats.Latency > 0 {
+				infof("%s: %sB/s in, %sB/s out, %0.02f ms", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec), stats.Latency.Seconds()*1000)
+			} else if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 {
+				infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec))
+			}
+		}
+		m.RUnlock()
+	}
+}
+
+func toSI(n int) string {
+	if n > 1<<30 {
+		return fmt.Sprintf("%.02f G", float64(n)/(1<<30))
+	}
+	if n > 1<<20 {
+		return fmt.Sprintf("%.02f M", float64(n)/(1<<20))
+	}
+	if n > 1<<10 {
+		return fmt.Sprintf("%.01f K", float64(n)/(1<<10))
+	}
+	return fmt.Sprintf("%d ", n)
+}
+
 func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
 	m.Lock()
 	defer m.Unlock()

+ 48 - 20
protocol/protocol.go

@@ -42,18 +42,19 @@ type Model interface {
 }
 
 type Connection struct {
-	ID          string
-	receiver    Model
-	reader      io.Reader
-	mreader     *marshalReader
-	writer      io.Writer
-	mwriter     *marshalWriter
-	wLock       sync.RWMutex
-	closed      bool
-	awaiting    map[int]chan asyncResult
-	nextId      int
-	lastReceive time.Time
-	peerLatency time.Duration
+	ID             string
+	receiver       Model
+	reader         io.Reader
+	mreader        *marshalReader
+	writer         io.Writer
+	mwriter        *marshalWriter
+	wLock          sync.RWMutex
+	closed         bool
+	awaiting       map[int]chan asyncResult
+	nextId         int
+	lastReceive    time.Time
+	peerLatency    time.Duration
+	lastStatistics Statistics
 }
 
 var ErrClosed = errors.New("Connection closed")
@@ -74,14 +75,15 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
 	}
 
 	c := Connection{
-		receiver:    receiver,
-		reader:      flrd,
-		mreader:     &marshalReader{flrd, 0, nil},
-		writer:      flwr,
-		mwriter:     &marshalWriter{flwr, 0, nil},
-		awaiting:    make(map[int]chan asyncResult),
-		lastReceive: time.Now(),
-		ID:          nodeID,
+		receiver:       receiver,
+		reader:         flrd,
+		mreader:        &marshalReader{flrd, 0, nil},
+		writer:         flwr,
+		mwriter:        &marshalWriter{flwr, 0, nil},
+		awaiting:       make(map[int]chan asyncResult),
+		lastReceive:    time.Now(),
+		ID:             nodeID,
+		lastStatistics: Statistics{At: time.Now()},
 	}
 
 	go c.readerLoop()
@@ -313,3 +315,29 @@ func (c *Connection) pingerLoop() {
 		time.Sleep(time.Second)
 	}
 }
+
+type Statistics struct {
+	At             time.Time
+	InBytesTotal   int
+	InBytesPerSec  int
+	OutBytesTotal  int
+	OutBytesPerSec int
+	Latency        time.Duration
+}
+
+func (c *Connection) Statistics() Statistics {
+	c.wLock.Lock()
+	defer c.wLock.Unlock()
+
+	secs := time.Since(c.lastStatistics.At).Seconds()
+	stats := Statistics{
+		At:             time.Now(),
+		InBytesTotal:   c.mreader.tot,
+		InBytesPerSec:  int(float64(c.mreader.tot-c.lastStatistics.InBytesTotal) / secs),
+		OutBytesTotal:  c.mwriter.tot,
+		OutBytesPerSec: int(float64(c.mwriter.tot-c.lastStatistics.OutBytesTotal) / secs),
+		Latency:        c.peerLatency,
+	}
+	c.lastStatistics = stats
+	return stats
+}