|
@@ -28,7 +28,6 @@ const (
|
|
|
messageTypeRequest = 2
|
|
|
messageTypeResponse = 3
|
|
|
messageTypePing = 4
|
|
|
- messageTypePong = 5
|
|
|
messageTypeIndexUpdate = 6
|
|
|
messageTypeClose = 7
|
|
|
)
|
|
@@ -71,13 +70,12 @@ const (
|
|
|
)
|
|
|
|
|
|
var (
|
|
|
- ErrClusterHash = fmt.Errorf("configuration error: mismatched cluster hash")
|
|
|
- ErrClosed = errors.New("connection closed")
|
|
|
+ ErrClosed = errors.New("connection closed")
|
|
|
+ ErrTimeout = errors.New("read timeout")
|
|
|
)
|
|
|
|
|
|
// Specific variants of empty messages...
|
|
|
type pingMessage struct{ EmptyMessage }
|
|
|
-type pongMessage struct{ EmptyMessage }
|
|
|
|
|
|
type Model interface {
|
|
|
// An index was received from the peer device
|
|
@@ -146,9 +144,11 @@ type isEofer interface {
|
|
|
IsEOF() bool
|
|
|
}
|
|
|
|
|
|
-var (
|
|
|
- PingTimeout = 30 * time.Second
|
|
|
- PingIdleTime = 60 * time.Second
|
|
|
+const (
|
|
|
+ // We make sure to send a message at least this often, by triggering pings.
|
|
|
+ PingSendInterval = 90 * time.Second
|
|
|
+ // If we haven't received a message from the other side for this long, close the connection.
|
|
|
+ ReceiveTimeout = 300 * time.Second
|
|
|
)
|
|
|
|
|
|
func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiver Model, name string, compress Compression) Connection {
|
|
@@ -180,7 +180,8 @@ func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiv
|
|
|
func (c *rawConnection) Start() {
|
|
|
go c.readerLoop()
|
|
|
go c.writerLoop()
|
|
|
- go c.pingerLoop()
|
|
|
+ go c.pingSender()
|
|
|
+ go c.pingReceiver()
|
|
|
go c.idGenerator()
|
|
|
}
|
|
|
|
|
@@ -278,18 +279,7 @@ func (c *rawConnection) ping() bool {
|
|
|
return false
|
|
|
}
|
|
|
|
|
|
- rc := make(chan asyncResult, 1)
|
|
|
- c.awaitingMut.Lock()
|
|
|
- c.awaiting[id] = rc
|
|
|
- c.awaitingMut.Unlock()
|
|
|
-
|
|
|
- ok := c.send(id, messageTypePing, nil, nil)
|
|
|
- if !ok {
|
|
|
- return false
|
|
|
- }
|
|
|
-
|
|
|
- res, ok := <-rc
|
|
|
- return ok && res.err == nil
|
|
|
+ return c.send(id, messageTypePing, nil, nil)
|
|
|
}
|
|
|
|
|
|
func (c *rawConnection) readerLoop() (err error) {
|
|
@@ -352,13 +342,7 @@ func (c *rawConnection) readerLoop() (err error) {
|
|
|
if state != stateReady {
|
|
|
return fmt.Errorf("protocol error: ping message in state %d", state)
|
|
|
}
|
|
|
- c.send(hdr.msgID, messageTypePong, pongMessage{}, nil)
|
|
|
-
|
|
|
- case pongMessage:
|
|
|
- if state != stateReady {
|
|
|
- return fmt.Errorf("protocol error: pong message in state %d", state)
|
|
|
- }
|
|
|
- c.handlePong(hdr.msgID)
|
|
|
+ // Nothing
|
|
|
|
|
|
case CloseMessage:
|
|
|
return errors.New(msg.Reason)
|
|
@@ -467,9 +451,6 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
|
|
|
case messageTypePing:
|
|
|
msg = pingMessage{}
|
|
|
|
|
|
- case messageTypePong:
|
|
|
- msg = pongMessage{}
|
|
|
-
|
|
|
case messageTypeClusterConfig:
|
|
|
var cc ClusterConfigMessage
|
|
|
err = cc.UnmarshalXDR(msgBuf)
|
|
@@ -729,42 +710,55 @@ func (c *rawConnection) idGenerator() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (c *rawConnection) pingerLoop() {
|
|
|
- var rc = make(chan bool, 1)
|
|
|
- ticker := time.Tick(PingIdleTime / 2)
|
|
|
+// The pingSender makes sure that we've sent a message within the last
|
|
|
+// PingSendInterval. If we already have something sent in the last
|
|
|
+// PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
|
|
|
+// results in an effecting ping interval of somewhere between
|
|
|
+// PingSendInterval/2 and PingSendInterval.
|
|
|
+func (c *rawConnection) pingSender() {
|
|
|
+ ticker := time.Tick(PingSendInterval / 2)
|
|
|
+
|
|
|
for {
|
|
|
select {
|
|
|
case <-ticker:
|
|
|
- if d := time.Since(c.cr.Last()); d < PingIdleTime {
|
|
|
- if debug {
|
|
|
- l.Debugln(c.id, "ping skipped after rd", d)
|
|
|
- }
|
|
|
- continue
|
|
|
- }
|
|
|
- if d := time.Since(c.cw.Last()); d < PingIdleTime {
|
|
|
+ d := time.Since(c.cw.Last())
|
|
|
+ if d < PingSendInterval/2 {
|
|
|
if debug {
|
|
|
l.Debugln(c.id, "ping skipped after wr", d)
|
|
|
}
|
|
|
continue
|
|
|
}
|
|
|
- go func() {
|
|
|
- if debug {
|
|
|
- l.Debugln(c.id, "ping ->")
|
|
|
- }
|
|
|
- rc <- c.ping()
|
|
|
- }()
|
|
|
- select {
|
|
|
- case ok := <-rc:
|
|
|
+
|
|
|
+ if debug {
|
|
|
+ l.Debugln(c.id, "ping -> after", d)
|
|
|
+ }
|
|
|
+ c.ping()
|
|
|
+
|
|
|
+ case <-c.closed:
|
|
|
+ return
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// The pingReciever checks that we've received a message (any message will do,
|
|
|
+// but we expect pings in the absence of other messages) within the last
|
|
|
+// ReceiveTimeout. If not, we close the connection with an ErrTimeout.
|
|
|
+func (c *rawConnection) pingReceiver() {
|
|
|
+ ticker := time.Tick(ReceiveTimeout / 2)
|
|
|
+
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ticker:
|
|
|
+ d := time.Since(c.cr.Last())
|
|
|
+ if d > ReceiveTimeout {
|
|
|
if debug {
|
|
|
- l.Debugln(c.id, "<- pong")
|
|
|
- }
|
|
|
- if !ok {
|
|
|
- c.close(fmt.Errorf("ping failure"))
|
|
|
+ l.Debugln(c.id, "ping timeout", d)
|
|
|
}
|
|
|
- case <-time.After(PingTimeout):
|
|
|
- c.close(fmt.Errorf("ping timeout"))
|
|
|
- case <-c.closed:
|
|
|
- return
|
|
|
+ c.close(ErrTimeout)
|
|
|
+ }
|
|
|
+
|
|
|
+ if debug {
|
|
|
+ l.Debugln(c.id, "last read within", d)
|
|
|
}
|
|
|
|
|
|
case <-c.closed:
|