Browse Source

Print model statistics

Jakob Borg 12 years ago
parent
commit
707e992f19
3 changed files with 85 additions and 22 deletions
  1. 1 1
      main.go
  2. 77 21
      model.go
  3. 7 0
      walk.go

+ 1 - 1
main.go

@@ -286,8 +286,8 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model,
 				}
 
 				nc := protocol.NewConnection(nodeID, conn, conn, m)
-				okln("Connected to node", remoteID, "(out)")
 				m.AddNode(nc)
+				okln("Connected to node", remoteID, "(out)")
 				continue nextNode
 			}
 		}

+ 77 - 21
model.go

@@ -25,13 +25,16 @@ import (
 
 type Model struct {
 	sync.RWMutex
-	dir     string
-	updated int64
-	global  map[string]File // the latest version of each file as it exists in the cluster
-	local   map[string]File // the files we currently have locally on disk
-	remote  map[string]map[string]File
-	need    map[string]bool // the files we need to update
-	nodes   map[string]*protocol.Connection
+	dir string
+
+	global map[string]File // the latest version of each file as it exists in the cluster
+	local  map[string]File // the files we currently have locally on disk
+	remote map[string]map[string]File
+	need   map[string]bool // the files we need to update
+	nodes  map[string]*protocol.Connection
+
+	updatedLocal int64 // timestamp of last update to local
+	updateGlobal int64 // timestamp of last update to remote
 
 	lastIdxBcast        time.Time
 	lastIdxBcastRequest time.Time
@@ -59,7 +62,7 @@ func NewModel(dir string) *Model {
 		lastIdxBcast: time.Now(),
 	}
 
-	go m.printStats()
+	go m.printStatsLoop()
 	go m.broadcastIndexLoop()
 	return m
 }
@@ -68,22 +71,53 @@ func (m *Model) Start() {
 	go m.puller()
 }
 
-func (m *Model) printStats() {
+func (m *Model) printStatsLoop() {
+	var lastUpdated int64
 	for {
 		time.Sleep(60 * time.Second)
 		m.RLock()
-		for node, conn := range m.nodes {
-			stats := conn.Statistics()
-			if (stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0) && stats.Latency > 0 {
-				infof("%s: %sB/s in, %sB/s out, %0.02f ms", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec), stats.Latency.Seconds()*1000)
-			} else if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 {
-				infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec))
-			}
+		m.printConnectionStats()
+		if m.updatedLocal+m.updateGlobal > lastUpdated {
+			m.printModelStats()
+			lastUpdated = m.updatedLocal + m.updateGlobal
 		}
 		m.RUnlock()
 	}
 }
 
+func (m *Model) printConnectionStats() {
+	for node, conn := range m.nodes {
+		stats := conn.Statistics()
+		if (stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0) && stats.Latency > 0 {
+			infof("%s: %sB/s in, %sB/s out, %0.02f ms", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec), stats.Latency.Seconds()*1000)
+		} else if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 {
+			infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec))
+		}
+	}
+}
+
+func (m *Model) printModelStats() {
+	var tot int
+	for _, f := range m.global {
+		tot += f.Size()
+	}
+	infof("%6d files, %8sB in cluster", len(m.global), toSI(tot))
+
+	if len(m.need) > 0 {
+		tot = 0
+		for _, f := range m.local {
+			tot += f.Size()
+		}
+		infof("%6d files, %8sB in local repo", len(m.local), toSI(tot))
+
+		tot = 0
+		for n := range m.need {
+			tot += m.global[n].Size()
+		}
+		infof("%6d files, %8sB to synchronize", len(m.need), toSI(tot))
+	}
+}
+
 func toSI(n int) string {
 	if n > 1<<30 {
 		return fmt.Sprintf("%.02f G", float64(n)/(1<<30))
@@ -97,6 +131,7 @@ func toSI(n int) string {
 	return fmt.Sprintf("%d ", n)
 }
 
+// Index is called when a new node is connected and we receive their full index.
 func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
 	m.Lock()
 	defer m.Unlock()
@@ -116,8 +151,10 @@ func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
 
 	m.recomputeGlobal()
 	m.recomputeNeed()
+	m.printModelStats()
 }
 
+// IndexUpdate is called for incremental updates to connected nodes' indexes.
 func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
 	m.Lock()
 	defer m.Unlock()
@@ -143,6 +180,7 @@ func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
 	m.recomputeNeed()
 }
 
+// SeedIndex is called when our previously cached index is loaded from disk at startup.
 func (m *Model) SeedIndex(fs []protocol.FileInfo) {
 	m.Lock()
 	defer m.Unlock()
@@ -154,6 +192,7 @@ func (m *Model) SeedIndex(fs []protocol.FileInfo) {
 
 	m.recomputeGlobal()
 	m.recomputeNeed()
+	m.printModelStats()
 }
 
 func (m *Model) Close(node string) {
@@ -232,7 +271,7 @@ func (m *Model) ReplaceLocal(fs []File) {
 		m.local = newLocal
 		m.recomputeGlobal()
 		m.recomputeNeed()
-		m.updated = time.Now().Unix()
+		m.updatedLocal = time.Now().Unix()
 		m.lastIdxBcastRequest = time.Now()
 	}
 }
@@ -254,7 +293,7 @@ func (m *Model) broadcastIndexLoop() {
 			for _, node := range m.nodes {
 				node := node
 				if opts.Debug.TraceNet {
-					debugf("NET IDX(out): %s: %d files", node.ID, len(idx))
+					debugf("NET IDX(out/loop): %s: %d files", node.ID, len(idx))
 				}
 				go func() {
 					node.Index(idx)
@@ -300,7 +339,7 @@ func (m *Model) UpdateLocal(f File) {
 		m.local[f.Name] = f
 		m.recomputeGlobal()
 		m.recomputeNeed()
-		m.updated = time.Now().Unix()
+		m.updatedLocal = time.Now().Unix()
 		m.lastIdxBcastRequest = time.Now()
 	}
 }
@@ -351,7 +390,24 @@ func (m *Model) recomputeGlobal() {
 		}
 	}
 
-	m.global = newGlobal
+	// Figure out if anything actually changed
+
+	var updated bool
+	if len(newGlobal) != len(m.global) {
+		updated = true
+	} else {
+		for n, f0 := range newGlobal {
+			if f1, ok := m.global[n]; !ok || f0.Modified != f1.Modified {
+				updated = true
+				break
+			}
+		}
+	}
+
+	if updated {
+		m.updateGlobal = time.Now().Unix()
+		m.global = newGlobal
+	}
 }
 
 // Must be called with the write lock held.
@@ -418,7 +474,7 @@ func (m *Model) AddNode(node *protocol.Connection) {
 	m.RUnlock()
 
 	if opts.Debug.TraceNet {
-		debugf("NET IDX(out): %s: %d files", node.ID, len(idx))
+		debugf("NET IDX(out/add): %s: %d files", node.ID, len(idx))
 	}
 
 	// Sending the index might take a while if we have many files and a slow

+ 7 - 0
walk.go

@@ -25,6 +25,13 @@ func (f File) Dump() {
 	fmt.Println()
 }
 
+func (f File) Size() (bytes int) {
+	for _, b := range f.Blocks {
+		bytes += int(b.Length)
+	}
+	return
+}
+
 func isTempName(name string) bool {
 	return strings.HasPrefix(path.Base(name), ".syncthing.")
 }