protocol.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810
  1. // Copyright (C) 2014 The Protocol Authors.
  2. package protocol
  3. import (
  4. "encoding/binary"
  5. "encoding/hex"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "sync"
  10. "time"
  11. lz4 "github.com/bkaradzic/go-lz4"
  12. "github.com/calmh/xdr"
  13. )
  14. const (
  15. // BlockSize is the standard ata block size (128 KiB)
  16. BlockSize = 128 << 10
  17. // MaxMessageLen is the largest message size allowed on the wire. (512 MiB)
  18. MaxMessageLen = 64 << 23
  19. )
  20. const (
  21. messageTypeClusterConfig = 0
  22. messageTypeIndex = 1
  23. messageTypeRequest = 2
  24. messageTypeResponse = 3
  25. messageTypePing = 4
  26. messageTypeIndexUpdate = 6
  27. messageTypeClose = 7
  28. messageTypeDownloadProgress = 8
  29. )
  30. const (
  31. stateInitial = iota
  32. stateReady
  33. )
  34. // FileInfo flags
  35. const (
  36. FlagDeleted uint32 = 1 << 12 // bit 19 in MSB order with the first bit being #0
  37. FlagInvalid = 1 << 13 // bit 18
  38. FlagDirectory = 1 << 14 // bit 17
  39. FlagNoPermBits = 1 << 15 // bit 16
  40. FlagSymlink = 1 << 16 // bit 15
  41. FlagSymlinkMissingTarget = 1 << 17 // bit 14
  42. FlagsAll = (1 << 18) - 1
  43. SymlinkTypeMask = FlagDirectory | FlagSymlinkMissingTarget
  44. )
  45. // Request message flags
  46. const (
  47. FlagFromTemporary uint32 = 1 << iota
  48. )
  49. // FileDownloadProgressUpdate update types
  50. const (
  51. UpdateTypeAppend uint32 = iota
  52. UpdateTypeForget
  53. )
  54. // ClusterConfigMessage.Folders flags
  55. const (
  56. FlagFolderReadOnly uint32 = 1 << 0
  57. FlagFolderIgnorePerms = 1 << 1
  58. FlagFolderIgnoreDelete = 1 << 2
  59. FlagFolderDisabledTempIndexes = 1 << 3
  60. FlagFolderAll = 1<<4 - 1
  61. )
  62. // ClusterConfigMessage.Folders.Devices flags
  63. const (
  64. FlagShareTrusted uint32 = 1 << 0
  65. FlagShareReadOnly = 1 << 1
  66. FlagIntroducer = 1 << 2
  67. FlagShareBits = 0x000000ff
  68. )
  69. var (
  70. ErrClosed = errors.New("connection closed")
  71. ErrTimeout = errors.New("read timeout")
  72. ErrSwitchingConnections = errors.New("switching connections")
  73. )
  74. // Specific variants of empty messages...
  75. type pingMessage struct{ EmptyMessage }
  76. type Model interface {
  77. // An index was received from the peer device
  78. Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
  79. // An index update was received from the peer device
  80. IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
  81. // A request was made by the peer device
  82. Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error
  83. // A cluster configuration message was received
  84. ClusterConfig(deviceID DeviceID, config ClusterConfigMessage)
  85. // The peer device closed the connection
  86. Close(deviceID DeviceID, err error)
  87. // The peer device sent progress updates for the files it is currently downloading
  88. DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option)
  89. }
  90. type Connection interface {
  91. Start()
  92. ID() DeviceID
  93. Name() string
  94. Index(folder string, files []FileInfo, flags uint32, options []Option) error
  95. IndexUpdate(folder string, files []FileInfo, flags uint32, options []Option) error
  96. Request(folder string, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error)
  97. ClusterConfig(config ClusterConfigMessage)
  98. DownloadProgress(folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option)
  99. Statistics() Statistics
  100. Closed() bool
  101. }
  102. type rawConnection struct {
  103. id DeviceID
  104. name string
  105. receiver Model
  106. cr *countingReader
  107. cw *countingWriter
  108. awaiting [4096]chan asyncResult
  109. awaitingMut sync.Mutex
  110. idxMut sync.Mutex // ensures serialization of Index calls
  111. nextID chan int
  112. outbox chan hdrMsg
  113. closed chan struct{}
  114. once sync.Once
  115. pool sync.Pool
  116. compression Compression
  117. readerBuf []byte // used & reused by readMessage
  118. }
  119. type asyncResult struct {
  120. val []byte
  121. err error
  122. }
  123. type hdrMsg struct {
  124. hdr header
  125. msg encodable
  126. done chan struct{}
  127. }
  128. type encodable interface {
  129. MarshalXDRInto(m *xdr.Marshaller) error
  130. XDRSize() int
  131. }
  132. type isEofer interface {
  133. IsEOF() bool
  134. }
  135. const (
  136. // PingSendInterval is how often we make sure to send a message, by
  137. // triggering pings if necessary.
  138. PingSendInterval = 90 * time.Second
  139. // ReceiveTimeout is the longest we'll wait for a message from the other
  140. // side before closing the connection.
  141. ReceiveTimeout = 300 * time.Second
  142. )
  143. func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiver Model, name string, compress Compression) Connection {
  144. cr := &countingReader{Reader: reader}
  145. cw := &countingWriter{Writer: writer}
  146. c := rawConnection{
  147. id: deviceID,
  148. name: name,
  149. receiver: nativeModel{receiver},
  150. cr: cr,
  151. cw: cw,
  152. outbox: make(chan hdrMsg),
  153. nextID: make(chan int),
  154. closed: make(chan struct{}),
  155. pool: sync.Pool{
  156. New: func() interface{} {
  157. return make([]byte, BlockSize)
  158. },
  159. },
  160. compression: compress,
  161. }
  162. return wireFormatConnection{&c}
  163. }
  164. // Start creates the goroutines for sending and receiving of messages. It must
  165. // be called exactly once after creating a connection.
  166. func (c *rawConnection) Start() {
  167. go c.readerLoop()
  168. go c.writerLoop()
  169. go c.pingSender()
  170. go c.pingReceiver()
  171. go c.idGenerator()
  172. }
  173. func (c *rawConnection) ID() DeviceID {
  174. return c.id
  175. }
  176. func (c *rawConnection) Name() string {
  177. return c.name
  178. }
  179. // Index writes the list of file information to the connected peer device
  180. func (c *rawConnection) Index(folder string, idx []FileInfo, flags uint32, options []Option) error {
  181. select {
  182. case <-c.closed:
  183. return ErrClosed
  184. default:
  185. }
  186. c.idxMut.Lock()
  187. c.send(-1, messageTypeIndex, IndexMessage{
  188. Folder: folder,
  189. Files: idx,
  190. Flags: flags,
  191. Options: options,
  192. }, nil)
  193. c.idxMut.Unlock()
  194. return nil
  195. }
  196. // IndexUpdate writes the list of file information to the connected peer device as an update
  197. func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32, options []Option) error {
  198. select {
  199. case <-c.closed:
  200. return ErrClosed
  201. default:
  202. }
  203. c.idxMut.Lock()
  204. c.send(-1, messageTypeIndexUpdate, IndexMessage{
  205. Folder: folder,
  206. Files: idx,
  207. Flags: flags,
  208. Options: options,
  209. }, nil)
  210. c.idxMut.Unlock()
  211. return nil
  212. }
  213. // Request returns the bytes for the specified block after fetching them from the connected peer.
  214. func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
  215. var id int
  216. select {
  217. case id = <-c.nextID:
  218. case <-c.closed:
  219. return nil, ErrClosed
  220. }
  221. var flags uint32
  222. if fromTemporary {
  223. flags |= FlagFromTemporary
  224. }
  225. c.awaitingMut.Lock()
  226. if ch := c.awaiting[id]; ch != nil {
  227. panic("id taken")
  228. }
  229. rc := make(chan asyncResult, 1)
  230. c.awaiting[id] = rc
  231. c.awaitingMut.Unlock()
  232. ok := c.send(id, messageTypeRequest, RequestMessage{
  233. Folder: folder,
  234. Name: name,
  235. Offset: offset,
  236. Size: int32(size),
  237. Hash: hash,
  238. Flags: flags,
  239. Options: nil,
  240. }, nil)
  241. if !ok {
  242. return nil, ErrClosed
  243. }
  244. res, ok := <-rc
  245. if !ok {
  246. return nil, ErrClosed
  247. }
  248. return res.val, res.err
  249. }
  250. // ClusterConfig send the cluster configuration message to the peer and returns any error
  251. func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
  252. c.send(-1, messageTypeClusterConfig, config, nil)
  253. }
  254. func (c *rawConnection) Closed() bool {
  255. select {
  256. case <-c.closed:
  257. return true
  258. default:
  259. return false
  260. }
  261. }
  262. // DownloadProgress sends the progress updates for the files that are currently being downloaded.
  263. func (c *rawConnection) DownloadProgress(folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option) {
  264. c.send(-1, messageTypeDownloadProgress, DownloadProgressMessage{
  265. Folder: folder,
  266. Updates: updates,
  267. Flags: flags,
  268. Options: options,
  269. }, nil)
  270. }
  271. func (c *rawConnection) ping() bool {
  272. var id int
  273. select {
  274. case id = <-c.nextID:
  275. case <-c.closed:
  276. return false
  277. }
  278. return c.send(id, messageTypePing, nil, nil)
  279. }
  280. func (c *rawConnection) readerLoop() (err error) {
  281. defer func() {
  282. c.close(err)
  283. }()
  284. state := stateInitial
  285. for {
  286. select {
  287. case <-c.closed:
  288. return ErrClosed
  289. default:
  290. }
  291. hdr, msg, err := c.readMessage()
  292. if err != nil {
  293. return err
  294. }
  295. switch msg := msg.(type) {
  296. case ClusterConfigMessage:
  297. if state != stateInitial {
  298. return fmt.Errorf("protocol error: cluster config message in state %d", state)
  299. }
  300. go c.receiver.ClusterConfig(c.id, msg)
  301. state = stateReady
  302. case IndexMessage:
  303. switch hdr.msgType {
  304. case messageTypeIndex:
  305. if state != stateReady {
  306. return fmt.Errorf("protocol error: index message in state %d", state)
  307. }
  308. c.handleIndex(msg)
  309. state = stateReady
  310. case messageTypeIndexUpdate:
  311. if state != stateReady {
  312. return fmt.Errorf("protocol error: index update message in state %d", state)
  313. }
  314. c.handleIndexUpdate(msg)
  315. state = stateReady
  316. }
  317. case RequestMessage:
  318. if state != stateReady {
  319. return fmt.Errorf("protocol error: request message in state %d", state)
  320. }
  321. // Requests are handled asynchronously
  322. go c.handleRequest(hdr.msgID, msg)
  323. case ResponseMessage:
  324. if state != stateReady {
  325. return fmt.Errorf("protocol error: response message in state %d", state)
  326. }
  327. c.handleResponse(hdr.msgID, msg)
  328. case DownloadProgressMessage:
  329. if state != stateReady {
  330. return fmt.Errorf("protocol error: response message in state %d", state)
  331. }
  332. c.receiver.DownloadProgress(c.id, msg.Folder, msg.Updates, msg.Flags, msg.Options)
  333. case pingMessage:
  334. if state != stateReady {
  335. return fmt.Errorf("protocol error: ping message in state %d", state)
  336. }
  337. // Nothing
  338. case CloseMessage:
  339. return errors.New(msg.Reason)
  340. default:
  341. return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
  342. }
  343. }
  344. }
  345. func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
  346. hdrBuf := make([]byte, 8)
  347. _, err = io.ReadFull(c.cr, hdrBuf)
  348. if err != nil {
  349. return
  350. }
  351. hdr = decodeHeader(binary.BigEndian.Uint32(hdrBuf[:4]))
  352. msglen := int(binary.BigEndian.Uint32(hdrBuf[4:]))
  353. l.Debugf("read header %v (msglen=%d)", hdr, msglen)
  354. if msglen > MaxMessageLen {
  355. err = fmt.Errorf("message length %d exceeds maximum %d", msglen, MaxMessageLen)
  356. return
  357. }
  358. if hdr.version != 0 {
  359. err = fmt.Errorf("unknown protocol version 0x%x", hdr.version)
  360. return
  361. }
  362. // c.readerBuf contains a buffer we can reuse. But once we've unmarshalled
  363. // a message from the buffer we can't reuse it again as the unmarshalled
  364. // message refers to the contents of the buffer. The only case we a buffer
  365. // ends up in readerBuf for reuse is when the message is compressed, as we
  366. // then decompress into a new buffer instead.
  367. var msgBuf []byte
  368. if cap(c.readerBuf) >= msglen {
  369. // If we have a buffer ready in rdbuf we just use that.
  370. msgBuf = c.readerBuf[:msglen]
  371. } else {
  372. // Otherwise we allocate a new buffer.
  373. msgBuf = make([]byte, msglen)
  374. }
  375. _, err = io.ReadFull(c.cr, msgBuf)
  376. if err != nil {
  377. return
  378. }
  379. l.Debugf("read %d bytes", len(msgBuf))
  380. if hdr.compression && msglen > 0 {
  381. // We're going to decompress msgBuf into a different newly allocated
  382. // buffer, so keep msgBuf around for reuse on the next message.
  383. c.readerBuf = msgBuf
  384. msgBuf, err = lz4.Decode(nil, msgBuf)
  385. if err != nil {
  386. return
  387. }
  388. l.Debugf("decompressed to %d bytes", len(msgBuf))
  389. } else {
  390. c.readerBuf = nil
  391. }
  392. if shouldDebug() {
  393. if len(msgBuf) > 1024 {
  394. l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024]))
  395. } else {
  396. l.Debugf("message data:\n%s", hex.Dump(msgBuf))
  397. }
  398. }
  399. switch hdr.msgType {
  400. case messageTypeIndex, messageTypeIndexUpdate:
  401. var idx IndexMessage
  402. err = idx.UnmarshalXDR(msgBuf)
  403. msg = idx
  404. case messageTypeRequest:
  405. var req RequestMessage
  406. err = req.UnmarshalXDR(msgBuf)
  407. msg = req
  408. case messageTypeResponse:
  409. var resp ResponseMessage
  410. err = resp.UnmarshalXDR(msgBuf)
  411. msg = resp
  412. case messageTypePing:
  413. msg = pingMessage{}
  414. case messageTypeClusterConfig:
  415. var cc ClusterConfigMessage
  416. err = cc.UnmarshalXDR(msgBuf)
  417. msg = cc
  418. case messageTypeClose:
  419. var cm CloseMessage
  420. err = cm.UnmarshalXDR(msgBuf)
  421. msg = cm
  422. case messageTypeDownloadProgress:
  423. var dp DownloadProgressMessage
  424. err := dp.UnmarshalXDR(msgBuf)
  425. if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
  426. err = nil
  427. }
  428. msg = dp
  429. default:
  430. err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
  431. }
  432. // We check the returned error for the XDRError.IsEOF() method.
  433. // IsEOF()==true here means that the message contained fewer fields than
  434. // expected. It does not signify an EOF on the socket, because we've
  435. // successfully read a size value and then that many bytes from the wire.
  436. // New fields we expected but the other peer didn't send should be
  437. // interpreted as zero/nil, and if that's not valid we'll verify it
  438. // somewhere else.
  439. if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
  440. err = nil
  441. }
  442. return
  443. }
  444. func (c *rawConnection) handleIndex(im IndexMessage) {
  445. l.Debugf("Index(%v, %v, %d file, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
  446. c.receiver.Index(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
  447. }
  448. func (c *rawConnection) handleIndexUpdate(im IndexMessage) {
  449. l.Debugf("queueing IndexUpdate(%v, %v, %d files, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
  450. c.receiver.IndexUpdate(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
  451. }
  452. func filterIndexMessageFiles(fs []FileInfo) []FileInfo {
  453. var out []FileInfo
  454. for i, f := range fs {
  455. switch f.Name {
  456. case "", ".", "..", "/": // A few obviously invalid filenames
  457. l.Infof("Dropping invalid filename %q from incoming index", f.Name)
  458. if out == nil {
  459. // Most incoming updates won't contain anything invalid, so we
  460. // delay the allocation and copy to output slice until we
  461. // really need to do it, then copy all the so var valid files
  462. // to it.
  463. out = make([]FileInfo, i, len(fs)-1)
  464. copy(out, fs)
  465. }
  466. default:
  467. if out != nil {
  468. out = append(out, f)
  469. }
  470. }
  471. }
  472. if out != nil {
  473. return out
  474. }
  475. return fs
  476. }
  477. func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
  478. size := int(req.Size)
  479. usePool := size <= BlockSize
  480. var buf []byte
  481. var done chan struct{}
  482. if usePool {
  483. buf = c.pool.Get().([]byte)[:size]
  484. done = make(chan struct{})
  485. } else {
  486. buf = make([]byte, size)
  487. }
  488. err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), req.Hash, req.Flags, req.Options, buf)
  489. if err != nil {
  490. c.send(msgID, messageTypeResponse, ResponseMessage{
  491. Data: nil,
  492. Code: errorToCode(err),
  493. }, done)
  494. } else {
  495. c.send(msgID, messageTypeResponse, ResponseMessage{
  496. Data: buf,
  497. Code: errorToCode(err),
  498. }, done)
  499. }
  500. if usePool {
  501. <-done
  502. c.pool.Put(buf)
  503. }
  504. }
  505. func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
  506. c.awaitingMut.Lock()
  507. if rc := c.awaiting[msgID]; rc != nil {
  508. c.awaiting[msgID] = nil
  509. rc <- asyncResult{resp.Data, codeToError(resp.Code)}
  510. close(rc)
  511. }
  512. c.awaitingMut.Unlock()
  513. }
  514. func (c *rawConnection) send(msgID int, msgType int, msg encodable, done chan struct{}) bool {
  515. if msgID < 0 {
  516. select {
  517. case id := <-c.nextID:
  518. msgID = id
  519. case <-c.closed:
  520. return false
  521. }
  522. }
  523. hdr := header{
  524. version: 0,
  525. msgID: msgID,
  526. msgType: msgType,
  527. }
  528. select {
  529. case c.outbox <- hdrMsg{hdr, msg, done}:
  530. return true
  531. case <-c.closed:
  532. return false
  533. }
  534. }
  535. func (c *rawConnection) writerLoop() {
  536. var msgBuf = make([]byte, 8) // buffer for wire format message, kept and reused
  537. var uncBuf []byte // buffer for uncompressed message, kept and reused
  538. for {
  539. var tempBuf []byte
  540. var err error
  541. select {
  542. case hm := <-c.outbox:
  543. if hm.msg != nil {
  544. // Uncompressed message in uncBuf
  545. msgLen := hm.msg.XDRSize()
  546. if cap(uncBuf) >= msgLen {
  547. uncBuf = uncBuf[:msgLen]
  548. } else {
  549. uncBuf = make([]byte, msgLen)
  550. }
  551. m := &xdr.Marshaller{Data: uncBuf}
  552. err = hm.msg.MarshalXDRInto(m)
  553. if hm.done != nil {
  554. close(hm.done)
  555. }
  556. if err != nil {
  557. c.close(err)
  558. return
  559. }
  560. compress := false
  561. switch c.compression {
  562. case CompressAlways:
  563. compress = true
  564. case CompressMetadata:
  565. compress = hm.hdr.msgType != messageTypeResponse
  566. }
  567. if compress && len(uncBuf) >= compressionThreshold {
  568. // Use compression for large messages
  569. hm.hdr.compression = true
  570. // Make sure we have enough space for the compressed message plus header in msgBug
  571. msgBuf = msgBuf[:cap(msgBuf)]
  572. if maxLen := lz4.CompressBound(len(uncBuf)) + 8; maxLen > len(msgBuf) {
  573. msgBuf = make([]byte, maxLen)
  574. }
  575. // Compressed is written to msgBuf, we keep tb for the length only
  576. tempBuf, err = lz4.Encode(msgBuf[8:], uncBuf)
  577. binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf)))
  578. msgBuf = msgBuf[0 : len(tempBuf)+8]
  579. l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf))
  580. } else {
  581. // No point in compressing very short messages
  582. hm.hdr.compression = false
  583. msgBuf = msgBuf[:cap(msgBuf)]
  584. if l := len(uncBuf) + 8; l > len(msgBuf) {
  585. msgBuf = make([]byte, l)
  586. }
  587. binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(uncBuf)))
  588. msgBuf = msgBuf[0 : len(uncBuf)+8]
  589. copy(msgBuf[8:], uncBuf)
  590. l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf))
  591. }
  592. } else {
  593. l.Debugf("write empty message; %v", hm.hdr)
  594. binary.BigEndian.PutUint32(msgBuf[4:8], 0)
  595. msgBuf = msgBuf[:8]
  596. }
  597. binary.BigEndian.PutUint32(msgBuf[0:4], encodeHeader(hm.hdr))
  598. if err == nil {
  599. var n int
  600. n, err = c.cw.Write(msgBuf)
  601. l.Debugf("wrote %d bytes on the wire", n)
  602. }
  603. if err != nil {
  604. c.close(err)
  605. return
  606. }
  607. case <-c.closed:
  608. return
  609. }
  610. }
  611. }
  612. func (c *rawConnection) close(err error) {
  613. c.once.Do(func() {
  614. l.Debugln("close due to", err)
  615. close(c.closed)
  616. c.awaitingMut.Lock()
  617. for i, ch := range c.awaiting {
  618. if ch != nil {
  619. close(ch)
  620. c.awaiting[i] = nil
  621. }
  622. }
  623. c.awaitingMut.Unlock()
  624. go c.receiver.Close(c.id, err)
  625. })
  626. }
  627. func (c *rawConnection) idGenerator() {
  628. nextID := 0
  629. for {
  630. nextID = (nextID + 1) & 0xfff
  631. select {
  632. case c.nextID <- nextID:
  633. case <-c.closed:
  634. return
  635. }
  636. }
  637. }
  638. // The pingSender makes sure that we've sent a message within the last
  639. // PingSendInterval. If we already have something sent in the last
  640. // PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
  641. // results in an effecting ping interval of somewhere between
  642. // PingSendInterval/2 and PingSendInterval.
  643. func (c *rawConnection) pingSender() {
  644. ticker := time.Tick(PingSendInterval / 2)
  645. for {
  646. select {
  647. case <-ticker:
  648. d := time.Since(c.cw.Last())
  649. if d < PingSendInterval/2 {
  650. l.Debugln(c.id, "ping skipped after wr", d)
  651. continue
  652. }
  653. l.Debugln(c.id, "ping -> after", d)
  654. c.ping()
  655. case <-c.closed:
  656. return
  657. }
  658. }
  659. }
  660. // The pingReceiver checks that we've received a message (any message will do,
  661. // but we expect pings in the absence of other messages) within the last
  662. // ReceiveTimeout. If not, we close the connection with an ErrTimeout.
  663. func (c *rawConnection) pingReceiver() {
  664. ticker := time.Tick(ReceiveTimeout / 2)
  665. for {
  666. select {
  667. case <-ticker:
  668. d := time.Since(c.cr.Last())
  669. if d > ReceiveTimeout {
  670. l.Debugln(c.id, "ping timeout", d)
  671. c.close(ErrTimeout)
  672. }
  673. l.Debugln(c.id, "last read within", d)
  674. case <-c.closed:
  675. return
  676. }
  677. }
  678. }
  679. type Statistics struct {
  680. At time.Time
  681. InBytesTotal int64
  682. OutBytesTotal int64
  683. }
  684. func (c *rawConnection) Statistics() Statistics {
  685. return Statistics{
  686. At: time.Now(),
  687. InBytesTotal: c.cr.Tot(),
  688. OutBytesTotal: c.cw.Tot(),
  689. }
  690. }