Browse Source

Refactor statistics printing

Jakob Borg 12 years ago
parent
commit
6679c84cfb
4 changed files with 113 additions and 69 deletions
  1. 34 0
      main.go
  2. 35 45
      model.go
  3. 14 23
      protocol/protocol.go
  4. 30 1
      util.go

+ 34 - 0
main.go

@@ -174,9 +174,43 @@ func main() {
 		}
 	}()
 
+	// Periodically print statistics
+	go printStatsLoop(m)
+
 	select {}
 }
 
+func printStatsLoop(m *Model) {
+	var lastUpdated int64
+	var lastStats = make(map[string]protocol.Statistics)
+
+	for {
+		time.Sleep(60 * time.Second)
+
+		for node, stats := range m.ConnectionStats() {
+			secs := time.Since(lastStats[node].At).Seconds()
+			inbps := 8 * int(float64(stats.InBytesTotal-lastStats[node].InBytesTotal)/secs)
+			outbps := 8 * int(float64(stats.OutBytesTotal-lastStats[node].OutBytesTotal)/secs)
+
+			if inbps+outbps > 0 {
+				infof("%s: %sb/s in, %sb/s out", node, MetricPrefix(inbps), MetricPrefix(outbps))
+			}
+
+			lastStats[node] = stats
+		}
+
+		if lu := m.Generation(); lu > lastUpdated {
+			lastUpdated = lu
+			files, bytes := m.GlobalSize()
+			infof("%6d files, %9sB in cluster", files, BinaryPrefix(bytes))
+			files, bytes = m.LocalSize()
+			infof("%6d files, %9sB in local repo", files, BinaryPrefix(bytes))
+			files, bytes = m.NeedSize()
+			infof("%6d files, %9sB in to synchronize", files, BinaryPrefix(bytes))
+		}
+	}
+}
+
 func listen(myID string, addr string, m *Model, cfg *tls.Config) {
 	l, err := tls.Listen("tcp", addr, cfg)
 	fatalErr(err)

+ 35 - 45
model.go

@@ -60,7 +60,6 @@ func NewModel(dir string) *Model {
 		lastIdxBcast: time.Now(),
 	}
 
-	go m.printStatsLoop()
 	go m.broadcastIndexLoop()
 	return m
 }
@@ -69,62 +68,55 @@ func (m *Model) Start() {
 	go m.puller()
 }
 
-func (m *Model) printStatsLoop() {
-	var lastUpdated int64
-	for {
-		time.Sleep(60 * time.Second)
-		m.RLock()
-		m.printConnectionStats()
-		if m.updatedLocal+m.updateGlobal > lastUpdated {
-			m.printModelStats()
-			lastUpdated = m.updatedLocal + m.updateGlobal
-		}
-		m.RUnlock()
-	}
+func (m *Model) Generation() int64 {
+	m.RLock()
+	defer m.RUnlock()
+
+	return m.updatedLocal + m.updateGlobal
 }
 
-func (m *Model) printConnectionStats() {
+func (m *Model) ConnectionStats() map[string]protocol.Statistics {
+	m.RLock()
+	defer m.RUnlock()
+
+	var res = make(map[string]protocol.Statistics)
 	for node, conn := range m.nodes {
-		stats := conn.Statistics()
-		if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 {
-			infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec))
-		}
+		res[node] = conn.Statistics()
 	}
+	return res
 }
 
-func (m *Model) printModelStats() {
-	var tot int
+func (m *Model) GlobalSize() (files, bytes int) {
+	m.RLock()
+	defer m.RUnlock()
+
+	files = len(m.global)
 	for _, f := range m.global {
-		tot += f.Size()
+		bytes += f.Size()
 	}
-	infof("%6d files, %8sB in cluster", len(m.global), toSI(tot))
+	return
+}
 
-	if len(m.need) > 0 {
-		tot = 0
-		for _, f := range m.local {
-			tot += f.Size()
-		}
-		infof("%6d files, %8sB in local repo", len(m.local), toSI(tot))
+func (m *Model) LocalSize() (files, bytes int) {
+	m.RLock()
+	defer m.RUnlock()
 
-		tot = 0
-		for n := range m.need {
-			tot += m.global[n].Size()
-		}
-		infof("%6d files, %8sB to synchronize", len(m.need), toSI(tot))
+	files = len(m.local)
+	for _, f := range m.local {
+		bytes += f.Size()
 	}
+	return
 }
 
-func toSI(n int) string {
-	if n > 1<<30 {
-		return fmt.Sprintf("%.02f G", float64(n)/(1<<30))
-	}
-	if n > 1<<20 {
-		return fmt.Sprintf("%.02f M", float64(n)/(1<<20))
-	}
-	if n > 1<<10 {
-		return fmt.Sprintf("%.01f K", float64(n)/(1<<10))
+func (m *Model) NeedSize() (files, bytes int) {
+	m.RLock()
+	defer m.RUnlock()
+
+	files = len(m.need)
+	for n := range m.need {
+		bytes += m.global[n].Size()
 	}
-	return fmt.Sprintf("%d ", n)
+	return
 }
 
 // Index is called when a new node is connected and we receive their full index.
@@ -147,7 +139,6 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
 
 	m.recomputeGlobal()
 	m.recomputeNeed()
-	m.printModelStats()
 }
 
 // IndexUpdate is called for incremental updates to connected nodes' indexes.
@@ -188,7 +179,6 @@ func (m *Model) SeedIndex(fs []protocol.FileInfo) {
 
 	m.recomputeGlobal()
 	m.recomputeNeed()
-	m.printModelStats()
 }
 
 func (m *Model) Close(node string, err error) {

+ 14 - 23
protocol/protocol.go

@@ -60,7 +60,6 @@ type Connection struct {
 	hasSentIndex  bool
 	hasRecvdIndex bool
 
-	lastStatistics Statistics
 	statisticsLock sync.Mutex
 }
 
@@ -84,14 +83,13 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
 	}
 
 	c := Connection{
-		receiver:       receiver,
-		reader:         flrd,
-		mreader:        &marshalReader{r: flrd},
-		writer:         flwr,
-		mwriter:        &marshalWriter{w: flwr},
-		awaiting:       make(map[int]chan asyncResult),
-		ID:             nodeID,
-		lastStatistics: Statistics{At: time.Now()},
+		receiver: receiver,
+		reader:   flrd,
+		mreader:  &marshalReader{r: flrd},
+		writer:   flwr,
+		mwriter:  &marshalWriter{w: flwr},
+		awaiting: make(map[int]chan asyncResult),
+		ID:       nodeID,
 	}
 
 	go c.readerLoop()
@@ -373,27 +371,20 @@ func (c *Connection) pingerLoop() {
 }
 
 type Statistics struct {
-	At             time.Time
-	InBytesTotal   int
-	InBytesPerSec  int
-	OutBytesTotal  int
-	OutBytesPerSec int
+	At            time.Time
+	InBytesTotal  int
+	OutBytesTotal int
 }
 
 func (c *Connection) Statistics() Statistics {
 	c.statisticsLock.Lock()
 	defer c.statisticsLock.Unlock()
 
-	secs := time.Since(c.lastStatistics.At).Seconds()
-	rt := int(c.mreader.getTot())
-	wt := int(c.mwriter.getTot())
 	stats := Statistics{
-		At:             time.Now(),
-		InBytesTotal:   rt,
-		InBytesPerSec:  int(float64(rt-c.lastStatistics.InBytesTotal) / secs),
-		OutBytesTotal:  wt,
-		OutBytesPerSec: int(float64(wt-c.lastStatistics.OutBytesTotal) / secs),
+		At:            time.Now(),
+		InBytesTotal:  int(c.mreader.getTot()),
+		OutBytesTotal: int(c.mwriter.getTot()),
 	}
-	c.lastStatistics = stats
+
 	return stats
 }

+ 30 - 1
util.go

@@ -1,7 +1,36 @@
 package main
 
-import "time"
+import (
+	"fmt"
+	"time"
+)
 
 func timing(name string, t0 time.Time) {
 	debugf("%s: %.02f ms", name, time.Since(t0).Seconds()*1000)
 }
+
+func MetricPrefix(n int) string {
+	if n > 1e9 {
+		return fmt.Sprintf("%.02f G", float64(n)/1e9)
+	}
+	if n > 1e6 {
+		return fmt.Sprintf("%.02f M", float64(n)/1e6)
+	}
+	if n > 1e3 {
+		return fmt.Sprintf("%.01f k", float64(n)/1e3)
+	}
+	return fmt.Sprintf("%d ", n)
+}
+
+func BinaryPrefix(n int) string {
+	if n > 1<<30 {
+		return fmt.Sprintf("%.02f Gi", float64(n)/(1<<30))
+	}
+	if n > 1<<20 {
+		return fmt.Sprintf("%.02f Mi", float64(n)/(1<<20))
+	}
+	if n > 1<<10 {
+		return fmt.Sprintf("%.01f Ki", float64(n)/(1<<10))
+	}
+	return fmt.Sprintf("%d ", n)
+}