protocol.go 18 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773
  1. // Copyright (C) 2014 The Protocol Authors.
  2. package protocol
  3. import (
  4. "encoding/binary"
  5. "encoding/hex"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "sync"
  10. "time"
  11. lz4 "github.com/bkaradzic/go-lz4"
  12. "github.com/calmh/xdr"
  13. )
  const (
  	// BlockSize is the standard data block size (128 KiB).
  	BlockSize = 128 * 1024

  	// MaxMessageLen is the largest message size allowed on the wire (512 MiB).
  	MaxMessageLen = 512 << 20
  )
  // Message type identifiers as carried in the message header. Value 5 is not
  // assigned to any message type in this file.
  const (
  	messageTypeClusterConfig = iota // 0
  	messageTypeIndex                // 1
  	messageTypeRequest              // 2
  	messageTypeResponse             // 3
  	messageTypePing                 // 4
  	_                               // 5 (unused)
  	messageTypeIndexUpdate          // 6
  	messageTypeClose                // 7
  )
  // Connection states tracked by the reader loop.
  const (
  	// stateInitial is the state before a cluster config message has been
  	// received from the peer.
  	stateInitial = 0
  	// stateReady is the state once the cluster config message has been
  	// received; all other message types require this state.
  	stateReady = 1
  )
  // FileInfo flags. The "bit N" annotations number the bits of the 32 bit
  // flags field in MSB order, with the first bit being #0.
  const (
  	FlagDeleted              uint32 = 1 << 12 // bit 19
  	FlagInvalid                     = 1 << 13 // bit 18
  	FlagDirectory                   = 1 << 14 // bit 17
  	FlagNoPermBits                  = 1 << 15 // bit 16
  	FlagSymlink                     = 1 << 16 // bit 15
  	FlagSymlinkMissingTarget        = 1 << 17 // bit 14

  	// FlagsAll covers the 18 least significant bits.
  	FlagsAll = 1<<18 - 1

  	// SymlinkTypeMask selects the bits that are set in both FlagDirectory
  	// and FlagSymlinkMissingTarget.
  	SymlinkTypeMask = FlagDirectory | FlagSymlinkMissingTarget
  )
  // IndexMessage message flags (for IndexUpdate).
  const (
  	// FlagIndexTemporary is the lowest bit of the IndexMessage flags field.
  	FlagIndexTemporary uint32 = 1 << 0
  )
  // Request message flags.
  const (
  	// FlagRequestTemporary is the lowest bit of the RequestMessage flags field.
  	FlagRequestTemporary uint32 = 1 << 0
  )
  // ClusterConfigMessage.Folders flags.
  const (
  	FlagFolderReadOnly     uint32 = 1 << 0
  	FlagFolderIgnorePerms         = 1 << 1
  	FlagFolderIgnoreDelete        = 1 << 2

  	// FlagFolderAll covers the three folder flag bits above.
  	FlagFolderAll = (1 << 3) - 1
  )
  // ClusterConfigMessage.Folders.Devices flags.
  const (
  	FlagShareTrusted  uint32 = 1 << 0
  	FlagShareReadOnly        = 1 << 1
  	FlagIntroducer           = 1 << 2

  	// FlagShareBits masks the low eight bits of the device flags.
  	FlagShareBits = 0xff
  )
  // Sentinel errors reported by connections.
  var (
  	// ErrClosed is returned by operations attempted on a closed connection.
  	ErrClosed = errors.New("connection closed")

  	// ErrTimeout is the close reason used when nothing has been received
  	// from the peer within ReceiveTimeout.
  	ErrTimeout = errors.New("read timeout")
  )
  70. // Specific variants of empty messages...
  71. type pingMessage struct{ EmptyMessage }
  72. type Model interface {
  73. // An index was received from the peer device
  74. Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
  75. // An index update was received from the peer device
  76. IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
  77. // A request was made by the peer device
  78. Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error
  79. // A cluster configuration message was received
  80. ClusterConfig(deviceID DeviceID, config ClusterConfigMessage)
  81. // The peer device closed the connection
  82. Close(deviceID DeviceID, err error)
  83. }
  84. type Connection interface {
  85. Start()
  86. ID() DeviceID
  87. Name() string
  88. Index(folder string, files []FileInfo, flags uint32, options []Option) error
  89. IndexUpdate(folder string, files []FileInfo, flags uint32, options []Option) error
  90. Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error)
  91. ClusterConfig(config ClusterConfigMessage)
  92. Statistics() Statistics
  93. Closed() bool
  94. }
  95. type rawConnection struct {
  96. id DeviceID
  97. name string
  98. receiver Model
  99. cr *countingReader
  100. cw *countingWriter
  101. awaiting [4096]chan asyncResult
  102. awaitingMut sync.Mutex
  103. idxMut sync.Mutex // ensures serialization of Index calls
  104. nextID chan int
  105. outbox chan hdrMsg
  106. closed chan struct{}
  107. once sync.Once
  108. pool sync.Pool
  109. compression Compression
  110. readerBuf []byte // used & reused by readMessage
  111. }
  // asyncResult is the outcome of a request as delivered to the goroutine
  // waiting for the response: the response data and the error, if any.
  type asyncResult struct {
  	val []byte // response data
  	err error  // error derived from the response code
  }
  116. type hdrMsg struct {
  117. hdr header
  118. msg encodable
  119. done chan struct{}
  120. }
  121. type encodable interface {
  122. MarshalXDRInto(m *xdr.Marshaller) error
  123. XDRSize() int
  124. }
  // isEofer is implemented by errors that can tell whether they represent
  // running out of data while decoding.
  type isEofer interface {
  	// IsEOF reports whether the error is an end-of-data condition.
  	IsEOF() bool
  }
  const (
  	// PingSendInterval is how often we make sure to send a message, by
  	// triggering pings if necessary.
  	PingSendInterval = 90 * time.Second

  	// ReceiveTimeout is the longest we'll wait for a message from the other
  	// side before closing the connection.
  	ReceiveTimeout = 5 * time.Minute
  )
  136. func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiver Model, name string, compress Compression) Connection {
  137. cr := &countingReader{Reader: reader}
  138. cw := &countingWriter{Writer: writer}
  139. c := rawConnection{
  140. id: deviceID,
  141. name: name,
  142. receiver: nativeModel{receiver},
  143. cr: cr,
  144. cw: cw,
  145. outbox: make(chan hdrMsg),
  146. nextID: make(chan int),
  147. closed: make(chan struct{}),
  148. pool: sync.Pool{
  149. New: func() interface{} {
  150. return make([]byte, BlockSize)
  151. },
  152. },
  153. compression: compress,
  154. }
  155. return wireFormatConnection{&c}
  156. }
  157. // Start creates the goroutines for sending and receiving of messages. It must
  158. // be called exactly once after creating a connection.
  159. func (c *rawConnection) Start() {
  160. go c.readerLoop()
  161. go c.writerLoop()
  162. go c.pingSender()
  163. go c.pingReceiver()
  164. go c.idGenerator()
  165. }
  166. func (c *rawConnection) ID() DeviceID {
  167. return c.id
  168. }
  169. func (c *rawConnection) Name() string {
  170. return c.name
  171. }
  172. // Index writes the list of file information to the connected peer device
  173. func (c *rawConnection) Index(folder string, idx []FileInfo, flags uint32, options []Option) error {
  174. select {
  175. case <-c.closed:
  176. return ErrClosed
  177. default:
  178. }
  179. c.idxMut.Lock()
  180. c.send(-1, messageTypeIndex, IndexMessage{
  181. Folder: folder,
  182. Files: idx,
  183. Flags: flags,
  184. Options: options,
  185. }, nil)
  186. c.idxMut.Unlock()
  187. return nil
  188. }
  189. // IndexUpdate writes the list of file information to the connected peer device as an update
  190. func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32, options []Option) error {
  191. select {
  192. case <-c.closed:
  193. return ErrClosed
  194. default:
  195. }
  196. c.idxMut.Lock()
  197. c.send(-1, messageTypeIndexUpdate, IndexMessage{
  198. Folder: folder,
  199. Files: idx,
  200. Flags: flags,
  201. Options: options,
  202. }, nil)
  203. c.idxMut.Unlock()
  204. return nil
  205. }
  206. // Request returns the bytes for the specified block after fetching them from the connected peer.
  207. func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
  208. var id int
  209. select {
  210. case id = <-c.nextID:
  211. case <-c.closed:
  212. return nil, ErrClosed
  213. }
  214. c.awaitingMut.Lock()
  215. if ch := c.awaiting[id]; ch != nil {
  216. panic("id taken")
  217. }
  218. rc := make(chan asyncResult, 1)
  219. c.awaiting[id] = rc
  220. c.awaitingMut.Unlock()
  221. ok := c.send(id, messageTypeRequest, RequestMessage{
  222. Folder: folder,
  223. Name: name,
  224. Offset: offset,
  225. Size: int32(size),
  226. Hash: hash,
  227. Flags: flags,
  228. Options: options,
  229. }, nil)
  230. if !ok {
  231. return nil, ErrClosed
  232. }
  233. res, ok := <-rc
  234. if !ok {
  235. return nil, ErrClosed
  236. }
  237. return res.val, res.err
  238. }
  239. // ClusterConfig send the cluster configuration message to the peer and returns any error
  240. func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
  241. c.send(-1, messageTypeClusterConfig, config, nil)
  242. }
  243. func (c *rawConnection) Closed() bool {
  244. select {
  245. case <-c.closed:
  246. return true
  247. default:
  248. return false
  249. }
  250. }
  251. func (c *rawConnection) ping() bool {
  252. var id int
  253. select {
  254. case id = <-c.nextID:
  255. case <-c.closed:
  256. return false
  257. }
  258. return c.send(id, messageTypePing, nil, nil)
  259. }
  260. func (c *rawConnection) readerLoop() (err error) {
  261. defer func() {
  262. c.close(err)
  263. }()
  264. state := stateInitial
  265. for {
  266. select {
  267. case <-c.closed:
  268. return ErrClosed
  269. default:
  270. }
  271. hdr, msg, err := c.readMessage()
  272. if err != nil {
  273. return err
  274. }
  275. switch msg := msg.(type) {
  276. case ClusterConfigMessage:
  277. if state != stateInitial {
  278. return fmt.Errorf("protocol error: cluster config message in state %d", state)
  279. }
  280. go c.receiver.ClusterConfig(c.id, msg)
  281. state = stateReady
  282. case IndexMessage:
  283. switch hdr.msgType {
  284. case messageTypeIndex:
  285. if state != stateReady {
  286. return fmt.Errorf("protocol error: index message in state %d", state)
  287. }
  288. c.handleIndex(msg)
  289. state = stateReady
  290. case messageTypeIndexUpdate:
  291. if state != stateReady {
  292. return fmt.Errorf("protocol error: index update message in state %d", state)
  293. }
  294. c.handleIndexUpdate(msg)
  295. state = stateReady
  296. }
  297. case RequestMessage:
  298. if state != stateReady {
  299. return fmt.Errorf("protocol error: request message in state %d", state)
  300. }
  301. // Requests are handled asynchronously
  302. go c.handleRequest(hdr.msgID, msg)
  303. case ResponseMessage:
  304. if state != stateReady {
  305. return fmt.Errorf("protocol error: response message in state %d", state)
  306. }
  307. c.handleResponse(hdr.msgID, msg)
  308. case pingMessage:
  309. if state != stateReady {
  310. return fmt.Errorf("protocol error: ping message in state %d", state)
  311. }
  312. // Nothing
  313. case CloseMessage:
  314. return errors.New(msg.Reason)
  315. default:
  316. return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
  317. }
  318. }
  319. }
  320. func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
  321. hdrBuf := make([]byte, 8)
  322. _, err = io.ReadFull(c.cr, hdrBuf)
  323. if err != nil {
  324. return
  325. }
  326. hdr = decodeHeader(binary.BigEndian.Uint32(hdrBuf[:4]))
  327. msglen := int(binary.BigEndian.Uint32(hdrBuf[4:]))
  328. l.Debugf("read header %v (msglen=%d)", hdr, msglen)
  329. if msglen > MaxMessageLen {
  330. err = fmt.Errorf("message length %d exceeds maximum %d", msglen, MaxMessageLen)
  331. return
  332. }
  333. if hdr.version != 0 {
  334. err = fmt.Errorf("unknown protocol version 0x%x", hdr.version)
  335. return
  336. }
  337. // c.readerBuf contains a buffer we can reuse. But once we've unmarshalled
  338. // a message from the buffer we can't reuse it again as the unmarshalled
  339. // message refers to the contents of the buffer. The only case we a buffer
  340. // ends up in readerBuf for reuse is when the message is compressed, as we
  341. // then decompress into a new buffer instead.
  342. var msgBuf []byte
  343. if cap(c.readerBuf) >= msglen {
  344. // If we have a buffer ready in rdbuf we just use that.
  345. msgBuf = c.readerBuf[:msglen]
  346. } else {
  347. // Otherwise we allocate a new buffer.
  348. msgBuf = make([]byte, msglen)
  349. }
  350. _, err = io.ReadFull(c.cr, msgBuf)
  351. if err != nil {
  352. return
  353. }
  354. l.Debugf("read %d bytes", len(msgBuf))
  355. if hdr.compression && msglen > 0 {
  356. // We're going to decompress msgBuf into a different newly allocated
  357. // buffer, so keep msgBuf around for reuse on the next message.
  358. c.readerBuf = msgBuf
  359. msgBuf, err = lz4.Decode(nil, msgBuf)
  360. if err != nil {
  361. return
  362. }
  363. l.Debugf("decompressed to %d bytes", len(msgBuf))
  364. } else {
  365. c.readerBuf = nil
  366. }
  367. if shouldDebug() {
  368. if len(msgBuf) > 1024 {
  369. l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024]))
  370. } else {
  371. l.Debugf("message data:\n%s", hex.Dump(msgBuf))
  372. }
  373. }
  374. switch hdr.msgType {
  375. case messageTypeIndex, messageTypeIndexUpdate:
  376. var idx IndexMessage
  377. err = idx.UnmarshalXDR(msgBuf)
  378. msg = idx
  379. case messageTypeRequest:
  380. var req RequestMessage
  381. err = req.UnmarshalXDR(msgBuf)
  382. msg = req
  383. case messageTypeResponse:
  384. var resp ResponseMessage
  385. err = resp.UnmarshalXDR(msgBuf)
  386. msg = resp
  387. case messageTypePing:
  388. msg = pingMessage{}
  389. case messageTypeClusterConfig:
  390. var cc ClusterConfigMessage
  391. err = cc.UnmarshalXDR(msgBuf)
  392. msg = cc
  393. case messageTypeClose:
  394. var cm CloseMessage
  395. err = cm.UnmarshalXDR(msgBuf)
  396. msg = cm
  397. default:
  398. err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
  399. }
  400. // We check the returned error for the XDRError.IsEOF() method.
  401. // IsEOF()==true here means that the message contained fewer fields than
  402. // expected. It does not signify an EOF on the socket, because we've
  403. // successfully read a size value and then that many bytes from the wire.
  404. // New fields we expected but the other peer didn't send should be
  405. // interpreted as zero/nil, and if that's not valid we'll verify it
  406. // somewhere else.
  407. if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
  408. err = nil
  409. }
  410. return
  411. }
  412. func (c *rawConnection) handleIndex(im IndexMessage) {
  413. l.Debugf("Index(%v, %v, %d file, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
  414. c.receiver.Index(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
  415. }
  416. func (c *rawConnection) handleIndexUpdate(im IndexMessage) {
  417. l.Debugf("queueing IndexUpdate(%v, %v, %d files, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
  418. c.receiver.IndexUpdate(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
  419. }
  420. func filterIndexMessageFiles(fs []FileInfo) []FileInfo {
  421. var out []FileInfo
  422. for i, f := range fs {
  423. switch f.Name {
  424. case "", ".", "..", "/": // A few obviously invalid filenames
  425. l.Infof("Dropping invalid filename %q from incoming index", f.Name)
  426. if out == nil {
  427. // Most incoming updates won't contain anything invalid, so we
  428. // delay the allocation and copy to output slice until we
  429. // really need to do it, then copy all the so var valid files
  430. // to it.
  431. out = make([]FileInfo, i, len(fs)-1)
  432. copy(out, fs)
  433. }
  434. default:
  435. if out != nil {
  436. out = append(out, f)
  437. }
  438. }
  439. }
  440. if out != nil {
  441. return out
  442. }
  443. return fs
  444. }
  445. func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
  446. size := int(req.Size)
  447. usePool := size <= BlockSize
  448. var buf []byte
  449. var done chan struct{}
  450. if usePool {
  451. buf = c.pool.Get().([]byte)[:size]
  452. done = make(chan struct{})
  453. } else {
  454. buf = make([]byte, size)
  455. }
  456. err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), req.Hash, req.Flags, req.Options, buf)
  457. if err != nil {
  458. c.send(msgID, messageTypeResponse, ResponseMessage{
  459. Data: nil,
  460. Code: errorToCode(err),
  461. }, done)
  462. } else {
  463. c.send(msgID, messageTypeResponse, ResponseMessage{
  464. Data: buf,
  465. Code: errorToCode(err),
  466. }, done)
  467. }
  468. if usePool {
  469. <-done
  470. c.pool.Put(buf)
  471. }
  472. }
  473. func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
  474. c.awaitingMut.Lock()
  475. if rc := c.awaiting[msgID]; rc != nil {
  476. c.awaiting[msgID] = nil
  477. rc <- asyncResult{resp.Data, codeToError(resp.Code)}
  478. close(rc)
  479. }
  480. c.awaitingMut.Unlock()
  481. }
  482. func (c *rawConnection) send(msgID int, msgType int, msg encodable, done chan struct{}) bool {
  483. if msgID < 0 {
  484. select {
  485. case id := <-c.nextID:
  486. msgID = id
  487. case <-c.closed:
  488. return false
  489. }
  490. }
  491. hdr := header{
  492. version: 0,
  493. msgID: msgID,
  494. msgType: msgType,
  495. }
  496. select {
  497. case c.outbox <- hdrMsg{hdr, msg, done}:
  498. return true
  499. case <-c.closed:
  500. return false
  501. }
  502. }
  503. func (c *rawConnection) writerLoop() {
  504. var msgBuf = make([]byte, 8) // buffer for wire format message, kept and reused
  505. var uncBuf []byte // buffer for uncompressed message, kept and reused
  506. for {
  507. var tempBuf []byte
  508. var err error
  509. select {
  510. case hm := <-c.outbox:
  511. if hm.msg != nil {
  512. // Uncompressed message in uncBuf
  513. msgLen := hm.msg.XDRSize()
  514. if cap(uncBuf) >= msgLen {
  515. uncBuf = uncBuf[:msgLen]
  516. } else {
  517. uncBuf = make([]byte, msgLen)
  518. }
  519. m := &xdr.Marshaller{Data: uncBuf}
  520. err = hm.msg.MarshalXDRInto(m)
  521. if hm.done != nil {
  522. close(hm.done)
  523. }
  524. if err != nil {
  525. c.close(err)
  526. return
  527. }
  528. compress := false
  529. switch c.compression {
  530. case CompressAlways:
  531. compress = true
  532. case CompressMetadata:
  533. compress = hm.hdr.msgType != messageTypeResponse
  534. }
  535. if compress && len(uncBuf) >= compressionThreshold {
  536. // Use compression for large messages
  537. hm.hdr.compression = true
  538. // Make sure we have enough space for the compressed message plus header in msgBug
  539. msgBuf = msgBuf[:cap(msgBuf)]
  540. if maxLen := lz4.CompressBound(len(uncBuf)) + 8; maxLen > len(msgBuf) {
  541. msgBuf = make([]byte, maxLen)
  542. }
  543. // Compressed is written to msgBuf, we keep tb for the length only
  544. tempBuf, err = lz4.Encode(msgBuf[8:], uncBuf)
  545. binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf)))
  546. msgBuf = msgBuf[0 : len(tempBuf)+8]
  547. l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf))
  548. } else {
  549. // No point in compressing very short messages
  550. hm.hdr.compression = false
  551. msgBuf = msgBuf[:cap(msgBuf)]
  552. if l := len(uncBuf) + 8; l > len(msgBuf) {
  553. msgBuf = make([]byte, l)
  554. }
  555. binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(uncBuf)))
  556. msgBuf = msgBuf[0 : len(uncBuf)+8]
  557. copy(msgBuf[8:], uncBuf)
  558. l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf))
  559. }
  560. } else {
  561. l.Debugf("write empty message; %v", hm.hdr)
  562. binary.BigEndian.PutUint32(msgBuf[4:8], 0)
  563. msgBuf = msgBuf[:8]
  564. }
  565. binary.BigEndian.PutUint32(msgBuf[0:4], encodeHeader(hm.hdr))
  566. if err == nil {
  567. var n int
  568. n, err = c.cw.Write(msgBuf)
  569. l.Debugf("wrote %d bytes on the wire", n)
  570. }
  571. if err != nil {
  572. c.close(err)
  573. return
  574. }
  575. case <-c.closed:
  576. return
  577. }
  578. }
  579. }
  580. func (c *rawConnection) close(err error) {
  581. c.once.Do(func() {
  582. l.Debugln("close due to", err)
  583. close(c.closed)
  584. c.awaitingMut.Lock()
  585. for i, ch := range c.awaiting {
  586. if ch != nil {
  587. close(ch)
  588. c.awaiting[i] = nil
  589. }
  590. }
  591. c.awaitingMut.Unlock()
  592. go c.receiver.Close(c.id, err)
  593. })
  594. }
  595. func (c *rawConnection) idGenerator() {
  596. nextID := 0
  597. for {
  598. nextID = (nextID + 1) & 0xfff
  599. select {
  600. case c.nextID <- nextID:
  601. case <-c.closed:
  602. return
  603. }
  604. }
  605. }
  606. // The pingSender makes sure that we've sent a message within the last
  607. // PingSendInterval. If we already have something sent in the last
  608. // PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
  609. // results in an effecting ping interval of somewhere between
  610. // PingSendInterval/2 and PingSendInterval.
  611. func (c *rawConnection) pingSender() {
  612. ticker := time.Tick(PingSendInterval / 2)
  613. for {
  614. select {
  615. case <-ticker:
  616. d := time.Since(c.cw.Last())
  617. if d < PingSendInterval/2 {
  618. l.Debugln(c.id, "ping skipped after wr", d)
  619. continue
  620. }
  621. l.Debugln(c.id, "ping -> after", d)
  622. c.ping()
  623. case <-c.closed:
  624. return
  625. }
  626. }
  627. }
  628. // The pingReciever checks that we've received a message (any message will do,
  629. // but we expect pings in the absence of other messages) within the last
  630. // ReceiveTimeout. If not, we close the connection with an ErrTimeout.
  631. func (c *rawConnection) pingReceiver() {
  632. ticker := time.Tick(ReceiveTimeout / 2)
  633. for {
  634. select {
  635. case <-ticker:
  636. d := time.Since(c.cr.Last())
  637. if d > ReceiveTimeout {
  638. l.Debugln(c.id, "ping timeout", d)
  639. c.close(ErrTimeout)
  640. }
  641. l.Debugln(c.id, "last read within", d)
  642. case <-c.closed:
  643. return
  644. }
  645. }
  646. }
  // Statistics is a snapshot of the traffic counters of a connection.
  type Statistics struct {
  	// At is the time the snapshot was taken.
  	At time.Time
  	// InBytesTotal is the total reported by the connection's counting reader.
  	InBytesTotal int64
  	// OutBytesTotal is the total reported by the connection's counting writer.
  	OutBytesTotal int64
  }
  652. func (c *rawConnection) Statistics() Statistics {
  653. return Statistics{
  654. At: time.Now(),
  655. InBytesTotal: c.cr.Tot(),
  656. OutBytesTotal: c.cw.Tot(),
  657. }
  658. }