protocol.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package protocol
  5. import (
  6. "bufio"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "sync"
  11. "time"
  12. "github.com/calmh/syncthing/xdr"
  13. )
  // BlockSize is 128 KiB (131072 bytes).
  // NOTE(review): presumably the size of the file blocks fetched with
  // Request; nothing in this file reads the constant - confirm against callers.
  const BlockSize = 128 << 10
  // Message type identifiers carried in the header of every message.
  // readerLoop dispatches on these values, so the order of this list (and
  // therefore the numbers) must not change.
  const (
  	messageTypeClusterConfig = iota // 0
  	messageTypeIndex                // 1
  	messageTypeRequest              // 2
  	messageTypeResponse             // 3
  	messageTypePing                 // 4
  	messageTypePong                 // 5
  	messageTypeIndexUpdate          // 6
  )
  // Connection states tracked by readerLoop. The values are ordered:
  // readerLoop compares the current state with < to decide whether a given
  // message type is acceptable yet.
  const (
  	stateInitial = 0 // no cluster config message received yet
  	stateCCRcvd  = 1 // cluster config message received
  	stateIdxRcvd = 2 // index message received
  )
  // Bits in a file's flags word, tested by IsDeleted, IsInvalid, IsDirectory
  // and HasPermissionBits respectively.
  //
  // Only FlagDeleted carries an explicit uint32 type; the others are untyped
  // constants, exactly as before.
  const (
  	FlagDeleted    uint32 = 0x1000 // bit 12
  	FlagInvalid           = 0x2000 // bit 13
  	FlagDirectory         = 0x4000 // bit 14
  	FlagNoPermBits        = 0x8000 // bit 15
  )
  // Share flag bits and the mask covering them (the low eight bits).
  // NOTE(review): presumably these apply to per-node share settings in the
  // cluster configuration; nothing in this file uses them - confirm.
  //
  // Only FlagShareTrusted carries an explicit uint32 type; the others are
  // untyped constants, exactly as before.
  const (
  	FlagShareTrusted  uint32 = 0x01 // bit 0
  	FlagShareReadOnly        = 0x02 // bit 1
  	FlagShareBits            = 0xff // mask for bits 0-7
  )
  // ErrClusterHash is the error for a mismatched cluster hash.
  // NOTE(review): it is not returned anywhere in this file - confirm where
  // it is used.
  var ErrClusterHash = fmt.Errorf("configuration error: mismatched cluster hash")

  // ErrClosed is returned by operations attempted on, or interrupted by, a
  // closed connection.
  var ErrClosed = errors.New("connection closed")
  44. type Model interface {
  45. // An index was received from the peer node
  46. Index(nodeID NodeID, repo string, files []FileInfo)
  47. // An index update was received from the peer node
  48. IndexUpdate(nodeID NodeID, repo string, files []FileInfo)
  49. // A request was made by the peer node
  50. Request(nodeID NodeID, repo string, name string, offset int64, size int) ([]byte, error)
  51. // A cluster configuration message was received
  52. ClusterConfig(nodeID NodeID, config ClusterConfigMessage)
  53. // The peer node closed the connection
  54. Close(nodeID NodeID, err error)
  55. }
  56. type Connection interface {
  57. ID() NodeID
  58. Name() string
  59. Index(repo string, files []FileInfo) error
  60. IndexUpdate(repo string, files []FileInfo) error
  61. Request(repo string, name string, offset int64, size int) ([]byte, error)
  62. ClusterConfig(config ClusterConfigMessage)
  63. Statistics() Statistics
  64. }
  65. type rawConnection struct {
  66. id NodeID
  67. name string
  68. receiver Model
  69. state int
  70. cr *countingReader
  71. xr *xdr.Reader
  72. cw *countingWriter
  73. wb *bufio.Writer
  74. xw *xdr.Writer
  75. awaiting []chan asyncResult
  76. awaitingMut sync.Mutex
  77. idxMut sync.Mutex // ensures serialization of Index calls
  78. nextID chan int
  79. outbox chan []encodable
  80. closed chan struct{}
  81. once sync.Once
  82. }
  // asyncResult is the outcome delivered on an awaiting channel: the payload
  // of a response, or the zero value for a pong.
  //
  // The field order matters: the struct is built with unkeyed literals.
  type asyncResult struct {
  	val []byte // response data
  	err error  // failure, if any
  }
  // Timing of the keep-alive pings sent by pingerLoop.
  const (
  	// pingTimeout is how long pingerLoop waits for a ping to complete
  	// before closing the connection.
  	pingTimeout = time.Minute / 2

  	// pingIdleTime is how long both the read and the write side must have
  	// been idle before a ping is sent. pingerLoop checks twice per period.
  	pingIdleTime = time.Minute
  )
  91. func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model, name string) Connection {
  92. // Byte counters are at the lowest level, counting compressed bytes
  93. cr := &countingReader{Reader: reader}
  94. cw := &countingWriter{Writer: writer}
  95. // Compression is just above counting
  96. zr := newLZ4Reader(cr)
  97. zw := newLZ4Writer(cw)
  98. // We buffer writes on top of compression.
  99. // The LZ4 reader is already internally buffered
  100. wb := bufio.NewWriterSize(zw, 65536)
  101. c := rawConnection{
  102. id: nodeID,
  103. name: name,
  104. receiver: nativeModel{receiver},
  105. state: stateInitial,
  106. cr: cr,
  107. xr: xdr.NewReader(zr),
  108. cw: cw,
  109. wb: wb,
  110. xw: xdr.NewWriter(wb),
  111. awaiting: make([]chan asyncResult, 0x1000),
  112. outbox: make(chan []encodable),
  113. nextID: make(chan int),
  114. closed: make(chan struct{}),
  115. }
  116. go c.readerLoop()
  117. go c.writerLoop()
  118. go c.pingerLoop()
  119. go c.idGenerator()
  120. return wireFormatConnection{&c}
  121. }
  122. func (c *rawConnection) ID() NodeID {
  123. return c.id
  124. }
  125. func (c *rawConnection) Name() string {
  126. return c.name
  127. }
  128. // Index writes the list of file information to the connected peer node
  129. func (c *rawConnection) Index(repo string, idx []FileInfo) error {
  130. select {
  131. case <-c.closed:
  132. return ErrClosed
  133. default:
  134. }
  135. c.idxMut.Lock()
  136. c.send(header{0, -1, messageTypeIndex}, IndexMessage{repo, idx})
  137. c.idxMut.Unlock()
  138. return nil
  139. }
  140. // IndexUpdate writes the list of file information to the connected peer node as an update
  141. func (c *rawConnection) IndexUpdate(repo string, idx []FileInfo) error {
  142. select {
  143. case <-c.closed:
  144. return ErrClosed
  145. default:
  146. }
  147. c.idxMut.Lock()
  148. c.send(header{0, -1, messageTypeIndexUpdate}, IndexMessage{repo, idx})
  149. c.idxMut.Unlock()
  150. return nil
  151. }
  152. // Request returns the bytes for the specified block after fetching them from the connected peer.
  153. func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
  154. var id int
  155. select {
  156. case id = <-c.nextID:
  157. case <-c.closed:
  158. return nil, ErrClosed
  159. }
  160. c.awaitingMut.Lock()
  161. if ch := c.awaiting[id]; ch != nil {
  162. panic("id taken")
  163. }
  164. rc := make(chan asyncResult, 1)
  165. c.awaiting[id] = rc
  166. c.awaitingMut.Unlock()
  167. ok := c.send(header{0, id, messageTypeRequest},
  168. RequestMessage{repo, name, uint64(offset), uint32(size)})
  169. if !ok {
  170. return nil, ErrClosed
  171. }
  172. res, ok := <-rc
  173. if !ok {
  174. return nil, ErrClosed
  175. }
  176. return res.val, res.err
  177. }
  178. // ClusterConfig send the cluster configuration message to the peer and returns any error
  179. func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
  180. c.send(header{0, -1, messageTypeClusterConfig}, config)
  181. }
  182. func (c *rawConnection) ping() bool {
  183. var id int
  184. select {
  185. case id = <-c.nextID:
  186. case <-c.closed:
  187. return false
  188. }
  189. rc := make(chan asyncResult, 1)
  190. c.awaitingMut.Lock()
  191. c.awaiting[id] = rc
  192. c.awaitingMut.Unlock()
  193. ok := c.send(header{0, id, messageTypePing})
  194. if !ok {
  195. return false
  196. }
  197. res, ok := <-rc
  198. return ok && res.err == nil
  199. }
  200. func (c *rawConnection) readerLoop() (err error) {
  201. defer func() {
  202. c.close(err)
  203. }()
  204. for {
  205. select {
  206. case <-c.closed:
  207. return ErrClosed
  208. default:
  209. }
  210. var hdr header
  211. hdr.decodeXDR(c.xr)
  212. if err := c.xr.Error(); err != nil {
  213. return err
  214. }
  215. if hdr.version != 0 {
  216. return fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version)
  217. }
  218. switch hdr.msgType {
  219. case messageTypeIndex:
  220. if c.state < stateCCRcvd {
  221. return fmt.Errorf("protocol error: index message in state %d", c.state)
  222. }
  223. if err := c.handleIndex(); err != nil {
  224. return err
  225. }
  226. c.state = stateIdxRcvd
  227. case messageTypeIndexUpdate:
  228. if c.state < stateIdxRcvd {
  229. return fmt.Errorf("protocol error: index update message in state %d", c.state)
  230. }
  231. if err := c.handleIndexUpdate(); err != nil {
  232. return err
  233. }
  234. case messageTypeRequest:
  235. if c.state < stateIdxRcvd {
  236. return fmt.Errorf("protocol error: request message in state %d", c.state)
  237. }
  238. if err := c.handleRequest(hdr); err != nil {
  239. return err
  240. }
  241. case messageTypeResponse:
  242. if c.state < stateIdxRcvd {
  243. return fmt.Errorf("protocol error: response message in state %d", c.state)
  244. }
  245. if err := c.handleResponse(hdr); err != nil {
  246. return err
  247. }
  248. case messageTypePing:
  249. c.send(header{0, hdr.msgID, messageTypePong})
  250. case messageTypePong:
  251. c.handlePong(hdr)
  252. case messageTypeClusterConfig:
  253. if c.state != stateInitial {
  254. return fmt.Errorf("protocol error: cluster config message in state %d", c.state)
  255. }
  256. if err := c.handleClusterConfig(); err != nil {
  257. return err
  258. }
  259. c.state = stateCCRcvd
  260. default:
  261. return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
  262. }
  263. }
  264. }
  265. func (c *rawConnection) handleIndex() error {
  266. var im IndexMessage
  267. im.decodeXDR(c.xr)
  268. if err := c.xr.Error(); err != nil {
  269. return err
  270. } else {
  271. if debug {
  272. l.Debugf("Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
  273. }
  274. c.receiver.Index(c.id, im.Repository, im.Files)
  275. }
  276. return nil
  277. }
  278. func (c *rawConnection) handleIndexUpdate() error {
  279. var im IndexMessage
  280. im.decodeXDR(c.xr)
  281. if err := c.xr.Error(); err != nil {
  282. return err
  283. } else {
  284. if debug {
  285. l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files))
  286. }
  287. c.receiver.IndexUpdate(c.id, im.Repository, im.Files)
  288. }
  289. return nil
  290. }
  291. func (c *rawConnection) handleRequest(hdr header) error {
  292. var req RequestMessage
  293. req.decodeXDR(c.xr)
  294. if err := c.xr.Error(); err != nil {
  295. return err
  296. }
  297. go c.processRequest(hdr.msgID, req)
  298. return nil
  299. }
  300. func (c *rawConnection) handleResponse(hdr header) error {
  301. data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size
  302. if err := c.xr.Error(); err != nil {
  303. return err
  304. }
  305. c.awaitingMut.Lock()
  306. if rc := c.awaiting[hdr.msgID]; rc != nil {
  307. c.awaiting[hdr.msgID] = nil
  308. rc <- asyncResult{data, nil}
  309. close(rc)
  310. }
  311. c.awaitingMut.Unlock()
  312. return nil
  313. }
  314. func (c *rawConnection) handlePong(hdr header) {
  315. c.awaitingMut.Lock()
  316. if rc := c.awaiting[hdr.msgID]; rc != nil {
  317. c.awaiting[hdr.msgID] = nil
  318. rc <- asyncResult{}
  319. close(rc)
  320. }
  321. c.awaitingMut.Unlock()
  322. }
  323. func (c *rawConnection) handleClusterConfig() error {
  324. var cm ClusterConfigMessage
  325. cm.decodeXDR(c.xr)
  326. if err := c.xr.Error(); err != nil {
  327. return err
  328. } else {
  329. go c.receiver.ClusterConfig(c.id, cm)
  330. }
  331. return nil
  332. }
  333. type encodable interface {
  334. encodeXDR(*xdr.Writer) (int, error)
  335. }
  336. type encodableBytes []byte
  337. func (e encodableBytes) encodeXDR(xw *xdr.Writer) (int, error) {
  338. return xw.WriteBytes(e)
  339. }
  340. func (c *rawConnection) send(h header, es ...encodable) bool {
  341. if h.msgID < 0 {
  342. select {
  343. case id := <-c.nextID:
  344. h.msgID = id
  345. case <-c.closed:
  346. return false
  347. }
  348. }
  349. msg := append([]encodable{h}, es...)
  350. select {
  351. case c.outbox <- msg:
  352. return true
  353. case <-c.closed:
  354. return false
  355. }
  356. }
  357. func (c *rawConnection) writerLoop() {
  358. for {
  359. select {
  360. case es := <-c.outbox:
  361. for _, e := range es {
  362. e.encodeXDR(c.xw)
  363. }
  364. if err := c.flush(); err != nil {
  365. c.close(err)
  366. return
  367. }
  368. case <-c.closed:
  369. return
  370. }
  371. }
  372. }
  373. func (c *rawConnection) flush() error {
  374. if err := c.xw.Error(); err != nil {
  375. return err
  376. }
  377. if err := c.wb.Flush(); err != nil {
  378. return err
  379. }
  380. return nil
  381. }
  382. func (c *rawConnection) close(err error) {
  383. c.once.Do(func() {
  384. close(c.closed)
  385. c.awaitingMut.Lock()
  386. for i, ch := range c.awaiting {
  387. if ch != nil {
  388. close(ch)
  389. c.awaiting[i] = nil
  390. }
  391. }
  392. c.awaitingMut.Unlock()
  393. go c.receiver.Close(c.id, err)
  394. })
  395. }
  396. func (c *rawConnection) idGenerator() {
  397. nextID := 0
  398. for {
  399. nextID = (nextID + 1) & 0xfff
  400. select {
  401. case c.nextID <- nextID:
  402. case <-c.closed:
  403. return
  404. }
  405. }
  406. }
  407. func (c *rawConnection) pingerLoop() {
  408. var rc = make(chan bool, 1)
  409. ticker := time.Tick(pingIdleTime / 2)
  410. for {
  411. select {
  412. case <-ticker:
  413. if d := time.Since(c.xr.LastRead()); d < pingIdleTime {
  414. if debug {
  415. l.Debugln(c.id, "ping skipped after rd", d)
  416. }
  417. continue
  418. }
  419. if d := time.Since(c.xw.LastWrite()); d < pingIdleTime {
  420. if debug {
  421. l.Debugln(c.id, "ping skipped after wr", d)
  422. }
  423. continue
  424. }
  425. go func() {
  426. if debug {
  427. l.Debugln(c.id, "ping ->")
  428. }
  429. rc <- c.ping()
  430. }()
  431. select {
  432. case ok := <-rc:
  433. if debug {
  434. l.Debugln(c.id, "<- pong")
  435. }
  436. if !ok {
  437. c.close(fmt.Errorf("ping failure"))
  438. }
  439. case <-time.After(pingTimeout):
  440. c.close(fmt.Errorf("ping timeout"))
  441. case <-c.closed:
  442. return
  443. }
  444. case <-c.closed:
  445. return
  446. }
  447. }
  448. }
  449. func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
  450. data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
  451. c.send(header{0, msgID, messageTypeResponse}, encodableBytes(data))
  452. }
  // Statistics is a snapshot of a connection's byte counters.
  type Statistics struct {
  	// At is the time the snapshot was taken.
  	At time.Time
  	// InBytesTotal is the total reported by the connection's counting
  	// reader.
  	InBytesTotal uint64
  	// OutBytesTotal is the total reported by the connection's counting
  	// writer.
  	OutBytesTotal uint64
  }
  458. func (c *rawConnection) Statistics() Statistics {
  459. return Statistics{
  460. At: time.Now(),
  461. InBytesTotal: c.cr.Tot(),
  462. OutBytesTotal: c.cw.Tot(),
  463. }
  464. }
  465. func IsDeleted(bits uint32) bool {
  466. return bits&FlagDeleted != 0
  467. }
  468. func IsInvalid(bits uint32) bool {
  469. return bits&FlagInvalid != 0
  470. }
  471. func IsDirectory(bits uint32) bool {
  472. return bits&FlagDirectory != 0
  473. }
  474. func HasPermissionBits(bits uint32) bool {
  475. return bits&FlagNoPermBits == 0
  476. }