|
|
@@ -5,6 +5,7 @@ import (
|
|
|
"errors"
|
|
|
"io"
|
|
|
"sync"
|
|
|
+ "time"
|
|
|
|
|
|
"github.com/calmh/syncthing/buffers"
|
|
|
)
|
|
|
@@ -40,17 +41,19 @@ type Model interface {
|
|
|
}
|
|
|
|
|
|
type Connection struct {
|
|
|
- receiver Model
|
|
|
- reader io.Reader
|
|
|
- mreader *marshalReader
|
|
|
- writer io.Writer
|
|
|
- mwriter *marshalWriter
|
|
|
- wLock sync.RWMutex
|
|
|
- closed bool
|
|
|
- closedLock sync.RWMutex
|
|
|
- awaiting map[int]chan asyncResult
|
|
|
- nextId int
|
|
|
- ID string
|
|
|
+ ID string
|
|
|
+ receiver Model
|
|
|
+ reader io.Reader
|
|
|
+ mreader *marshalReader
|
|
|
+ writer io.Writer
|
|
|
+ mwriter *marshalWriter
|
|
|
+ wLock sync.RWMutex
|
|
|
+ closed bool
|
|
|
+ closedLock sync.RWMutex
|
|
|
+ awaiting map[int]chan asyncResult
|
|
|
+ nextId int
|
|
|
+ lastReceive time.Time
|
|
|
+ peerLatency time.Duration
|
|
|
}
|
|
|
|
|
|
var ErrClosed = errors.New("Connection closed")
|
|
|
@@ -60,6 +63,9 @@ type asyncResult struct {
|
|
|
err error
|
|
|
}
|
|
|
|
|
|
+const pingTimeout = 30 * time.Second
|
|
|
+const pingIdleTime = 5 * time.Minute
|
|
|
+
|
|
|
func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) *Connection {
|
|
|
flrd := flate.NewReader(reader)
|
|
|
flwr, err := flate.NewWriter(writer, flate.BestSpeed)
|
|
|
@@ -78,6 +84,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
|
|
|
}
|
|
|
|
|
|
go c.readerLoop()
|
|
|
+ go c.pingerLoop()
|
|
|
|
|
|
return &c
|
|
|
}
|
|
|
@@ -166,6 +173,10 @@ func (c *Connection) readerLoop() {
|
|
|
break
|
|
|
}
|
|
|
|
|
|
+ c.wLock.Lock()
|
|
|
+ c.lastReceive = time.Now()
|
|
|
+ c.wLock.Unlock()
|
|
|
+
|
|
|
switch hdr.msgType {
|
|
|
case messageTypeIndex:
|
|
|
files := c.mreader.readIndex()
|
|
|
@@ -237,3 +248,29 @@ func (c *Connection) processRequest(msgID int) {
|
|
|
}()
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+func (c *Connection) pingerLoop() {
|
|
|
+ var rc = make(chan time.Duration)
|
|
|
+ for !c.isClosed() {
|
|
|
+ c.wLock.RLock()
|
|
|
+ lr := c.lastReceive
|
|
|
+ c.wLock.RUnlock()
|
|
|
+
|
|
|
+ if time.Since(lr) > pingIdleTime {
|
|
|
+ go func() {
|
|
|
+ t0 := time.Now()
|
|
|
+ c.Ping()
|
|
|
+ rc <- time.Since(t0)
|
|
|
+ }()
|
|
|
+ select {
|
|
|
+ case lat := <-rc:
|
|
|
+ c.wLock.Lock()
|
|
|
+ c.peerLatency = (c.peerLatency + lat) / 2
|
|
|
+ c.wLock.Unlock()
|
|
|
+ case <-time.After(pingTimeout):
|
|
|
+ c.close()
|
|
|
+ }
|
|
|
+ }
|
|
|
+ time.Sleep(time.Second)
|
|
|
+ }
|
|
|
+}
|