protocol.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719
  1. // Copyright (C) 2014 The Protocol Authors.
  2. package protocol
  3. import (
  4. "encoding/binary"
  5. "encoding/hex"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "sync"
  10. "time"
  11. lz4 "github.com/bkaradzic/go-lz4"
  12. )
  13. const (
  14. BlockSize = 128 * 1024
  15. )
  16. const (
  17. messageTypeClusterConfig = 0
  18. messageTypeIndex = 1
  19. messageTypeRequest = 2
  20. messageTypeResponse = 3
  21. messageTypePing = 4
  22. messageTypePong = 5
  23. messageTypeIndexUpdate = 6
  24. messageTypeClose = 7
  25. )
  26. const (
  27. stateInitial = iota
  28. stateCCRcvd
  29. stateIdxRcvd
  30. )
  31. const (
  32. FlagDeleted uint32 = 1 << 12
  33. FlagInvalid = 1 << 13
  34. FlagDirectory = 1 << 14
  35. FlagNoPermBits = 1 << 15
  36. FlagSymlink = 1 << 16
  37. FlagSymlinkMissingTarget = 1 << 17
  38. SymlinkTypeMask = FlagDirectory | FlagSymlinkMissingTarget
  39. )
  40. const (
  41. FlagShareTrusted uint32 = 1 << 0
  42. FlagShareReadOnly = 1 << 1
  43. FlagIntroducer = 1 << 2
  44. FlagShareBits = 0x000000ff
  45. )
  46. var (
  47. ErrClusterHash = fmt.Errorf("configuration error: mismatched cluster hash")
  48. ErrClosed = errors.New("connection closed")
  49. )
  50. // Specific variants of empty messages...
  51. type pingMessage struct{ EmptyMessage }
  52. type pongMessage struct{ EmptyMessage }
  53. type Model interface {
  54. // An index was received from the peer device
  55. Index(deviceID DeviceID, folder string, files []FileInfo)
  56. // An index update was received from the peer device
  57. IndexUpdate(deviceID DeviceID, folder string, files []FileInfo)
  58. // A request was made by the peer device
  59. Request(deviceID DeviceID, folder string, name string, offset int64, size int) ([]byte, error)
  60. // A cluster configuration message was received
  61. ClusterConfig(deviceID DeviceID, config ClusterConfigMessage)
  62. // The peer device closed the connection
  63. Close(deviceID DeviceID, err error)
  64. }
  65. type Connection interface {
  66. ID() DeviceID
  67. Name() string
  68. Index(folder string, files []FileInfo) error
  69. IndexUpdate(folder string, files []FileInfo) error
  70. Request(folder string, name string, offset int64, size int) ([]byte, error)
  71. ClusterConfig(config ClusterConfigMessage)
  72. Statistics() Statistics
  73. }
  74. type rawConnection struct {
  75. id DeviceID
  76. name string
  77. receiver Model
  78. state int
  79. cr *countingReader
  80. cw *countingWriter
  81. awaiting [4096]chan asyncResult
  82. awaitingMut sync.Mutex
  83. idxMut sync.Mutex // ensures serialization of Index calls
  84. nextID chan int
  85. outbox chan hdrMsg
  86. closed chan struct{}
  87. once sync.Once
  88. compressionThreshold int // compress messages larger than this many bytes
  89. rdbuf0 []byte // used & reused by readMessage
  90. rdbuf1 []byte // used & reused by readMessage
  91. }
  92. type asyncResult struct {
  93. val []byte
  94. err error
  95. }
  96. type hdrMsg struct {
  97. hdr header
  98. msg encodable
  99. }
  100. type encodable interface {
  101. AppendXDR([]byte) ([]byte, error)
  102. }
  103. type isEofer interface {
  104. IsEOF() bool
  105. }
  106. const (
  107. pingTimeout = 30 * time.Second
  108. pingIdleTime = 60 * time.Second
  109. )
  110. func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiver Model, name string, compress bool) Connection {
  111. cr := &countingReader{Reader: reader}
  112. cw := &countingWriter{Writer: writer}
  113. compThres := 1<<31 - 1 // compression disabled
  114. if compress {
  115. compThres = 128 // compress messages that are 128 bytes long or larger
  116. }
  117. c := rawConnection{
  118. id: deviceID,
  119. name: name,
  120. receiver: nativeModel{receiver},
  121. state: stateInitial,
  122. cr: cr,
  123. cw: cw,
  124. outbox: make(chan hdrMsg),
  125. nextID: make(chan int),
  126. closed: make(chan struct{}),
  127. compressionThreshold: compThres,
  128. }
  129. go c.readerLoop()
  130. go c.writerLoop()
  131. go c.pingerLoop()
  132. go c.idGenerator()
  133. return wireFormatConnection{&c}
  134. }
  135. func (c *rawConnection) ID() DeviceID {
  136. return c.id
  137. }
  138. func (c *rawConnection) Name() string {
  139. return c.name
  140. }
  141. // Index writes the list of file information to the connected peer device
  142. func (c *rawConnection) Index(folder string, idx []FileInfo) error {
  143. select {
  144. case <-c.closed:
  145. return ErrClosed
  146. default:
  147. }
  148. c.idxMut.Lock()
  149. c.send(-1, messageTypeIndex, IndexMessage{
  150. Folder: folder,
  151. Files: idx,
  152. })
  153. c.idxMut.Unlock()
  154. return nil
  155. }
  156. // IndexUpdate writes the list of file information to the connected peer device as an update
  157. func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo) error {
  158. select {
  159. case <-c.closed:
  160. return ErrClosed
  161. default:
  162. }
  163. c.idxMut.Lock()
  164. c.send(-1, messageTypeIndexUpdate, IndexMessage{
  165. Folder: folder,
  166. Files: idx,
  167. })
  168. c.idxMut.Unlock()
  169. return nil
  170. }
  171. // Request returns the bytes for the specified block after fetching them from the connected peer.
  172. func (c *rawConnection) Request(folder string, name string, offset int64, size int) ([]byte, error) {
  173. var id int
  174. select {
  175. case id = <-c.nextID:
  176. case <-c.closed:
  177. return nil, ErrClosed
  178. }
  179. c.awaitingMut.Lock()
  180. if ch := c.awaiting[id]; ch != nil {
  181. panic("id taken")
  182. }
  183. rc := make(chan asyncResult, 1)
  184. c.awaiting[id] = rc
  185. c.awaitingMut.Unlock()
  186. ok := c.send(id, messageTypeRequest, RequestMessage{
  187. Folder: folder,
  188. Name: name,
  189. Offset: uint64(offset),
  190. Size: uint32(size),
  191. })
  192. if !ok {
  193. return nil, ErrClosed
  194. }
  195. res, ok := <-rc
  196. if !ok {
  197. return nil, ErrClosed
  198. }
  199. return res.val, res.err
  200. }
  201. // ClusterConfig send the cluster configuration message to the peer and returns any error
  202. func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
  203. c.send(-1, messageTypeClusterConfig, config)
  204. }
  205. func (c *rawConnection) ping() bool {
  206. var id int
  207. select {
  208. case id = <-c.nextID:
  209. case <-c.closed:
  210. return false
  211. }
  212. rc := make(chan asyncResult, 1)
  213. c.awaitingMut.Lock()
  214. c.awaiting[id] = rc
  215. c.awaitingMut.Unlock()
  216. ok := c.send(id, messageTypePing, nil)
  217. if !ok {
  218. return false
  219. }
  220. res, ok := <-rc
  221. return ok && res.err == nil
  222. }
  223. func (c *rawConnection) readerLoop() (err error) {
  224. defer func() {
  225. c.close(err)
  226. }()
  227. for {
  228. select {
  229. case <-c.closed:
  230. return ErrClosed
  231. default:
  232. }
  233. hdr, msg, err := c.readMessage()
  234. if err != nil {
  235. return err
  236. }
  237. switch msg := msg.(type) {
  238. case IndexMessage:
  239. if msg.Flags != 0 {
  240. // We don't currently support or expect any flags.
  241. return fmt.Errorf("protocol error: unknown flags 0x%x in Index(Update) message", msg.Flags)
  242. }
  243. switch hdr.msgType {
  244. case messageTypeIndex:
  245. if c.state < stateCCRcvd {
  246. return fmt.Errorf("protocol error: index message in state %d", c.state)
  247. }
  248. c.handleIndex(msg)
  249. c.state = stateIdxRcvd
  250. case messageTypeIndexUpdate:
  251. if c.state < stateIdxRcvd {
  252. return fmt.Errorf("protocol error: index update message in state %d", c.state)
  253. }
  254. c.handleIndexUpdate(msg)
  255. }
  256. case RequestMessage:
  257. if msg.Flags != 0 {
  258. // We don't currently support or expect any flags.
  259. return fmt.Errorf("protocol error: unknown flags 0x%x in Request message", msg.Flags)
  260. }
  261. if c.state < stateIdxRcvd {
  262. return fmt.Errorf("protocol error: request message in state %d", c.state)
  263. }
  264. // Requests are handled asynchronously
  265. go c.handleRequest(hdr.msgID, msg)
  266. case ResponseMessage:
  267. if c.state < stateIdxRcvd {
  268. return fmt.Errorf("protocol error: response message in state %d", c.state)
  269. }
  270. c.handleResponse(hdr.msgID, msg)
  271. case pingMessage:
  272. c.send(hdr.msgID, messageTypePong, pongMessage{})
  273. case pongMessage:
  274. c.handlePong(hdr.msgID)
  275. case ClusterConfigMessage:
  276. if c.state != stateInitial {
  277. return fmt.Errorf("protocol error: cluster config message in state %d", c.state)
  278. }
  279. go c.receiver.ClusterConfig(c.id, msg)
  280. c.state = stateCCRcvd
  281. case CloseMessage:
  282. return errors.New(msg.Reason)
  283. default:
  284. return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
  285. }
  286. }
  287. }
  288. func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
  289. if cap(c.rdbuf0) < 8 {
  290. c.rdbuf0 = make([]byte, 8)
  291. } else {
  292. c.rdbuf0 = c.rdbuf0[:8]
  293. }
  294. _, err = io.ReadFull(c.cr, c.rdbuf0)
  295. if err != nil {
  296. return
  297. }
  298. hdr = decodeHeader(binary.BigEndian.Uint32(c.rdbuf0[0:4]))
  299. msglen := int(binary.BigEndian.Uint32(c.rdbuf0[4:8]))
  300. if debug {
  301. l.Debugf("read header %v (msglen=%d)", hdr, msglen)
  302. }
  303. if hdr.version != 0 {
  304. err = fmt.Errorf("unknown protocol version 0x%x", hdr.version)
  305. return
  306. }
  307. if cap(c.rdbuf0) < msglen {
  308. c.rdbuf0 = make([]byte, msglen)
  309. } else {
  310. c.rdbuf0 = c.rdbuf0[:msglen]
  311. }
  312. _, err = io.ReadFull(c.cr, c.rdbuf0)
  313. if err != nil {
  314. return
  315. }
  316. if debug {
  317. l.Debugf("read %d bytes", len(c.rdbuf0))
  318. }
  319. msgBuf := c.rdbuf0
  320. if hdr.compression {
  321. c.rdbuf1 = c.rdbuf1[:cap(c.rdbuf1)]
  322. c.rdbuf1, err = lz4.Decode(c.rdbuf1, c.rdbuf0)
  323. if err != nil {
  324. return
  325. }
  326. msgBuf = c.rdbuf1
  327. if debug {
  328. l.Debugf("decompressed to %d bytes", len(msgBuf))
  329. }
  330. }
  331. if debug {
  332. if len(msgBuf) > 1024 {
  333. l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024]))
  334. } else {
  335. l.Debugf("message data:\n%s", hex.Dump(msgBuf))
  336. }
  337. }
  338. // We check each returned error for the XDRError.IsEOF() method.
  339. // IsEOF()==true here means that the message contained fewer fields than
  340. // expected. It does not signify an EOF on the socket, because we've
  341. // successfully read a size value and that many bytes already. New fields
  342. // we expected but the other peer didn't send should be interpreted as
  343. // zero/nil, and if that's not valid we'll verify it somewhere else.
  344. switch hdr.msgType {
  345. case messageTypeIndex, messageTypeIndexUpdate:
  346. var idx IndexMessage
  347. err = idx.UnmarshalXDR(msgBuf)
  348. if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
  349. err = nil
  350. }
  351. msg = idx
  352. case messageTypeRequest:
  353. var req RequestMessage
  354. err = req.UnmarshalXDR(msgBuf)
  355. if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
  356. err = nil
  357. }
  358. msg = req
  359. case messageTypeResponse:
  360. var resp ResponseMessage
  361. err = resp.UnmarshalXDR(msgBuf)
  362. if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
  363. err = nil
  364. }
  365. msg = resp
  366. case messageTypePing:
  367. msg = pingMessage{}
  368. case messageTypePong:
  369. msg = pongMessage{}
  370. case messageTypeClusterConfig:
  371. var cc ClusterConfigMessage
  372. err = cc.UnmarshalXDR(msgBuf)
  373. if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
  374. err = nil
  375. }
  376. msg = cc
  377. case messageTypeClose:
  378. var cm CloseMessage
  379. err = cm.UnmarshalXDR(msgBuf)
  380. if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
  381. err = nil
  382. }
  383. msg = cm
  384. default:
  385. err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
  386. }
  387. return
  388. }
  389. func (c *rawConnection) handleIndex(im IndexMessage) {
  390. if debug {
  391. l.Debugf("Index(%v, %v, %d files)", c.id, im.Folder, len(im.Files))
  392. }
  393. c.receiver.Index(c.id, im.Folder, filterIndexMessageFiles(im.Files))
  394. }
  395. func (c *rawConnection) handleIndexUpdate(im IndexMessage) {
  396. if debug {
  397. l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Folder, len(im.Files))
  398. }
  399. c.receiver.IndexUpdate(c.id, im.Folder, filterIndexMessageFiles(im.Files))
  400. }
  401. func filterIndexMessageFiles(fs []FileInfo) []FileInfo {
  402. var out []FileInfo
  403. for i, f := range fs {
  404. switch f.Name {
  405. case "", ".", "..", "/": // A few obviously invalid filenames
  406. l.Infof("Dropping invalid filename %q from incoming index", f.Name)
  407. if out == nil {
  408. // Most incoming updates won't contain anything invalid, so we
  409. // delay the allocation and copy to output slice until we
  410. // really need to do it, then copy all the so var valid files
  411. // to it.
  412. out = make([]FileInfo, i, len(fs)-1)
  413. copy(out, fs)
  414. }
  415. default:
  416. if out != nil {
  417. out = append(out, f)
  418. }
  419. }
  420. }
  421. if out != nil {
  422. return out
  423. }
  424. return fs
  425. }
  426. func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
  427. data, _ := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), int(req.Size))
  428. c.send(msgID, messageTypeResponse, ResponseMessage{
  429. Data: data,
  430. })
  431. }
  432. func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
  433. c.awaitingMut.Lock()
  434. if rc := c.awaiting[msgID]; rc != nil {
  435. c.awaiting[msgID] = nil
  436. rc <- asyncResult{resp.Data, nil}
  437. close(rc)
  438. }
  439. c.awaitingMut.Unlock()
  440. }
  441. func (c *rawConnection) handlePong(msgID int) {
  442. c.awaitingMut.Lock()
  443. if rc := c.awaiting[msgID]; rc != nil {
  444. c.awaiting[msgID] = nil
  445. rc <- asyncResult{}
  446. close(rc)
  447. }
  448. c.awaitingMut.Unlock()
  449. }
  450. func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool {
  451. if msgID < 0 {
  452. select {
  453. case id := <-c.nextID:
  454. msgID = id
  455. case <-c.closed:
  456. return false
  457. }
  458. }
  459. hdr := header{
  460. version: 0,
  461. msgID: msgID,
  462. msgType: msgType,
  463. }
  464. select {
  465. case c.outbox <- hdrMsg{hdr, msg}:
  466. return true
  467. case <-c.closed:
  468. return false
  469. }
  470. }
  471. func (c *rawConnection) writerLoop() {
  472. var msgBuf = make([]byte, 8) // buffer for wire format message, kept and reused
  473. var uncBuf []byte // buffer for uncompressed message, kept and reused
  474. for {
  475. var tempBuf []byte
  476. var err error
  477. select {
  478. case hm := <-c.outbox:
  479. if hm.msg != nil {
  480. // Uncompressed message in uncBuf
  481. uncBuf, err = hm.msg.AppendXDR(uncBuf[:0])
  482. if err != nil {
  483. c.close(err)
  484. return
  485. }
  486. if len(uncBuf) >= c.compressionThreshold {
  487. // Use compression for large messages
  488. hm.hdr.compression = true
  489. // Make sure we have enough space for the compressed message plus header in msgBug
  490. msgBuf = msgBuf[:cap(msgBuf)]
  491. if maxLen := lz4.CompressBound(len(uncBuf)) + 8; maxLen > len(msgBuf) {
  492. msgBuf = make([]byte, maxLen)
  493. }
  494. // Compressed is written to msgBuf, we keep tb for the length only
  495. tempBuf, err = lz4.Encode(msgBuf[8:], uncBuf)
  496. binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf)))
  497. msgBuf = msgBuf[0 : len(tempBuf)+8]
  498. if debug {
  499. l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf))
  500. }
  501. } else {
  502. // No point in compressing very short messages
  503. hm.hdr.compression = false
  504. msgBuf = msgBuf[:cap(msgBuf)]
  505. if l := len(uncBuf) + 8; l > len(msgBuf) {
  506. msgBuf = make([]byte, l)
  507. }
  508. binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(uncBuf)))
  509. msgBuf = msgBuf[0 : len(uncBuf)+8]
  510. copy(msgBuf[8:], uncBuf)
  511. if debug {
  512. l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf))
  513. }
  514. }
  515. } else {
  516. if debug {
  517. l.Debugf("write empty message; %v", hm.hdr)
  518. }
  519. binary.BigEndian.PutUint32(msgBuf[4:8], 0)
  520. msgBuf = msgBuf[:8]
  521. }
  522. binary.BigEndian.PutUint32(msgBuf[0:4], encodeHeader(hm.hdr))
  523. if err == nil {
  524. var n int
  525. n, err = c.cw.Write(msgBuf)
  526. if debug {
  527. l.Debugf("wrote %d bytes on the wire", n)
  528. }
  529. }
  530. if err != nil {
  531. c.close(err)
  532. return
  533. }
  534. case <-c.closed:
  535. return
  536. }
  537. }
  538. }
  539. func (c *rawConnection) close(err error) {
  540. c.once.Do(func() {
  541. close(c.closed)
  542. c.awaitingMut.Lock()
  543. for i, ch := range c.awaiting {
  544. if ch != nil {
  545. close(ch)
  546. c.awaiting[i] = nil
  547. }
  548. }
  549. c.awaitingMut.Unlock()
  550. go c.receiver.Close(c.id, err)
  551. })
  552. }
  553. func (c *rawConnection) idGenerator() {
  554. nextID := 0
  555. for {
  556. nextID = (nextID + 1) & 0xfff
  557. select {
  558. case c.nextID <- nextID:
  559. case <-c.closed:
  560. return
  561. }
  562. }
  563. }
  564. func (c *rawConnection) pingerLoop() {
  565. var rc = make(chan bool, 1)
  566. ticker := time.Tick(pingIdleTime / 2)
  567. for {
  568. select {
  569. case <-ticker:
  570. if d := time.Since(c.cr.Last()); d < pingIdleTime {
  571. if debug {
  572. l.Debugln(c.id, "ping skipped after rd", d)
  573. }
  574. continue
  575. }
  576. if d := time.Since(c.cw.Last()); d < pingIdleTime {
  577. if debug {
  578. l.Debugln(c.id, "ping skipped after wr", d)
  579. }
  580. continue
  581. }
  582. go func() {
  583. if debug {
  584. l.Debugln(c.id, "ping ->")
  585. }
  586. rc <- c.ping()
  587. }()
  588. select {
  589. case ok := <-rc:
  590. if debug {
  591. l.Debugln(c.id, "<- pong")
  592. }
  593. if !ok {
  594. c.close(fmt.Errorf("ping failure"))
  595. }
  596. case <-time.After(pingTimeout):
  597. c.close(fmt.Errorf("ping timeout"))
  598. case <-c.closed:
  599. return
  600. }
  601. case <-c.closed:
  602. return
  603. }
  604. }
  605. }
  606. type Statistics struct {
  607. At time.Time
  608. InBytesTotal uint64
  609. OutBytesTotal uint64
  610. }
  611. func (c *rawConnection) Statistics() Statistics {
  612. return Statistics{
  613. At: time.Now(),
  614. InBytesTotal: c.cr.Tot(),
  615. OutBytesTotal: c.cw.Tot(),
  616. }
  617. }