protocol.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569
  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package protocol
  5. import (
  6. "bufio"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "sync"
  11. "time"
  12. "github.com/calmh/syncthing/xdr"
  13. )
  14. const BlockSize = 128 * 1024
  15. const (
  16. messageTypeClusterConfig = 0
  17. messageTypeIndex = 1
  18. messageTypeRequest = 2
  19. messageTypeResponse = 3
  20. messageTypePing = 4
  21. messageTypePong = 5
  22. messageTypeIndexUpdate = 6
  23. messageTypeClose = 7
  24. )
  25. const (
  26. stateInitial = iota
  27. stateCCRcvd
  28. stateIdxRcvd
  29. )
  30. const (
  31. FlagDeleted uint32 = 1 << 12
  32. FlagInvalid = 1 << 13
  33. FlagDirectory = 1 << 14
  34. FlagNoPermBits = 1 << 15
  35. )
  36. const (
  37. FlagShareTrusted uint32 = 1 << 0
  38. FlagShareReadOnly = 1 << 1
  39. FlagShareBits = 0x000000ff
  40. )
  41. var (
  42. ErrClusterHash = fmt.Errorf("configuration error: mismatched cluster hash")
  43. ErrClosed = errors.New("connection closed")
  44. )
  45. type Model interface {
  46. // An index was received from the peer node
  47. Index(nodeID NodeID, repo string, files []FileInfo)
  48. // An index update was received from the peer node
  49. IndexUpdate(nodeID NodeID, repo string, files []FileInfo)
  50. // A request was made by the peer node
  51. Request(nodeID NodeID, repo string, name string, offset int64, size int) ([]byte, error)
  52. // A cluster configuration message was received
  53. ClusterConfig(nodeID NodeID, config ClusterConfigMessage)
  54. // The peer node closed the connection
  55. Close(nodeID NodeID, err error)
  56. }
  57. type Connection interface {
  58. ID() NodeID
  59. Name() string
  60. Index(repo string, files []FileInfo) error
  61. IndexUpdate(repo string, files []FileInfo) error
  62. Request(repo string, name string, offset int64, size int) ([]byte, error)
  63. ClusterConfig(config ClusterConfigMessage)
  64. Statistics() Statistics
  65. }
  66. type rawConnection struct {
  67. id NodeID
  68. name string
  69. receiver Model
  70. state int
  71. cr *countingReader
  72. xr *xdr.Reader
  73. cw *countingWriter
  74. wb *bufio.Writer
  75. xw *xdr.Writer
  76. awaiting []chan asyncResult
  77. awaitingMut sync.Mutex
  78. idxMut sync.Mutex // ensures serialization of Index calls
  79. nextID chan int
  80. outbox chan []encodable
  81. closed chan struct{}
  82. once sync.Once
  83. }
  84. type asyncResult struct {
  85. val []byte
  86. err error
  87. }
  88. const (
  89. pingTimeout = 30 * time.Second
  90. pingIdleTime = 60 * time.Second
  91. )
  92. func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model, name string) Connection {
  93. // Byte counters are at the lowest level, counting compressed bytes
  94. cr := &countingReader{Reader: reader}
  95. cw := &countingWriter{Writer: writer}
  96. // Compression is just above counting
  97. zr := newLZ4Reader(cr)
  98. zw := newLZ4Writer(cw)
  99. // We buffer writes on top of compression.
  100. // The LZ4 reader is already internally buffered
  101. wb := bufio.NewWriterSize(zw, 65536)
  102. c := rawConnection{
  103. id: nodeID,
  104. name: name,
  105. receiver: nativeModel{receiver},
  106. state: stateInitial,
  107. cr: cr,
  108. xr: xdr.NewReader(zr),
  109. cw: cw,
  110. wb: wb,
  111. xw: xdr.NewWriter(wb),
  112. awaiting: make([]chan asyncResult, 0x1000),
  113. outbox: make(chan []encodable),
  114. nextID: make(chan int),
  115. closed: make(chan struct{}),
  116. }
  117. go c.readerLoop()
  118. go c.writerLoop()
  119. go c.pingerLoop()
  120. go c.idGenerator()
  121. return wireFormatConnection{&c}
  122. }
  123. func (c *rawConnection) ID() NodeID {
  124. return c.id
  125. }
  126. func (c *rawConnection) Name() string {
  127. return c.name
  128. }
  129. // Index writes the list of file information to the connected peer node
  130. func (c *rawConnection) Index(repo string, idx []FileInfo) error {
  131. select {
  132. case <-c.closed:
  133. return ErrClosed
  134. default:
  135. }
  136. c.idxMut.Lock()
  137. c.send(header{0, -1, messageTypeIndex}, IndexMessage{repo, idx})
  138. c.idxMut.Unlock()
  139. return nil
  140. }
  141. // IndexUpdate writes the list of file information to the connected peer node as an update
  142. func (c *rawConnection) IndexUpdate(repo string, idx []FileInfo) error {
  143. select {
  144. case <-c.closed:
  145. return ErrClosed
  146. default:
  147. }
  148. c.idxMut.Lock()
  149. c.send(header{0, -1, messageTypeIndexUpdate}, IndexMessage{repo, idx})
  150. c.idxMut.Unlock()
  151. return nil
  152. }
  153. // Request returns the bytes for the specified block after fetching them from the connected peer.
  154. func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
  155. var id int
  156. select {
  157. case id = <-c.nextID:
  158. case <-c.closed:
  159. return nil, ErrClosed
  160. }
  161. c.awaitingMut.Lock()
  162. if ch := c.awaiting[id]; ch != nil {
  163. panic("id taken")
  164. }
  165. rc := make(chan asyncResult, 1)
  166. c.awaiting[id] = rc
  167. c.awaitingMut.Unlock()
  168. ok := c.send(header{0, id, messageTypeRequest},
  169. RequestMessage{repo, name, uint64(offset), uint32(size)})
  170. if !ok {
  171. return nil, ErrClosed
  172. }
  173. res, ok := <-rc
  174. if !ok {
  175. return nil, ErrClosed
  176. }
  177. return res.val, res.err
  178. }
  179. // ClusterConfig send the cluster configuration message to the peer and returns any error
  180. func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
  181. c.send(header{0, -1, messageTypeClusterConfig}, config)
  182. }
  183. func (c *rawConnection) ping() bool {
  184. var id int
  185. select {
  186. case id = <-c.nextID:
  187. case <-c.closed:
  188. return false
  189. }
  190. rc := make(chan asyncResult, 1)
  191. c.awaitingMut.Lock()
  192. c.awaiting[id] = rc
  193. c.awaitingMut.Unlock()
  194. ok := c.send(header{0, id, messageTypePing})
  195. if !ok {
  196. return false
  197. }
  198. res, ok := <-rc
  199. return ok && res.err == nil
  200. }
  201. func (c *rawConnection) readerLoop() (err error) {
  202. defer func() {
  203. c.close(err)
  204. }()
  205. for {
  206. select {
  207. case <-c.closed:
  208. return ErrClosed
  209. default:
  210. }
  211. var hdr header
  212. hdr.decodeXDR(c.xr)
  213. if err := c.xr.Error(); err != nil {
  214. return err
  215. }
  216. if hdr.version != 0 {
  217. return fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version)
  218. }
  219. switch hdr.msgType {
  220. case messageTypeIndex:
  221. if c.state < stateCCRcvd {
  222. return fmt.Errorf("protocol error: index message in state %d", c.state)
  223. }
  224. if err := c.handleIndex(); err != nil {
  225. return err
  226. }
  227. c.state = stateIdxRcvd
  228. case messageTypeIndexUpdate:
  229. if c.state < stateIdxRcvd {
  230. return fmt.Errorf("protocol error: index update message in state %d", c.state)
  231. }
  232. if err := c.handleIndexUpdate(); err != nil {
  233. return err
  234. }
  235. case messageTypeRequest:
  236. if c.state < stateIdxRcvd {
  237. return fmt.Errorf("protocol error: request message in state %d", c.state)
  238. }
  239. if err := c.handleRequest(hdr); err != nil {
  240. return err
  241. }
  242. case messageTypeResponse:
  243. if c.state < stateIdxRcvd {
  244. return fmt.Errorf("protocol error: response message in state %d", c.state)
  245. }
  246. if err := c.handleResponse(hdr); err != nil {
  247. return err
  248. }
  249. case messageTypePing:
  250. c.send(header{0, hdr.msgID, messageTypePong})
  251. case messageTypePong:
  252. c.handlePong(hdr)
  253. case messageTypeClusterConfig:
  254. if c.state != stateInitial {
  255. return fmt.Errorf("protocol error: cluster config message in state %d", c.state)
  256. }
  257. if err := c.handleClusterConfig(); err != nil {
  258. return err
  259. }
  260. c.state = stateCCRcvd
  261. case messageTypeClose:
  262. if err := c.handleClose(); err != nil {
  263. return err
  264. }
  265. default:
  266. return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
  267. }
  268. }
  269. }
  270. func (c *rawConnection) handleIndex() error {
  271. var im IndexMessage
  272. im.decodeXDR(c.xr)
  273. if err := c.xr.Error(); err != nil {
  274. return err
  275. } else {
  276. if debug {
  277. l.Debugf("Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
  278. }
  279. c.receiver.Index(c.id, im.Repository, im.Files)
  280. }
  281. return nil
  282. }
  283. func (c *rawConnection) handleIndexUpdate() error {
  284. var im IndexMessage
  285. im.decodeXDR(c.xr)
  286. if err := c.xr.Error(); err != nil {
  287. return err
  288. } else {
  289. if debug {
  290. l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
  291. }
  292. c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
  293. }
  294. return nil
  295. }
  296. func (c *rawConnection) handleRequest(hdr header) error {
  297. var req RequestMessage
  298. req.decodeXDR(c.xr)
  299. if err := c.xr.Error(); err != nil {
  300. return err
  301. }
  302. go c.processRequest(hdr.msgID, req)
  303. return nil
  304. }
  305. func (c *rawConnection) handleResponse(hdr header) error {
  306. data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size
  307. if err := c.xr.Error(); err != nil {
  308. return err
  309. }
  310. c.awaitingMut.Lock()
  311. if rc := c.awaiting[hdr.msgID]; rc != nil {
  312. c.awaiting[hdr.msgID] = nil
  313. rc <- asyncResult{data, nil}
  314. close(rc)
  315. }
  316. c.awaitingMut.Unlock()
  317. return nil
  318. }
  319. func (c *rawConnection) handlePong(hdr header) {
  320. c.awaitingMut.Lock()
  321. if rc := c.awaiting[hdr.msgID]; rc != nil {
  322. c.awaiting[hdr.msgID] = nil
  323. rc <- asyncResult{}
  324. close(rc)
  325. }
  326. c.awaitingMut.Unlock()
  327. }
  328. func (c *rawConnection) handleClusterConfig() error {
  329. var cm ClusterConfigMessage
  330. cm.decodeXDR(c.xr)
  331. if err := c.xr.Error(); err != nil {
  332. return err
  333. } else {
  334. go c.receiver.ClusterConfig(c.id, cm)
  335. }
  336. return nil
  337. }
  338. func (c *rawConnection) handleClose() error {
  339. var cm CloseMessage
  340. cm.decodeXDR(c.xr)
  341. if err := c.xr.Error(); err != nil {
  342. return err
  343. }
  344. return errors.New(cm.Reason)
  345. }
  346. type encodable interface {
  347. encodeXDR(*xdr.Writer) (int, error)
  348. }
  349. type encodableBytes []byte
  350. func (e encodableBytes) encodeXDR(xw *xdr.Writer) (int, error) {
  351. return xw.WriteBytes(e)
  352. }
  353. func (c *rawConnection) send(h header, es ...encodable) bool {
  354. if h.msgID < 0 {
  355. select {
  356. case id := <-c.nextID:
  357. h.msgID = id
  358. case <-c.closed:
  359. return false
  360. }
  361. }
  362. msg := append([]encodable{h}, es...)
  363. select {
  364. case c.outbox <- msg:
  365. return true
  366. case <-c.closed:
  367. return false
  368. }
  369. }
  370. func (c *rawConnection) writerLoop() {
  371. for {
  372. select {
  373. case es := <-c.outbox:
  374. for _, e := range es {
  375. e.encodeXDR(c.xw)
  376. }
  377. if err := c.flush(); err != nil {
  378. c.close(err)
  379. return
  380. }
  381. case <-c.closed:
  382. return
  383. }
  384. }
  385. }
  386. func (c *rawConnection) flush() error {
  387. if err := c.xw.Error(); err != nil {
  388. return err
  389. }
  390. if err := c.wb.Flush(); err != nil {
  391. return err
  392. }
  393. return nil
  394. }
  395. func (c *rawConnection) close(err error) {
  396. c.once.Do(func() {
  397. close(c.closed)
  398. c.awaitingMut.Lock()
  399. for i, ch := range c.awaiting {
  400. if ch != nil {
  401. close(ch)
  402. c.awaiting[i] = nil
  403. }
  404. }
  405. c.awaitingMut.Unlock()
  406. go c.receiver.Close(c.id, err)
  407. })
  408. }
  409. func (c *rawConnection) idGenerator() {
  410. nextID := 0
  411. for {
  412. nextID = (nextID + 1) & 0xfff
  413. select {
  414. case c.nextID <- nextID:
  415. case <-c.closed:
  416. return
  417. }
  418. }
  419. }
  420. func (c *rawConnection) pingerLoop() {
  421. var rc = make(chan bool, 1)
  422. ticker := time.Tick(pingIdleTime / 2)
  423. for {
  424. select {
  425. case <-ticker:
  426. if d := time.Since(c.xr.LastRead()); d < pingIdleTime {
  427. if debug {
  428. l.Debugln(c.id, "ping skipped after rd", d)
  429. }
  430. continue
  431. }
  432. if d := time.Since(c.xw.LastWrite()); d < pingIdleTime {
  433. if debug {
  434. l.Debugln(c.id, "ping skipped after wr", d)
  435. }
  436. continue
  437. }
  438. go func() {
  439. if debug {
  440. l.Debugln(c.id, "ping ->")
  441. }
  442. rc <- c.ping()
  443. }()
  444. select {
  445. case ok := <-rc:
  446. if debug {
  447. l.Debugln(c.id, "<- pong")
  448. }
  449. if !ok {
  450. c.close(fmt.Errorf("ping failure"))
  451. }
  452. case <-time.After(pingTimeout):
  453. c.close(fmt.Errorf("ping timeout"))
  454. case <-c.closed:
  455. return
  456. }
  457. case <-c.closed:
  458. return
  459. }
  460. }
  461. }
  462. func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
  463. data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
  464. c.send(header{0, msgID, messageTypeResponse}, encodableBytes(data))
  465. }
  466. type Statistics struct {
  467. At time.Time
  468. InBytesTotal uint64
  469. OutBytesTotal uint64
  470. }
  471. func (c *rawConnection) Statistics() Statistics {
  472. return Statistics{
  473. At: time.Now(),
  474. InBytesTotal: c.cr.Tot(),
  475. OutBytesTotal: c.cw.Tot(),
  476. }
  477. }
  478. func IsDeleted(bits uint32) bool {
  479. return bits&FlagDeleted != 0
  480. }
  481. func IsInvalid(bits uint32) bool {
  482. return bits&FlagInvalid != 0
  483. }
  484. func IsDirectory(bits uint32) bool {
  485. return bits&FlagDirectory != 0
  486. }
  487. func HasPermissionBits(bits uint32) bool {
  488. return bits&FlagNoPermBits == 0
  489. }