Browse Source

Rate limit sent data, not uncompressed

Jakob Borg 11 years ago
parent
commit
a1f32095df
3 changed files with 36 additions and 20 deletions
  1. 19 0
      cmd/syncthing/limitedwriter.go
  2. 17 9
      cmd/syncthing/main.go
  3. 0 11
      cmd/syncthing/model.go

+ 19 - 0
cmd/syncthing/limitedwriter.go

@@ -0,0 +1,19 @@
+package main
+
+import (
+	"io"
+
+	"github.com/juju/ratelimit"
+)
+
+type limitedWriter struct {
+	w      io.Writer
+	bucket *ratelimit.Bucket
+}
+
+func (w *limitedWriter) Write(buf []byte) (int, error) {
+	if w.bucket != nil {
+		w.bucket.Wait(int64(len(buf)))
+	}
+	return w.w.Write(buf)
+}

+ 17 - 9
cmd/syncthing/main.go

@@ -4,6 +4,7 @@ import (
 	"crypto/tls"
 	"flag"
 	"fmt"
+	"io"
 	"log"
 	"net"
 	"net/http"
@@ -19,6 +20,7 @@ import (
 	"github.com/calmh/ini"
 	"github.com/calmh/syncthing/discover"
 	"github.com/calmh/syncthing/protocol"
+	"github.com/juju/ratelimit"
 )
 
 const BlockSize = 128 * 1024
@@ -27,12 +29,9 @@ var cfg Configuration
 var Version = "unknown-dev"
 
 var (
-	myID string
-)
-
-var (
-	showVersion bool
-	confDir     string
+	myID       string
+	confDir    string
+	rateBucket *ratelimit.Bucket
 )
 
 const (
@@ -60,6 +59,7 @@ const (
 )
 
 func main() {
+	var showVersion bool
 	flag.StringVar(&confDir, "home", getDefaultConfDir(), "Set configuration directory")
 	flag.BoolVar(&showVersion, "version", false, "Show version")
 	flag.Usage = usageFor(flag.CommandLine, usage, extraUsage)
@@ -186,11 +186,15 @@ func main() {
 		MinVersion:             tls.VersionTLS12,
 	}
 
-	m := NewModel(cfg.Options.MaxChangeKbps * 1000)
+	// If the write rate should be limited, set up a rate limiter for it.
+	// This will be used on connections created in the connect and listen routines.
+
 	if cfg.Options.MaxSendKbps > 0 {
-		m.LimitRate(cfg.Options.MaxSendKbps)
+		rateBucket = ratelimit.NewBucketWithRate(float64(1000*cfg.Options.MaxSendKbps), int64(5*1000*cfg.Options.MaxSendKbps))
 	}
 
+	m := NewModel(cfg.Options.MaxChangeKbps * 1000)
+
 	for i := range cfg.Repositories {
 		cfg.Repositories[i].Nodes = cleanNodeList(cfg.Repositories[i].Nodes, myID)
 		dir := expandTilde(cfg.Repositories[i].Directory)
@@ -415,7 +419,11 @@ next:
 
 		for _, nodeCfg := range cfg.Repositories[0].Nodes {
 			if nodeCfg.NodeID == remoteID {
-				protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts)
+				var wr io.Writer = conn
+				if rateBucket != nil {
+					wr = &limitedWriter{conn, rateBucket}
+				}
+				protoConn := protocol.NewConnection(remoteID, conn, wr, m, connOpts)
 				m.AddConnection(conn, protoConn)
 				continue next
 			}

+ 0 - 11
cmd/syncthing/model.go

@@ -18,7 +18,6 @@ import (
 	"github.com/calmh/syncthing/lamport"
 	"github.com/calmh/syncthing/protocol"
 	"github.com/calmh/syncthing/scanner"
-	"github.com/juju/ratelimit"
 )
 
 type Model struct {
@@ -36,8 +35,6 @@ type Model struct {
 
 	sup suppressor
 
-	limitRequestRate *ratelimit.Bucket
-
 	addedRepo bool
 	started   bool
 }
@@ -66,10 +63,6 @@ func NewModel(maxChangeBw int) *Model {
 	return m
 }
 
-func (m *Model) LimitRate(kbps int) {
-	m.limitRequestRate = ratelimit.NewBucketWithRate(float64(kbps), int64(5*kbps))
-}
-
 // StartRW starts read/write processing on the current model. When in
 // read/write mode the model will attempt to keep in sync with the cluster by
 // pulling needed files from peer nodes.
@@ -351,10 +344,6 @@ func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]by
 		return nil, err
 	}
 
-	if m.limitRequestRate != nil {
-		m.limitRequestRate.Wait(int64(size / 1024))
-	}
-
 	return buf, nil
 }