Преглед изворни кода

New file change suppression algorithm (fixes #30)

Jakob Borg пре 12 година
родитељ
комит
1dde9ec2d8
7 измењених фајлова са 208 додато и 70 уклоњено
  1. 1 1
      integration/test.sh
  2. 2 1
      main.go
  3. 13 41
      model/model.go
  4. 0 25
      model/model_test.go
  5. 72 0
      model/suppressor.go
  6. 113 0
      model/suppressor_test.go
  7. 7 2
      model/walk.go

+ 1 - 1
integration/test.sh

@@ -30,7 +30,7 @@ EOT
 
 	mkdir files-$i
 	pushd files-$i >/dev/null
-	../genfiles -maxexp 21 -files 4000
+	../genfiles -maxexp 21 -files 400
 	../md5r > ../md5-$i
 	popd >/dev/null
 done

+ 2 - 1
main.go

@@ -58,6 +58,7 @@ type AdvancedOptions struct {
 	LimitRate        int           `long:"send-rate" description:"Rate limit for outgoing data" default:"0" value-name:"KBPS"`
 	ScanInterval     time.Duration `long:"scan-intv" description:"Repository scan interval" default:"60s" value-name:"INTV"`
 	ConnInterval     time.Duration `long:"conn-intv" description:"Node reconnect interval" default:"60s" value-name:"INTV"`
+	MaxChangeBW      int           `long:"max-change-bw" description:"Max change bandwidth per file" default:"1e6" value-name:"MB/s"`
 }
 
 var opts Options
@@ -166,7 +167,7 @@ func main() {
 	}
 
 	ensureDir(dir, -1)
-	m := model.NewModel(dir)
+	m := model.NewModel(dir, opts.Advanced.MaxChangeBW)
 	for _, t := range opts.Debug.TraceModel {
 		m.Trace(t)
 	}

+ 13 - 41
model/model.go

@@ -46,9 +46,7 @@ type Model struct {
 
 	trace map[string]bool
 
-	fileLastChanged   map[string]time.Time
-	fileWasSuppressed map[string]int
-	fmut              sync.Mutex // protects fileLastChanged and fileWasSuppressed
+	sup suppressor
 
 	parallellRequests int
 	limitRequestRate  chan struct{}
@@ -79,20 +77,19 @@ var (
 // NewModel creates and starts a new model. The model starts in read-only mode,
 // where it sends index information to connected peers and responds to requests
 // for file data without altering the local repository in any way.
-func NewModel(dir string) *Model {
+func NewModel(dir string, maxChangeBw int) *Model {
 	m := &Model{
-		dir:               dir,
-		global:            make(map[string]File),
-		local:             make(map[string]File),
-		remote:            make(map[string]map[string]File),
-		protoConn:         make(map[string]Connection),
-		rawConn:           make(map[string]io.Closer),
-		lastIdxBcast:      time.Now(),
-		trace:             make(map[string]bool),
-		fileLastChanged:   make(map[string]time.Time),
-		fileWasSuppressed: make(map[string]int),
-		fq:                NewFileQueue(),
-		dq:                make(chan File),
+		dir:          dir,
+		global:       make(map[string]File),
+		local:        make(map[string]File),
+		remote:       make(map[string]map[string]File),
+		protoConn:    make(map[string]Connection),
+		rawConn:      make(map[string]io.Closer),
+		lastIdxBcast: time.Now(),
+		trace:        make(map[string]bool),
+		sup:          suppressor{threshold: int64(maxChangeBw)},
+		fq:           NewFileQueue(),
+		dq:           make(chan File),
 	}
 
 	go m.broadcastIndexLoop()
@@ -391,7 +388,6 @@ func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []b
 }
 
 // ReplaceLocal replaces the local repository index with the given list of files.
-// Change suppression is applied to files changing too often.
 func (m *Model) ReplaceLocal(fs []File) {
 	var updated bool
 	var newLocal = make(map[string]File)
@@ -512,30 +508,6 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
 	}
 }
 
-func (m *Model) shouldSuppressChange(name string) bool {
-	m.fmut.Lock()
-	sup := shouldSuppressChange(m.fileLastChanged[name], m.fileWasSuppressed[name])
-	if sup {
-		m.fileWasSuppressed[name]++
-	} else {
-		m.fileWasSuppressed[name] = 0
-		m.fileLastChanged[name] = time.Now()
-	}
-	m.fmut.Unlock()
-	return sup
-}
-
-func shouldSuppressChange(lastChange time.Time, numChanges int) bool {
-	sinceLast := time.Since(lastChange)
-	if sinceLast > maxFileHoldTimeS*time.Second {
-		return false
-	}
-	if sinceLast < time.Duration((numChanges+2)*minFileHoldTimeS)*time.Second {
-		return true
-	}
-	return false
-}
-
 // ProtocolIndex returns the current local index in protocol data types.
 // Must be called with the read lock held.
 func (m *Model) ProtocolIndex() []protocol.FileInfo {

+ 0 - 25
model/model_test.go

@@ -356,31 +356,6 @@ func TestRequest(t *testing.T) {
 	}
 }
 
-func TestSuppression(t *testing.T) {
-	var testdata = []struct {
-		lastChange time.Time
-		hold       int
-		result     bool
-	}{
-		{time.Unix(0, 0), 0, false},                    // First change
-		{time.Now().Add(-1 * time.Second), 0, true},    // Changed once one second ago, suppress
-		{time.Now().Add(-119 * time.Second), 0, true},  // Changed once 119 seconds ago, suppress
-		{time.Now().Add(-121 * time.Second), 0, false}, // Changed once 121 seconds ago, permit
-
-		{time.Now().Add(-179 * time.Second), 1, true},  // Suppressed once 179 seconds ago, suppress again
-		{time.Now().Add(-181 * time.Second), 1, false}, // Suppressed once 181 seconds ago, permit
-
-		{time.Now().Add(-599 * time.Second), 99, true},  // Suppressed lots of times, last allowed 599 seconds ago, suppress again
-		{time.Now().Add(-601 * time.Second), 99, false}, // Suppressed lots of times, last allowed 601 seconds ago, permit
-	}
-
-	for i, tc := range testdata {
-		if shouldSuppressChange(tc.lastChange, tc.hold) != tc.result {
-			t.Errorf("Incorrect result for test #%d: %v", i, tc)
-		}
-	}
-}
-
 func TestIgnoreWithUnknownFlags(t *testing.T) {
 	m := NewModel("testdata")
 	fs, _ := m.Walk(false)

+ 72 - 0
model/suppressor.go

@@ -0,0 +1,72 @@
+package model
+
+import (
+	"sync"
+	"time"
+)
+
+const (
+	MAX_CHANGE_HISTORY = 4
+)
+
+type change struct {
+	size int64
+	when time.Time
+}
+
+type changeHistory struct {
+	changes []change
+	next    int64
+	prevSup bool
+}
+
+type suppressor struct {
+	sync.Mutex
+	changes   map[string]changeHistory
+	threshold int64 // bytes/s
+}
+
+func (h changeHistory) bandwidth(t time.Time) int64 {
+	if len(h.changes) == 0 {
+		return 0
+	}
+
+	var t0 = h.changes[0].when
+	if t == t0 {
+		return 0
+	}
+
+	var bw float64
+	for _, c := range h.changes {
+		bw += float64(c.size)
+	}
+	return int64(bw / t.Sub(t0).Seconds())
+}
+
+func (h *changeHistory) append(size int64, t time.Time) {
+	c := change{size, t}
+	if len(h.changes) == MAX_CHANGE_HISTORY {
+		h.changes = h.changes[1:MAX_CHANGE_HISTORY]
+	}
+	h.changes = append(h.changes, c)
+}
+
+func (s *suppressor) suppress(name string, size int64, t time.Time) (bool, bool) {
+	s.Lock()
+
+	if s.changes == nil {
+		s.changes = make(map[string]changeHistory)
+	}
+	h := s.changes[name]
+	sup := h.bandwidth(t) > s.threshold
+	prevSup := h.prevSup
+	h.prevSup = sup
+	if !sup {
+		h.append(size, t)
+	}
+	s.changes[name] = h
+
+	s.Unlock()
+
+	return sup, prevSup
+}

+ 113 - 0
model/suppressor_test.go

@@ -0,0 +1,113 @@
+package model
+
+import (
+	"testing"
+	"time"
+)
+
+func TestSuppressor(t *testing.T) {
+	s := suppressor{threshold: 10000}
+	t0 := time.Now()
+
+	t1 := t0
+	sup, prev := s.suppress("foo", 10000, t1)
+	if sup {
+		t.Fatal("Never suppress first change")
+	}
+	if prev {
+		t.Fatal("Incorrect prev status")
+	}
+
+	// bw is 10000 / 10 = 1000
+	t1 = t0.Add(10 * time.Second)
+	if bw := s.changes["foo"].bandwidth(t1); bw != 1000 {
+		t.Error("Incorrect bw %d", bw)
+	}
+	sup, prev = s.suppress("foo", 10000, t1)
+	if sup {
+		t.Fatal("Should still be fine")
+	}
+	if prev {
+		t.Fatal("Incorrect prev status")
+	}
+
+	// bw is (10000 + 10000) / 11 = 1818
+	t1 = t0.Add(11 * time.Second)
+	if bw := s.changes["foo"].bandwidth(t1); bw != 1818 {
+		t.Error("Incorrect bw %d", bw)
+	}
+	sup, prev = s.suppress("foo", 100500, t1)
+	if sup {
+		t.Fatal("Should still be fine")
+	}
+	if prev {
+		t.Fatal("Incorrect prev status")
+	}
+
+	// bw is (10000 + 10000 + 100500) / 12 = 10041
+	t1 = t0.Add(12 * time.Second)
+	if bw := s.changes["foo"].bandwidth(t1); bw != 10041 {
+		t.Error("Incorrect bw %d", bw)
+	}
+	sup, prev = s.suppress("foo", 10000000, t1) // value will be ignored
+	if !sup {
+		t.Fatal("Should be over threshold")
+	}
+	if prev {
+		t.Fatal("Incorrect prev status")
+	}
+
+	// bw is (10000 + 10000 + 100500) / 15 = 8033
+	t1 = t0.Add(15 * time.Second)
+	if bw := s.changes["foo"].bandwidth(t1); bw != 8033 {
+		t.Error("Incorrect bw %d", bw)
+	}
+	sup, prev = s.suppress("foo", 10000000, t1)
+	if sup {
+		t.Fatal("Should be Ok")
+	}
+	if !prev {
+		t.Fatal("Incorrect prev status")
+	}
+}
+
+func TestHistory(t *testing.T) {
+	h := changeHistory{}
+
+	t0 := time.Now()
+	h.append(40, t0)
+
+	if l := len(h.changes); l != 1 {
+		t.Errorf("Incorrect history length %d", l)
+	}
+	if s := h.changes[0].size; s != 40 {
+		t.Errorf("Incorrect first record size %d", s)
+	}
+
+	for i := 1; i < MAX_CHANGE_HISTORY; i++ {
+		h.append(int64(40+i), t0.Add(time.Duration(i)*time.Second))
+	}
+
+	if l := len(h.changes); l != MAX_CHANGE_HISTORY {
+		t.Errorf("Incorrect history length %d", l)
+	}
+	if s := h.changes[0].size; s != 40 {
+		t.Errorf("Incorrect first record size %d", s)
+	}
+	if s := h.changes[MAX_CHANGE_HISTORY-1].size; s != 40+MAX_CHANGE_HISTORY-1 {
+		t.Errorf("Incorrect last record size %d", s)
+	}
+
+	h.append(999, t0.Add(time.Duration(999)*time.Second))
+
+	if l := len(h.changes); l != MAX_CHANGE_HISTORY {
+		t.Errorf("Incorrect history length %d", l)
+	}
+	if s := h.changes[0].size; s != 41 {
+		t.Errorf("Incorrect first record size %d", s)
+	}
+	if s := h.changes[MAX_CHANGE_HISTORY-1].size; s != 999 {
+		t.Errorf("Incorrect last record size %d", s)
+	}
+
+}

+ 7 - 2
model/walk.go

@@ -126,9 +126,12 @@ func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath.
 				}
 				*res = append(*res, lf)
 			} else {
-				if m.shouldSuppressChange(rn) {
+				if cur, prev := m.sup.suppress(rn, info.Size(), time.Now()); cur {
 					if m.trace["file"] {
-						log.Println("FILE: SUPPRESS:", rn, m.fileWasSuppressed[rn], time.Since(m.fileLastChanged[rn]))
+						log.Printf("FILE: SUPPRESS: %q change bw over threshold", rn)
+					}
+					if !prev {
+						log.Printf("INFO: Changes to %q are being temporarily suppressed because it changes too frequently.", rn)
 					}
 
 					if ok {
@@ -137,6 +140,8 @@ func (m *Model) walkAndHashFiles(res *[]File, ign map[string][]string) filepath.
 						*res = append(*res, lf)
 					}
 					return nil
+				} else if prev && !cur {
+					log.Printf("INFO: Changes to %q are no longer suppressed.", rn)
 				}
 
 				if m.trace["file"] {