protocol.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589
  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package protocol
  5. import (
  6. "bufio"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "sync"
  11. "time"
  12. "github.com/calmh/syncthing/xdr"
  13. )
  // BlockSize is 128 KiB (131072 bytes).
  // NOTE(review): presumably the size of the blocks fetched via Request —
  // confirm against the callers.
  const BlockSize = 128 << 10
  // Message type identifiers carried in the message header. The numeric
  // values are part of the wire protocol, so the order below must not change.
  const (
  	messageTypeClusterConfig = iota // 0
  	messageTypeIndex                // 1
  	messageTypeRequest              // 2
  	messageTypeResponse             // 3
  	messageTypePing                 // 4
  	messageTypePong                 // 5
  	messageTypeIndexUpdate          // 6
  )
  // Receive-side protocol states tracked by the reader loop. They are
  // ordered: the reader loop compares them with < to decide whether a message
  // type is acceptable yet.
  const (
  	stateInitial = 0 // nothing received yet
  	stateCCRcvd  = 1 // cluster config received
  	stateIdxRcvd = 2 // first index received
  )
  // Flag bits tested by IsDeleted, IsInvalid, IsDirectory and
  // HasPermissionBits. Only FlagDeleted is explicitly typed; the others are
  // untyped constants.
  const (
  	FlagDeleted    uint32 = 0x1000 // bit 12
  	FlagInvalid           = 0x2000 // bit 13
  	FlagDirectory         = 0x4000 // bit 14
  	FlagNoPermBits        = 0x8000 // bit 15
  )
  // Share flag bits. FlagShareBits is a mask over the low eight bits.
  // NOTE(review): presumably these describe how a repository is shared with a
  // node in the cluster configuration — confirm against the message types.
  const (
  	FlagShareTrusted  uint32 = 0x01
  	FlagShareReadOnly        = 0x02
  	FlagShareBits            = 0xff
  )
  // ErrClusterHash reports a cluster hash mismatch in the configuration.
  var ErrClusterHash = fmt.Errorf("configuration error: mismatched cluster hash")

  // ErrClosed is returned by operations attempted on, or interrupted by, a
  // closed connection.
  var ErrClosed = errors.New("connection closed")
  44. type Model interface {
  45. // An index was received from the peer node
  46. Index(nodeID NodeID, repo string, files []FileInfo)
  47. // An index update was received from the peer node
  48. IndexUpdate(nodeID NodeID, repo string, files []FileInfo)
  49. // A request was made by the peer node
  50. Request(nodeID NodeID, repo string, name string, offset int64, size int) ([]byte, error)
  51. // A cluster configuration message was received
  52. ClusterConfig(nodeID NodeID, config ClusterConfigMessage)
  53. // The peer node closed the connection
  54. Close(nodeID NodeID, err error)
  55. }
  56. type Connection interface {
  57. ID() NodeID
  58. Index(repo string, files []FileInfo)
  59. Request(repo string, name string, offset int64, size int) ([]byte, error)
  60. ClusterConfig(config ClusterConfigMessage)
  61. Statistics() Statistics
  62. }
  63. type rawConnection struct {
  64. id NodeID
  65. receiver Model
  66. state int
  67. cr *countingReader
  68. xr *xdr.Reader
  69. cw *countingWriter
  70. wb *bufio.Writer
  71. xw *xdr.Writer
  72. awaiting []chan asyncResult
  73. awaitingMut sync.Mutex
  74. idxSent map[string]map[string]uint64
  75. idxMut sync.Mutex // ensures serialization of Index calls
  76. nextID chan int
  77. outbox chan []encodable
  78. closed chan struct{}
  79. once sync.Once
  80. }
  // asyncResult is what a waiting Request or ping receives on its reply
  // channel: the response payload, or an error.
  type asyncResult struct {
  	val []byte // response data; nil for a pong
  	err error
  }
  // Keep-alive timing used by the pinger loop.
  const (
  	// pingIdleTime is how long the connection must have been idle in both
  	// directions before a ping is sent.
  	pingIdleTime = time.Minute
  	// pingTimeout is how long to wait for the pong before closing the
  	// connection.
  	pingTimeout = pingIdleTime / 2
  )
  89. func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model) Connection {
  90. cr := &countingReader{Reader: reader}
  91. cw := &countingWriter{Writer: writer}
  92. rb := bufio.NewReader(cr)
  93. wb := bufio.NewWriter(cw)
  94. c := rawConnection{
  95. id: nodeID,
  96. receiver: nativeModel{receiver},
  97. state: stateInitial,
  98. cr: cr,
  99. xr: xdr.NewReader(rb),
  100. cw: cw,
  101. wb: wb,
  102. xw: xdr.NewWriter(wb),
  103. awaiting: make([]chan asyncResult, 0x1000),
  104. idxSent: make(map[string]map[string]uint64),
  105. outbox: make(chan []encodable),
  106. nextID: make(chan int),
  107. closed: make(chan struct{}),
  108. }
  109. go c.indexSerializerLoop()
  110. go c.readerLoop()
  111. go c.writerLoop()
  112. go c.pingerLoop()
  113. go c.idGenerator()
  114. return wireFormatConnection{&c}
  115. }
  116. func (c *rawConnection) ID() NodeID {
  117. return c.id
  118. }
  119. // Index writes the list of file information to the connected peer node
  120. func (c *rawConnection) Index(repo string, idx []FileInfo) {
  121. c.idxMut.Lock()
  122. defer c.idxMut.Unlock()
  123. var msgType int
  124. if c.idxSent[repo] == nil {
  125. // This is the first time we send an index.
  126. msgType = messageTypeIndex
  127. c.idxSent[repo] = make(map[string]uint64)
  128. for _, f := range idx {
  129. c.idxSent[repo][f.Name] = f.Version
  130. }
  131. } else {
  132. // We have sent one full index. Only send updates now.
  133. msgType = messageTypeIndexUpdate
  134. var diff []FileInfo
  135. for _, f := range idx {
  136. if vs, ok := c.idxSent[repo][f.Name]; !ok || f.Version != vs {
  137. diff = append(diff, f)
  138. c.idxSent[repo][f.Name] = f.Version
  139. }
  140. }
  141. idx = diff
  142. }
  143. if msgType == messageTypeIndex || len(idx) > 0 {
  144. c.send(header{0, -1, msgType}, IndexMessage{repo, idx})
  145. }
  146. }
  147. // Request returns the bytes for the specified block after fetching them from the connected peer.
  148. func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
  149. var id int
  150. select {
  151. case id = <-c.nextID:
  152. case <-c.closed:
  153. return nil, ErrClosed
  154. }
  155. c.awaitingMut.Lock()
  156. if ch := c.awaiting[id]; ch != nil {
  157. panic("id taken")
  158. }
  159. rc := make(chan asyncResult, 1)
  160. c.awaiting[id] = rc
  161. c.awaitingMut.Unlock()
  162. ok := c.send(header{0, id, messageTypeRequest},
  163. RequestMessage{repo, name, uint64(offset), uint32(size)})
  164. if !ok {
  165. return nil, ErrClosed
  166. }
  167. res, ok := <-rc
  168. if !ok {
  169. return nil, ErrClosed
  170. }
  171. return res.val, res.err
  172. }
  173. // ClusterConfig send the cluster configuration message to the peer and returns any error
  174. func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
  175. c.send(header{0, -1, messageTypeClusterConfig}, config)
  176. }
  177. func (c *rawConnection) ping() bool {
  178. var id int
  179. select {
  180. case id = <-c.nextID:
  181. case <-c.closed:
  182. return false
  183. }
  184. rc := make(chan asyncResult, 1)
  185. c.awaitingMut.Lock()
  186. c.awaiting[id] = rc
  187. c.awaitingMut.Unlock()
  188. ok := c.send(header{0, id, messageTypePing})
  189. if !ok {
  190. return false
  191. }
  192. res, ok := <-rc
  193. return ok && res.err == nil
  194. }
  195. func (c *rawConnection) readerLoop() (err error) {
  196. defer func() {
  197. c.close(err)
  198. }()
  199. for {
  200. select {
  201. case <-c.closed:
  202. return ErrClosed
  203. default:
  204. }
  205. var hdr header
  206. hdr.decodeXDR(c.xr)
  207. if err := c.xr.Error(); err != nil {
  208. return err
  209. }
  210. if hdr.version != 0 {
  211. return fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version)
  212. }
  213. switch hdr.msgType {
  214. case messageTypeIndex:
  215. if c.state < stateCCRcvd {
  216. return fmt.Errorf("protocol error: index message in state %d", c.state)
  217. }
  218. if err := c.handleIndex(); err != nil {
  219. return err
  220. }
  221. c.state = stateIdxRcvd
  222. case messageTypeIndexUpdate:
  223. if c.state < stateIdxRcvd {
  224. return fmt.Errorf("protocol error: index update message in state %d", c.state)
  225. }
  226. if err := c.handleIndexUpdate(); err != nil {
  227. return err
  228. }
  229. case messageTypeRequest:
  230. if c.state < stateIdxRcvd {
  231. return fmt.Errorf("protocol error: request message in state %d", c.state)
  232. }
  233. if err := c.handleRequest(hdr); err != nil {
  234. return err
  235. }
  236. case messageTypeResponse:
  237. if c.state < stateIdxRcvd {
  238. return fmt.Errorf("protocol error: response message in state %d", c.state)
  239. }
  240. if err := c.handleResponse(hdr); err != nil {
  241. return err
  242. }
  243. case messageTypePing:
  244. c.send(header{0, hdr.msgID, messageTypePong})
  245. case messageTypePong:
  246. c.handlePong(hdr)
  247. case messageTypeClusterConfig:
  248. if c.state != stateInitial {
  249. return fmt.Errorf("protocol error: cluster config message in state %d", c.state)
  250. }
  251. if err := c.handleClusterConfig(); err != nil {
  252. return err
  253. }
  254. c.state = stateCCRcvd
  255. default:
  256. return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
  257. }
  258. }
  259. }
  260. type incomingIndex struct {
  261. update bool
  262. id NodeID
  263. repo string
  264. files []FileInfo
  265. }
  266. var incomingIndexes = make(chan incomingIndex, 100) // should be enough for anyone, right?
  267. func (c *rawConnection) indexSerializerLoop() {
  268. // We must avoid blocking the reader loop when processing large indexes.
  269. // There is otherwise a potential deadlock where both sides has the model
  270. // locked because it's sending a large index update and can't receive the
  271. // large index update from the other side. But we must also ensure to
  272. // process the indexes in the order they are received, hence the separate
  273. // routine and buffered channel.
  274. for {
  275. select {
  276. case ii := <-incomingIndexes:
  277. if ii.update {
  278. c.receiver.IndexUpdate(ii.id, ii.repo, ii.files)
  279. } else {
  280. c.receiver.Index(ii.id, ii.repo, ii.files)
  281. }
  282. case <-c.closed:
  283. return
  284. }
  285. }
  286. }
  287. func (c *rawConnection) handleIndex() error {
  288. var im IndexMessage
  289. im.decodeXDR(c.xr)
  290. if err := c.xr.Error(); err != nil {
  291. return err
  292. } else {
  293. // We run this (and the corresponding one for update, below)
  294. // in a separate goroutine to avoid blocking the read loop.
  295. // There is otherwise a potential deadlock where both sides
  296. // has the model locked because it's sending a large index
  297. // update and can't receive the large index update from the
  298. // other side.
  299. incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files}
  300. }
  301. return nil
  302. }
  303. func (c *rawConnection) handleIndexUpdate() error {
  304. var im IndexMessage
  305. im.decodeXDR(c.xr)
  306. if err := c.xr.Error(); err != nil {
  307. return err
  308. } else {
  309. incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files}
  310. }
  311. return nil
  312. }
  313. func (c *rawConnection) handleRequest(hdr header) error {
  314. var req RequestMessage
  315. req.decodeXDR(c.xr)
  316. if err := c.xr.Error(); err != nil {
  317. return err
  318. }
  319. go c.processRequest(hdr.msgID, req)
  320. return nil
  321. }
  322. func (c *rawConnection) handleResponse(hdr header) error {
  323. data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size
  324. if err := c.xr.Error(); err != nil {
  325. return err
  326. }
  327. c.awaitingMut.Lock()
  328. if rc := c.awaiting[hdr.msgID]; rc != nil {
  329. c.awaiting[hdr.msgID] = nil
  330. rc <- asyncResult{data, nil}
  331. close(rc)
  332. }
  333. c.awaitingMut.Unlock()
  334. return nil
  335. }
  336. func (c *rawConnection) handlePong(hdr header) {
  337. c.awaitingMut.Lock()
  338. if rc := c.awaiting[hdr.msgID]; rc != nil {
  339. c.awaiting[hdr.msgID] = nil
  340. rc <- asyncResult{}
  341. close(rc)
  342. }
  343. c.awaitingMut.Unlock()
  344. }
  345. func (c *rawConnection) handleClusterConfig() error {
  346. var cm ClusterConfigMessage
  347. cm.decodeXDR(c.xr)
  348. if err := c.xr.Error(); err != nil {
  349. return err
  350. } else {
  351. go c.receiver.ClusterConfig(c.id, cm)
  352. }
  353. return nil
  354. }
  355. type encodable interface {
  356. encodeXDR(*xdr.Writer) (int, error)
  357. }
  // encodableBytes is a byte slice that can be sent as a message payload.
  type encodableBytes []byte
  359. func (e encodableBytes) encodeXDR(xw *xdr.Writer) (int, error) {
  360. return xw.WriteBytes(e)
  361. }
  362. func (c *rawConnection) send(h header, es ...encodable) bool {
  363. if h.msgID < 0 {
  364. select {
  365. case id := <-c.nextID:
  366. h.msgID = id
  367. case <-c.closed:
  368. return false
  369. }
  370. }
  371. msg := append([]encodable{h}, es...)
  372. select {
  373. case c.outbox <- msg:
  374. return true
  375. case <-c.closed:
  376. return false
  377. }
  378. }
  379. func (c *rawConnection) writerLoop() {
  380. var err error
  381. for {
  382. select {
  383. case es := <-c.outbox:
  384. for _, e := range es {
  385. e.encodeXDR(c.xw)
  386. }
  387. if err = c.flush(); err != nil {
  388. c.close(err)
  389. return
  390. }
  391. case <-c.closed:
  392. return
  393. }
  394. }
  395. }
  // flusher is implemented by writers that buffer output and can be asked to
  // write it out.
  type flusher interface {
  	Flush() error
  }
  399. func (c *rawConnection) flush() error {
  400. if err := c.xw.Error(); err != nil {
  401. return err
  402. }
  403. if err := c.wb.Flush(); err != nil {
  404. return err
  405. }
  406. return nil
  407. }
  408. func (c *rawConnection) close(err error) {
  409. c.once.Do(func() {
  410. close(c.closed)
  411. c.awaitingMut.Lock()
  412. for i, ch := range c.awaiting {
  413. if ch != nil {
  414. close(ch)
  415. c.awaiting[i] = nil
  416. }
  417. }
  418. c.awaitingMut.Unlock()
  419. go c.receiver.Close(c.id, err)
  420. })
  421. }
  422. func (c *rawConnection) idGenerator() {
  423. nextID := 0
  424. for {
  425. nextID = (nextID + 1) & 0xfff
  426. select {
  427. case c.nextID <- nextID:
  428. case <-c.closed:
  429. return
  430. }
  431. }
  432. }
  433. func (c *rawConnection) pingerLoop() {
  434. var rc = make(chan bool, 1)
  435. ticker := time.Tick(pingIdleTime / 2)
  436. for {
  437. select {
  438. case <-ticker:
  439. if d := time.Since(c.xr.LastRead()); d < pingIdleTime {
  440. if debug {
  441. l.Debugln(c.id, "ping skipped after rd", d)
  442. }
  443. continue
  444. }
  445. if d := time.Since(c.xw.LastWrite()); d < pingIdleTime {
  446. if debug {
  447. l.Debugln(c.id, "ping skipped after wr", d)
  448. }
  449. continue
  450. }
  451. go func() {
  452. if debug {
  453. l.Debugln(c.id, "ping ->")
  454. }
  455. rc <- c.ping()
  456. }()
  457. select {
  458. case ok := <-rc:
  459. if debug {
  460. l.Debugln(c.id, "<- pong")
  461. }
  462. if !ok {
  463. c.close(fmt.Errorf("ping failure"))
  464. }
  465. case <-time.After(pingTimeout):
  466. c.close(fmt.Errorf("ping timeout"))
  467. case <-c.closed:
  468. return
  469. }
  470. case <-c.closed:
  471. return
  472. }
  473. }
  474. }
  475. func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
  476. data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
  477. c.send(header{0, msgID, messageTypeResponse}, encodableBytes(data))
  478. }
  // Statistics is a snapshot of a connection's traffic counters.
  type Statistics struct {
  	At            time.Time // when the snapshot was taken
  	InBytesTotal  uint64    // total reported by the connection's counting reader
  	OutBytesTotal uint64    // total reported by the connection's counting writer
  }
  484. func (c *rawConnection) Statistics() Statistics {
  485. return Statistics{
  486. At: time.Now(),
  487. InBytesTotal: c.cr.Tot(),
  488. OutBytesTotal: c.cw.Tot(),
  489. }
  490. }
  491. func IsDeleted(bits uint32) bool {
  492. return bits&FlagDeleted != 0
  493. }
  494. func IsInvalid(bits uint32) bool {
  495. return bits&FlagInvalid != 0
  496. }
  497. func IsDirectory(bits uint32) bool {
  498. return bits&FlagDirectory != 0
  499. }
  500. func HasPermissionBits(bits uint32) bool {
  501. return bits&FlagNoPermBits == 0
  502. }