protocol.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642
  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package protocol
  5. import (
  6. "bufio"
  7. "encoding/binary"
  8. "encoding/hex"
  9. "errors"
  10. "fmt"
  11. "io"
  12. "sync"
  13. "time"
  14. lz4 "github.com/bkaradzic/go-lz4"
  15. )
  // BlockSize is 128 KiB. It is not referenced in this file; presumably it is
// the block granularity used elsewhere in the package (confirm with callers).
const BlockSize = 128 * 1024

// Message type codes carried in the msgType field of every header. These are
// wire values: append new ones at the end, never reorder.
const (
	messageTypeClusterConfig = iota // 0
	messageTypeIndex                // 1
	messageTypeRequest              // 2
	messageTypeResponse             // 3
	messageTypePing                 // 4
	messageTypePong                 // 5
	messageTypeIndexUpdate          // 6
	messageTypeClose                // 7
)

// Receive-side protocol states. readerLoop compares them with < and only
// ever moves forward, so their order is significant.
const (
	stateInitial = iota // no cluster config received yet
	stateCCRcvd         // cluster config received
	stateIdxRcvd        // at least one index received
)

// File flag bits. Only FlagDeleted is typed (uint32); the others are untyped
// constants and must stay that way for callers that rely on it.
const (
	FlagDeleted    uint32 = 1 << 12 // file has been deleted
	FlagInvalid           = 1 << 13 // entry is marked invalid
	FlagDirectory         = 1 << 14 // entry is a directory
	FlagNoPermBits        = 1 << 15 // permission bits are not meaningful
)

// Share flag bits. FlagShareBits masks the low byte that holds them. Only
// FlagShareTrusted is typed (uint32); the others are untyped constants.
const (
	FlagShareTrusted  uint32 = 1 << 0
	FlagShareReadOnly        = 1 << 1
	FlagShareBits            = 0x000000ff
)
  // Sentinel errors exported by the protocol package.
var (
	// ErrClusterHash reports a mismatched cluster hash in the configuration.
	ErrClusterHash = errors.New("configuration error: mismatched cluster hash")

	// ErrClosed is returned by connection methods once the connection has
	// been closed.
	ErrClosed = errors.New("connection closed")
)
  49. type Model interface {
  50. // An index was received from the peer node
  51. Index(nodeID NodeID, repo string, files []FileInfo)
  52. // An index update was received from the peer node
  53. IndexUpdate(nodeID NodeID, repo string, files []FileInfo)
  54. // A request was made by the peer node
  55. Request(nodeID NodeID, repo string, name string, offset int64, size int) ([]byte, error)
  56. // A cluster configuration message was received
  57. ClusterConfig(nodeID NodeID, config ClusterConfigMessage)
  58. // The peer node closed the connection
  59. Close(nodeID NodeID, err error)
  60. }
  61. type Connection interface {
  62. ID() NodeID
  63. Name() string
  64. Index(repo string, files []FileInfo) error
  65. IndexUpdate(repo string, files []FileInfo) error
  66. Request(repo string, name string, offset int64, size int) ([]byte, error)
  67. ClusterConfig(config ClusterConfigMessage)
  68. Statistics() Statistics
  69. }
  70. type rawConnection struct {
  71. id NodeID
  72. name string
  73. receiver Model
  74. state int
  75. cr *countingReader
  76. cw *countingWriter
  77. wb *bufio.Writer
  78. awaiting [4096]chan asyncResult
  79. awaitingMut sync.Mutex
  80. idxMut sync.Mutex // ensures serialization of Index calls
  81. nextID chan int
  82. outbox chan hdrMsg
  83. closed chan struct{}
  84. once sync.Once
  85. compressionThreshold int // compress messages larger than this many bytes
  86. rdbuf0 []byte // used & reused by readMessage
  87. rdbuf1 []byte // used & reused by readMessage
  88. }
  89. type asyncResult struct {
  90. val []byte
  91. err error
  92. }
  93. type hdrMsg struct {
  94. hdr header
  95. msg encodable
  96. }
  97. type encodable interface {
  98. AppendXDR([]byte) []byte
  99. }
  // Keep-alive tuning used by pingerLoop.
const (
	// pingIdleTime is how long the connection must be silent in both
	// directions before a ping is sent.
	pingIdleTime = time.Minute
	// pingTimeout is how long to wait for the matching pong before the
	// connection is closed.
	pingTimeout = 30 * time.Second
)
  104. func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model, name string, compress bool) Connection {
  105. cr := &countingReader{Reader: reader}
  106. cw := &countingWriter{Writer: writer}
  107. compThres := 1<<32 - 1 // compression disabled
  108. if compress {
  109. compThres = 128 // compress messages that are 128 bytes long or larger
  110. }
  111. c := rawConnection{
  112. id: nodeID,
  113. name: name,
  114. receiver: nativeModel{receiver},
  115. state: stateInitial,
  116. cr: cr,
  117. cw: cw,
  118. outbox: make(chan hdrMsg),
  119. nextID: make(chan int),
  120. closed: make(chan struct{}),
  121. compressionThreshold: compThres,
  122. }
  123. go c.readerLoop()
  124. go c.writerLoop()
  125. go c.pingerLoop()
  126. go c.idGenerator()
  127. return wireFormatConnection{&c}
  128. }
  129. func (c *rawConnection) ID() NodeID {
  130. return c.id
  131. }
  132. func (c *rawConnection) Name() string {
  133. return c.name
  134. }
  135. // Index writes the list of file information to the connected peer node
  136. func (c *rawConnection) Index(repo string, idx []FileInfo) error {
  137. select {
  138. case <-c.closed:
  139. return ErrClosed
  140. default:
  141. }
  142. c.idxMut.Lock()
  143. c.send(-1, messageTypeIndex, IndexMessage{repo, idx})
  144. c.idxMut.Unlock()
  145. return nil
  146. }
  147. // IndexUpdate writes the list of file information to the connected peer node as an update
  148. func (c *rawConnection) IndexUpdate(repo string, idx []FileInfo) error {
  149. select {
  150. case <-c.closed:
  151. return ErrClosed
  152. default:
  153. }
  154. c.idxMut.Lock()
  155. c.send(-1, messageTypeIndexUpdate, IndexMessage{repo, idx})
  156. c.idxMut.Unlock()
  157. return nil
  158. }
  159. // Request returns the bytes for the specified block after fetching them from the connected peer.
  160. func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
  161. var id int
  162. select {
  163. case id = <-c.nextID:
  164. case <-c.closed:
  165. return nil, ErrClosed
  166. }
  167. c.awaitingMut.Lock()
  168. if ch := c.awaiting[id]; ch != nil {
  169. panic("id taken")
  170. }
  171. rc := make(chan asyncResult, 1)
  172. c.awaiting[id] = rc
  173. c.awaitingMut.Unlock()
  174. ok := c.send(id, messageTypeRequest, RequestMessage{repo, name, uint64(offset), uint32(size)})
  175. if !ok {
  176. return nil, ErrClosed
  177. }
  178. res, ok := <-rc
  179. if !ok {
  180. return nil, ErrClosed
  181. }
  182. return res.val, res.err
  183. }
  184. // ClusterConfig send the cluster configuration message to the peer and returns any error
  185. func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
  186. c.send(-1, messageTypeClusterConfig, config)
  187. }
  188. func (c *rawConnection) ping() bool {
  189. var id int
  190. select {
  191. case id = <-c.nextID:
  192. case <-c.closed:
  193. return false
  194. }
  195. rc := make(chan asyncResult, 1)
  196. c.awaitingMut.Lock()
  197. c.awaiting[id] = rc
  198. c.awaitingMut.Unlock()
  199. ok := c.send(id, messageTypePing, nil)
  200. if !ok {
  201. return false
  202. }
  203. res, ok := <-rc
  204. return ok && res.err == nil
  205. }
  206. func (c *rawConnection) readerLoop() (err error) {
  207. defer func() {
  208. c.close(err)
  209. }()
  210. for {
  211. select {
  212. case <-c.closed:
  213. return ErrClosed
  214. default:
  215. }
  216. hdr, msg, err := c.readMessage()
  217. if err != nil {
  218. return err
  219. }
  220. switch hdr.msgType {
  221. case messageTypeIndex:
  222. if c.state < stateCCRcvd {
  223. return fmt.Errorf("protocol error: index message in state %d", c.state)
  224. }
  225. c.handleIndex(msg.(IndexMessage))
  226. c.state = stateIdxRcvd
  227. case messageTypeIndexUpdate:
  228. if c.state < stateIdxRcvd {
  229. return fmt.Errorf("protocol error: index update message in state %d", c.state)
  230. }
  231. c.handleIndexUpdate(msg.(IndexMessage))
  232. case messageTypeRequest:
  233. if c.state < stateIdxRcvd {
  234. return fmt.Errorf("protocol error: request message in state %d", c.state)
  235. }
  236. // Requests are handled asynchronously
  237. go c.handleRequest(hdr.msgID, msg.(RequestMessage))
  238. case messageTypeResponse:
  239. if c.state < stateIdxRcvd {
  240. return fmt.Errorf("protocol error: response message in state %d", c.state)
  241. }
  242. c.handleResponse(hdr.msgID, msg.(ResponseMessage))
  243. case messageTypePing:
  244. c.send(hdr.msgID, messageTypePong, EmptyMessage{})
  245. case messageTypePong:
  246. c.handlePong(hdr.msgID)
  247. case messageTypeClusterConfig:
  248. if c.state != stateInitial {
  249. return fmt.Errorf("protocol error: cluster config message in state %d", c.state)
  250. }
  251. go c.receiver.ClusterConfig(c.id, msg.(ClusterConfigMessage))
  252. c.state = stateCCRcvd
  253. case messageTypeClose:
  254. return errors.New(msg.(CloseMessage).Reason)
  255. default:
  256. return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
  257. }
  258. }
  259. }
  260. func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
  261. if cap(c.rdbuf0) < 8 {
  262. c.rdbuf0 = make([]byte, 8)
  263. } else {
  264. c.rdbuf0 = c.rdbuf0[:8]
  265. }
  266. _, err = io.ReadFull(c.cr, c.rdbuf0)
  267. if err != nil {
  268. return
  269. }
  270. hdr = decodeHeader(binary.BigEndian.Uint32(c.rdbuf0[0:4]))
  271. msglen := int(binary.BigEndian.Uint32(c.rdbuf0[4:8]))
  272. if debug {
  273. l.Debugf("read header %v (msglen=%d)", hdr, msglen)
  274. }
  275. if cap(c.rdbuf0) < msglen {
  276. c.rdbuf0 = make([]byte, msglen)
  277. } else {
  278. c.rdbuf0 = c.rdbuf0[:msglen]
  279. }
  280. _, err = io.ReadFull(c.cr, c.rdbuf0)
  281. if err != nil {
  282. return
  283. }
  284. if debug {
  285. l.Debugf("read %d bytes", len(c.rdbuf0))
  286. }
  287. msgBuf := c.rdbuf0
  288. if hdr.compression {
  289. c.rdbuf1 = c.rdbuf1[:cap(c.rdbuf1)]
  290. c.rdbuf1, err = lz4.Decode(c.rdbuf1, c.rdbuf0)
  291. if err != nil {
  292. return
  293. }
  294. msgBuf = c.rdbuf1
  295. if debug {
  296. l.Debugf("decompressed to %d bytes", len(msgBuf))
  297. }
  298. }
  299. if debug {
  300. if len(msgBuf) > 1024 {
  301. l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024]))
  302. } else {
  303. l.Debugf("message data:\n%s", hex.Dump(msgBuf))
  304. }
  305. }
  306. switch hdr.msgType {
  307. case messageTypeIndex, messageTypeIndexUpdate:
  308. var idx IndexMessage
  309. err = idx.UnmarshalXDR(msgBuf)
  310. msg = idx
  311. case messageTypeRequest:
  312. var req RequestMessage
  313. err = req.UnmarshalXDR(msgBuf)
  314. msg = req
  315. case messageTypeResponse:
  316. var resp ResponseMessage
  317. err = resp.UnmarshalXDR(msgBuf)
  318. msg = resp
  319. case messageTypePing, messageTypePong:
  320. msg = EmptyMessage{}
  321. case messageTypeClusterConfig:
  322. var cc ClusterConfigMessage
  323. err = cc.UnmarshalXDR(msgBuf)
  324. msg = cc
  325. case messageTypeClose:
  326. var cm CloseMessage
  327. err = cm.UnmarshalXDR(msgBuf)
  328. msg = cm
  329. default:
  330. err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
  331. }
  332. return
  333. }
  334. func (c *rawConnection) handleIndex(im IndexMessage) {
  335. if debug {
  336. l.Debugf("Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
  337. }
  338. c.receiver.Index(c.id, im.Repository, im.Files)
  339. }
  340. func (c *rawConnection) handleIndexUpdate(im IndexMessage) {
  341. if debug {
  342. l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
  343. }
  344. c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
  345. }
  346. func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
  347. data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
  348. c.send(msgID, messageTypeResponse, ResponseMessage{data})
  349. }
  350. func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
  351. c.awaitingMut.Lock()
  352. if rc := c.awaiting[msgID]; rc != nil {
  353. c.awaiting[msgID] = nil
  354. rc <- asyncResult{resp.Data, nil}
  355. close(rc)
  356. }
  357. c.awaitingMut.Unlock()
  358. }
  359. func (c *rawConnection) handlePong(msgID int) {
  360. c.awaitingMut.Lock()
  361. if rc := c.awaiting[msgID]; rc != nil {
  362. c.awaiting[msgID] = nil
  363. rc <- asyncResult{}
  364. close(rc)
  365. }
  366. c.awaitingMut.Unlock()
  367. }
  368. func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool {
  369. if msgID < 0 {
  370. select {
  371. case id := <-c.nextID:
  372. msgID = id
  373. case <-c.closed:
  374. return false
  375. }
  376. }
  377. hdr := header{
  378. version: 0,
  379. msgID: msgID,
  380. msgType: msgType,
  381. }
  382. select {
  383. case c.outbox <- hdrMsg{hdr, msg}:
  384. return true
  385. case <-c.closed:
  386. return false
  387. }
  388. }
  389. func (c *rawConnection) writerLoop() {
  390. var msgBuf = make([]byte, 8) // buffer for wire format message, kept and reused
  391. var uncBuf []byte // buffer for uncompressed message, kept and reused
  392. for {
  393. var tempBuf []byte
  394. var err error
  395. select {
  396. case hm := <-c.outbox:
  397. if hm.msg != nil {
  398. // Uncompressed message in uncBuf
  399. uncBuf = hm.msg.AppendXDR(uncBuf[:0])
  400. if len(uncBuf) >= c.compressionThreshold {
  401. // Use compression for large messages
  402. hm.hdr.compression = true
  403. // Make sure we have enough space for the compressed message plus header in msgBug
  404. msgBuf = msgBuf[:cap(msgBuf)]
  405. if maxLen := lz4.CompressBound(len(uncBuf)) + 8; maxLen > len(msgBuf) {
  406. msgBuf = make([]byte, maxLen)
  407. }
  408. // Compressed is written to msgBuf, we keep tb for the length only
  409. tempBuf, err = lz4.Encode(msgBuf[8:], uncBuf)
  410. binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf)))
  411. msgBuf = msgBuf[0 : len(tempBuf)+8]
  412. if debug {
  413. l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf))
  414. }
  415. } else {
  416. // No point in compressing very short messages
  417. hm.hdr.compression = false
  418. msgBuf = msgBuf[:cap(msgBuf)]
  419. if l := len(uncBuf) + 8; l > len(msgBuf) {
  420. msgBuf = make([]byte, l)
  421. }
  422. binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(uncBuf)))
  423. msgBuf = msgBuf[0 : len(uncBuf)+8]
  424. copy(msgBuf[8:], uncBuf)
  425. if debug {
  426. l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf))
  427. }
  428. }
  429. } else {
  430. if debug {
  431. l.Debugf("write empty message; %v", hm.hdr)
  432. }
  433. binary.BigEndian.PutUint32(msgBuf[4:8], 0)
  434. msgBuf = msgBuf[:8]
  435. }
  436. binary.BigEndian.PutUint32(msgBuf[0:4], encodeHeader(hm.hdr))
  437. if err == nil {
  438. var n int
  439. n, err = c.cw.Write(msgBuf)
  440. if debug {
  441. l.Debugf("wrote %d bytes on the wire", n)
  442. }
  443. }
  444. if err != nil {
  445. c.close(err)
  446. return
  447. }
  448. case <-c.closed:
  449. return
  450. }
  451. }
  452. }
  453. func (c *rawConnection) close(err error) {
  454. c.once.Do(func() {
  455. close(c.closed)
  456. c.awaitingMut.Lock()
  457. for i, ch := range c.awaiting {
  458. if ch != nil {
  459. close(ch)
  460. c.awaiting[i] = nil
  461. }
  462. }
  463. c.awaitingMut.Unlock()
  464. go c.receiver.Close(c.id, err)
  465. })
  466. }
  467. func (c *rawConnection) idGenerator() {
  468. nextID := 0
  469. for {
  470. nextID = (nextID + 1) & 0xfff
  471. select {
  472. case c.nextID <- nextID:
  473. case <-c.closed:
  474. return
  475. }
  476. }
  477. }
  478. func (c *rawConnection) pingerLoop() {
  479. var rc = make(chan bool, 1)
  480. ticker := time.Tick(pingIdleTime / 2)
  481. for {
  482. select {
  483. case <-ticker:
  484. if d := time.Since(c.cr.Last()); d < pingIdleTime {
  485. if debug {
  486. l.Debugln(c.id, "ping skipped after rd", d)
  487. }
  488. continue
  489. }
  490. if d := time.Since(c.cw.Last()); d < pingIdleTime {
  491. if debug {
  492. l.Debugln(c.id, "ping skipped after wr", d)
  493. }
  494. continue
  495. }
  496. go func() {
  497. if debug {
  498. l.Debugln(c.id, "ping ->")
  499. }
  500. rc <- c.ping()
  501. }()
  502. select {
  503. case ok := <-rc:
  504. if debug {
  505. l.Debugln(c.id, "<- pong")
  506. }
  507. if !ok {
  508. c.close(fmt.Errorf("ping failure"))
  509. }
  510. case <-time.After(pingTimeout):
  511. c.close(fmt.Errorf("ping timeout"))
  512. case <-c.closed:
  513. return
  514. }
  515. case <-c.closed:
  516. return
  517. }
  518. }
  519. }
  520. type Statistics struct {
  521. At time.Time
  522. InBytesTotal uint64
  523. OutBytesTotal uint64
  524. }
  525. func (c *rawConnection) Statistics() Statistics {
  526. return Statistics{
  527. At: time.Now(),
  528. InBytesTotal: c.cr.Tot(),
  529. OutBytesTotal: c.cw.Tot(),
  530. }
  531. }
  532. func IsDeleted(bits uint32) bool {
  533. return bits&FlagDeleted != 0
  534. }
  535. func IsInvalid(bits uint32) bool {
  536. return bits&FlagInvalid != 0
  537. }
  538. func IsDirectory(bits uint32) bool {
  539. return bits&FlagDirectory != 0
  540. }
  541. func HasPermissionBits(bits uint32) bool {
  542. return bits&FlagNoPermBits == 0
  543. }