protocol.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639
  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package protocol
  5. import (
  6. "encoding/binary"
  7. "encoding/hex"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "sync"
  12. "time"
  13. lz4 "github.com/bkaradzic/go-lz4"
  14. )
  15. const (
  16. BlockSize = 128 * 1024
  17. )
  18. const (
  19. messageTypeClusterConfig = 0
  20. messageTypeIndex = 1
  21. messageTypeRequest = 2
  22. messageTypeResponse = 3
  23. messageTypePing = 4
  24. messageTypePong = 5
  25. messageTypeIndexUpdate = 6
  26. messageTypeClose = 7
  27. )
  28. const (
  29. stateInitial = iota
  30. stateCCRcvd
  31. stateIdxRcvd
  32. )
  33. const (
  34. FlagDeleted uint32 = 1 << 12
  35. FlagInvalid = 1 << 13
  36. FlagDirectory = 1 << 14
  37. FlagNoPermBits = 1 << 15
  38. )
  39. const (
  40. FlagShareTrusted uint32 = 1 << 0
  41. FlagShareReadOnly = 1 << 1
  42. FlagShareBits = 0x000000ff
  43. )
  44. var (
  45. ErrClusterHash = fmt.Errorf("configuration error: mismatched cluster hash")
  46. ErrClosed = errors.New("connection closed")
  47. )
  48. type Model interface {
  49. // An index was received from the peer node
  50. Index(nodeID NodeID, repo string, files []FileInfo)
  51. // An index update was received from the peer node
  52. IndexUpdate(nodeID NodeID, repo string, files []FileInfo)
  53. // A request was made by the peer node
  54. Request(nodeID NodeID, repo string, name string, offset int64, size int) ([]byte, error)
  55. // A cluster configuration message was received
  56. ClusterConfig(nodeID NodeID, config ClusterConfigMessage)
  57. // The peer node closed the connection
  58. Close(nodeID NodeID, err error)
  59. }
  60. type Connection interface {
  61. ID() NodeID
  62. Name() string
  63. Index(repo string, files []FileInfo) error
  64. IndexUpdate(repo string, files []FileInfo) error
  65. Request(repo string, name string, offset int64, size int) ([]byte, error)
  66. ClusterConfig(config ClusterConfigMessage)
  67. Statistics() Statistics
  68. }
  69. type rawConnection struct {
  70. id NodeID
  71. name string
  72. receiver Model
  73. state int
  74. cr *countingReader
  75. cw *countingWriter
  76. awaiting [4096]chan asyncResult
  77. awaitingMut sync.Mutex
  78. idxMut sync.Mutex // ensures serialization of Index calls
  79. nextID chan int
  80. outbox chan hdrMsg
  81. closed chan struct{}
  82. once sync.Once
  83. compressionThreshold int // compress messages larger than this many bytes
  84. rdbuf0 []byte // used & reused by readMessage
  85. rdbuf1 []byte // used & reused by readMessage
  86. }
  87. type asyncResult struct {
  88. val []byte
  89. err error
  90. }
  91. type hdrMsg struct {
  92. hdr header
  93. msg encodable
  94. }
  95. type encodable interface {
  96. AppendXDR([]byte) []byte
  97. }
  98. const (
  99. pingTimeout = 30 * time.Second
  100. pingIdleTime = 60 * time.Second
  101. )
  102. func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model, name string, compress bool) Connection {
  103. cr := &countingReader{Reader: reader}
  104. cw := &countingWriter{Writer: writer}
  105. compThres := 1<<31 - 1 // compression disabled
  106. if compress {
  107. compThres = 128 // compress messages that are 128 bytes long or larger
  108. }
  109. c := rawConnection{
  110. id: nodeID,
  111. name: name,
  112. receiver: nativeModel{receiver},
  113. state: stateInitial,
  114. cr: cr,
  115. cw: cw,
  116. outbox: make(chan hdrMsg),
  117. nextID: make(chan int),
  118. closed: make(chan struct{}),
  119. compressionThreshold: compThres,
  120. }
  121. go c.readerLoop()
  122. go c.writerLoop()
  123. go c.pingerLoop()
  124. go c.idGenerator()
  125. return wireFormatConnection{&c}
  126. }
  127. func (c *rawConnection) ID() NodeID {
  128. return c.id
  129. }
  130. func (c *rawConnection) Name() string {
  131. return c.name
  132. }
  133. // Index writes the list of file information to the connected peer node
  134. func (c *rawConnection) Index(repo string, idx []FileInfo) error {
  135. select {
  136. case <-c.closed:
  137. return ErrClosed
  138. default:
  139. }
  140. c.idxMut.Lock()
  141. c.send(-1, messageTypeIndex, IndexMessage{repo, idx})
  142. c.idxMut.Unlock()
  143. return nil
  144. }
  145. // IndexUpdate writes the list of file information to the connected peer node as an update
  146. func (c *rawConnection) IndexUpdate(repo string, idx []FileInfo) error {
  147. select {
  148. case <-c.closed:
  149. return ErrClosed
  150. default:
  151. }
  152. c.idxMut.Lock()
  153. c.send(-1, messageTypeIndexUpdate, IndexMessage{repo, idx})
  154. c.idxMut.Unlock()
  155. return nil
  156. }
  157. // Request returns the bytes for the specified block after fetching them from the connected peer.
  158. func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
  159. var id int
  160. select {
  161. case id = <-c.nextID:
  162. case <-c.closed:
  163. return nil, ErrClosed
  164. }
  165. c.awaitingMut.Lock()
  166. if ch := c.awaiting[id]; ch != nil {
  167. panic("id taken")
  168. }
  169. rc := make(chan asyncResult, 1)
  170. c.awaiting[id] = rc
  171. c.awaitingMut.Unlock()
  172. ok := c.send(id, messageTypeRequest, RequestMessage{repo, name, uint64(offset), uint32(size)})
  173. if !ok {
  174. return nil, ErrClosed
  175. }
  176. res, ok := <-rc
  177. if !ok {
  178. return nil, ErrClosed
  179. }
  180. return res.val, res.err
  181. }
  182. // ClusterConfig send the cluster configuration message to the peer and returns any error
  183. func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
  184. c.send(-1, messageTypeClusterConfig, config)
  185. }
  186. func (c *rawConnection) ping() bool {
  187. var id int
  188. select {
  189. case id = <-c.nextID:
  190. case <-c.closed:
  191. return false
  192. }
  193. rc := make(chan asyncResult, 1)
  194. c.awaitingMut.Lock()
  195. c.awaiting[id] = rc
  196. c.awaitingMut.Unlock()
  197. ok := c.send(id, messageTypePing, nil)
  198. if !ok {
  199. return false
  200. }
  201. res, ok := <-rc
  202. return ok && res.err == nil
  203. }
  204. func (c *rawConnection) readerLoop() (err error) {
  205. defer func() {
  206. c.close(err)
  207. }()
  208. for {
  209. select {
  210. case <-c.closed:
  211. return ErrClosed
  212. default:
  213. }
  214. hdr, msg, err := c.readMessage()
  215. if err != nil {
  216. return err
  217. }
  218. switch hdr.msgType {
  219. case messageTypeIndex:
  220. if c.state < stateCCRcvd {
  221. return fmt.Errorf("protocol error: index message in state %d", c.state)
  222. }
  223. c.handleIndex(msg.(IndexMessage))
  224. c.state = stateIdxRcvd
  225. case messageTypeIndexUpdate:
  226. if c.state < stateIdxRcvd {
  227. return fmt.Errorf("protocol error: index update message in state %d", c.state)
  228. }
  229. c.handleIndexUpdate(msg.(IndexMessage))
  230. case messageTypeRequest:
  231. if c.state < stateIdxRcvd {
  232. return fmt.Errorf("protocol error: request message in state %d", c.state)
  233. }
  234. // Requests are handled asynchronously
  235. go c.handleRequest(hdr.msgID, msg.(RequestMessage))
  236. case messageTypeResponse:
  237. if c.state < stateIdxRcvd {
  238. return fmt.Errorf("protocol error: response message in state %d", c.state)
  239. }
  240. c.handleResponse(hdr.msgID, msg.(ResponseMessage))
  241. case messageTypePing:
  242. c.send(hdr.msgID, messageTypePong, EmptyMessage{})
  243. case messageTypePong:
  244. c.handlePong(hdr.msgID)
  245. case messageTypeClusterConfig:
  246. if c.state != stateInitial {
  247. return fmt.Errorf("protocol error: cluster config message in state %d", c.state)
  248. }
  249. go c.receiver.ClusterConfig(c.id, msg.(ClusterConfigMessage))
  250. c.state = stateCCRcvd
  251. case messageTypeClose:
  252. return errors.New(msg.(CloseMessage).Reason)
  253. default:
  254. return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
  255. }
  256. }
  257. }
  258. func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
  259. if cap(c.rdbuf0) < 8 {
  260. c.rdbuf0 = make([]byte, 8)
  261. } else {
  262. c.rdbuf0 = c.rdbuf0[:8]
  263. }
  264. _, err = io.ReadFull(c.cr, c.rdbuf0)
  265. if err != nil {
  266. return
  267. }
  268. hdr = decodeHeader(binary.BigEndian.Uint32(c.rdbuf0[0:4]))
  269. msglen := int(binary.BigEndian.Uint32(c.rdbuf0[4:8]))
  270. if debug {
  271. l.Debugf("read header %v (msglen=%d)", hdr, msglen)
  272. }
  273. if cap(c.rdbuf0) < msglen {
  274. c.rdbuf0 = make([]byte, msglen)
  275. } else {
  276. c.rdbuf0 = c.rdbuf0[:msglen]
  277. }
  278. _, err = io.ReadFull(c.cr, c.rdbuf0)
  279. if err != nil {
  280. return
  281. }
  282. if debug {
  283. l.Debugf("read %d bytes", len(c.rdbuf0))
  284. }
  285. msgBuf := c.rdbuf0
  286. if hdr.compression {
  287. c.rdbuf1 = c.rdbuf1[:cap(c.rdbuf1)]
  288. c.rdbuf1, err = lz4.Decode(c.rdbuf1, c.rdbuf0)
  289. if err != nil {
  290. return
  291. }
  292. msgBuf = c.rdbuf1
  293. if debug {
  294. l.Debugf("decompressed to %d bytes", len(msgBuf))
  295. }
  296. }
  297. if debug {
  298. if len(msgBuf) > 1024 {
  299. l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024]))
  300. } else {
  301. l.Debugf("message data:\n%s", hex.Dump(msgBuf))
  302. }
  303. }
  304. switch hdr.msgType {
  305. case messageTypeIndex, messageTypeIndexUpdate:
  306. var idx IndexMessage
  307. err = idx.UnmarshalXDR(msgBuf)
  308. msg = idx
  309. case messageTypeRequest:
  310. var req RequestMessage
  311. err = req.UnmarshalXDR(msgBuf)
  312. msg = req
  313. case messageTypeResponse:
  314. var resp ResponseMessage
  315. err = resp.UnmarshalXDR(msgBuf)
  316. msg = resp
  317. case messageTypePing, messageTypePong:
  318. msg = EmptyMessage{}
  319. case messageTypeClusterConfig:
  320. var cc ClusterConfigMessage
  321. err = cc.UnmarshalXDR(msgBuf)
  322. msg = cc
  323. case messageTypeClose:
  324. var cm CloseMessage
  325. err = cm.UnmarshalXDR(msgBuf)
  326. msg = cm
  327. default:
  328. err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
  329. }
  330. return
  331. }
  332. func (c *rawConnection) handleIndex(im IndexMessage) {
  333. if debug {
  334. l.Debugf("Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
  335. }
  336. c.receiver.Index(c.id, im.Repository, im.Files)
  337. }
  338. func (c *rawConnection) handleIndexUpdate(im IndexMessage) {
  339. if debug {
  340. l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
  341. }
  342. c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
  343. }
  344. func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
  345. data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
  346. c.send(msgID, messageTypeResponse, ResponseMessage{data})
  347. }
  348. func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
  349. c.awaitingMut.Lock()
  350. if rc := c.awaiting[msgID]; rc != nil {
  351. c.awaiting[msgID] = nil
  352. rc <- asyncResult{resp.Data, nil}
  353. close(rc)
  354. }
  355. c.awaitingMut.Unlock()
  356. }
  357. func (c *rawConnection) handlePong(msgID int) {
  358. c.awaitingMut.Lock()
  359. if rc := c.awaiting[msgID]; rc != nil {
  360. c.awaiting[msgID] = nil
  361. rc <- asyncResult{}
  362. close(rc)
  363. }
  364. c.awaitingMut.Unlock()
  365. }
  366. func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool {
  367. if msgID < 0 {
  368. select {
  369. case id := <-c.nextID:
  370. msgID = id
  371. case <-c.closed:
  372. return false
  373. }
  374. }
  375. hdr := header{
  376. version: 0,
  377. msgID: msgID,
  378. msgType: msgType,
  379. }
  380. select {
  381. case c.outbox <- hdrMsg{hdr, msg}:
  382. return true
  383. case <-c.closed:
  384. return false
  385. }
  386. }
  387. func (c *rawConnection) writerLoop() {
  388. var msgBuf = make([]byte, 8) // buffer for wire format message, kept and reused
  389. var uncBuf []byte // buffer for uncompressed message, kept and reused
  390. for {
  391. var tempBuf []byte
  392. var err error
  393. select {
  394. case hm := <-c.outbox:
  395. if hm.msg != nil {
  396. // Uncompressed message in uncBuf
  397. uncBuf = hm.msg.AppendXDR(uncBuf[:0])
  398. if len(uncBuf) >= c.compressionThreshold {
  399. // Use compression for large messages
  400. hm.hdr.compression = true
  401. // Make sure we have enough space for the compressed message plus header in msgBug
  402. msgBuf = msgBuf[:cap(msgBuf)]
  403. if maxLen := lz4.CompressBound(len(uncBuf)) + 8; maxLen > len(msgBuf) {
  404. msgBuf = make([]byte, maxLen)
  405. }
  406. // Compressed is written to msgBuf, we keep tb for the length only
  407. tempBuf, err = lz4.Encode(msgBuf[8:], uncBuf)
  408. binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf)))
  409. msgBuf = msgBuf[0 : len(tempBuf)+8]
  410. if debug {
  411. l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf))
  412. }
  413. } else {
  414. // No point in compressing very short messages
  415. hm.hdr.compression = false
  416. msgBuf = msgBuf[:cap(msgBuf)]
  417. if l := len(uncBuf) + 8; l > len(msgBuf) {
  418. msgBuf = make([]byte, l)
  419. }
  420. binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(uncBuf)))
  421. msgBuf = msgBuf[0 : len(uncBuf)+8]
  422. copy(msgBuf[8:], uncBuf)
  423. if debug {
  424. l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf))
  425. }
  426. }
  427. } else {
  428. if debug {
  429. l.Debugf("write empty message; %v", hm.hdr)
  430. }
  431. binary.BigEndian.PutUint32(msgBuf[4:8], 0)
  432. msgBuf = msgBuf[:8]
  433. }
  434. binary.BigEndian.PutUint32(msgBuf[0:4], encodeHeader(hm.hdr))
  435. if err == nil {
  436. var n int
  437. n, err = c.cw.Write(msgBuf)
  438. if debug {
  439. l.Debugf("wrote %d bytes on the wire", n)
  440. }
  441. }
  442. if err != nil {
  443. c.close(err)
  444. return
  445. }
  446. case <-c.closed:
  447. return
  448. }
  449. }
  450. }
  451. func (c *rawConnection) close(err error) {
  452. c.once.Do(func() {
  453. close(c.closed)
  454. c.awaitingMut.Lock()
  455. for i, ch := range c.awaiting {
  456. if ch != nil {
  457. close(ch)
  458. c.awaiting[i] = nil
  459. }
  460. }
  461. c.awaitingMut.Unlock()
  462. go c.receiver.Close(c.id, err)
  463. })
  464. }
  465. func (c *rawConnection) idGenerator() {
  466. nextID := 0
  467. for {
  468. nextID = (nextID + 1) & 0xfff
  469. select {
  470. case c.nextID <- nextID:
  471. case <-c.closed:
  472. return
  473. }
  474. }
  475. }
  476. func (c *rawConnection) pingerLoop() {
  477. var rc = make(chan bool, 1)
  478. ticker := time.Tick(pingIdleTime / 2)
  479. for {
  480. select {
  481. case <-ticker:
  482. if d := time.Since(c.cr.Last()); d < pingIdleTime {
  483. if debug {
  484. l.Debugln(c.id, "ping skipped after rd", d)
  485. }
  486. continue
  487. }
  488. if d := time.Since(c.cw.Last()); d < pingIdleTime {
  489. if debug {
  490. l.Debugln(c.id, "ping skipped after wr", d)
  491. }
  492. continue
  493. }
  494. go func() {
  495. if debug {
  496. l.Debugln(c.id, "ping ->")
  497. }
  498. rc <- c.ping()
  499. }()
  500. select {
  501. case ok := <-rc:
  502. if debug {
  503. l.Debugln(c.id, "<- pong")
  504. }
  505. if !ok {
  506. c.close(fmt.Errorf("ping failure"))
  507. }
  508. case <-time.After(pingTimeout):
  509. c.close(fmt.Errorf("ping timeout"))
  510. case <-c.closed:
  511. return
  512. }
  513. case <-c.closed:
  514. return
  515. }
  516. }
  517. }
  518. type Statistics struct {
  519. At time.Time
  520. InBytesTotal uint64
  521. OutBytesTotal uint64
  522. }
  523. func (c *rawConnection) Statistics() Statistics {
  524. return Statistics{
  525. At: time.Now(),
  526. InBytesTotal: c.cr.Tot(),
  527. OutBytesTotal: c.cw.Tot(),
  528. }
  529. }
  530. func IsDeleted(bits uint32) bool {
  531. return bits&FlagDeleted != 0
  532. }
  533. func IsInvalid(bits uint32) bool {
  534. return bits&FlagInvalid != 0
  535. }
  536. func IsDirectory(bits uint32) bool {
  537. return bits&FlagDirectory != 0
  538. }
  539. func HasPermissionBits(bits uint32) bool {
  540. return bits&FlagNoPermBits == 0
  541. }