| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583 |
- // Copyright (C) 2014 Jakob Borg and other contributors. All rights reserved.
- // Use of this source code is governed by an MIT-style license that can be
- // found in the LICENSE file.
- package protocol
- import (
- "bufio"
- "compress/flate"
- "errors"
- "fmt"
- "io"
- "sync"
- "time"
- "github.com/calmh/syncthing/xdr"
- )
// BlockSize is 128 KiB expressed in bytes.
// NOTE(review): presumably the size of one file data block exchanged via
// Request/Response — confirm against the callers.
const BlockSize = 128 << 10
// Message type identifiers carried in each message header. readerLoop
// dispatches on these values, so the order of this list must not change.
const (
	messageTypeClusterConfig = iota // 0
	messageTypeIndex                // 1
	messageTypeRequest              // 2
	messageTypeResponse             // 3
	messageTypePing                 // 4
	messageTypePong                 // 5
	messageTypeIndexUpdate          // 6
)
// Bit flags tested by IsDeleted, IsInvalid, IsDirectory and
// HasPermissionBits. Only FlagDeleted is explicitly typed as uint32; the
// remaining flags are untyped constants.
const (
	// FlagDeleted is bit 12.
	FlagDeleted uint32 = 1 << 12
	// FlagInvalid is bit 13.
	FlagInvalid = 1 << 13
	// FlagDirectory is bit 14.
	FlagDirectory = 1 << 14
	// FlagNoPermBits is bit 15; HasPermissionBits reports true when it is clear.
	FlagNoPermBits = 1 << 15
)
// Share flags. Only FlagShareTrusted is explicitly typed as uint32; the
// remaining values are untyped constants.
const (
	// FlagShareTrusted is bit 0.
	FlagShareTrusted uint32 = 1 << 0
	// FlagShareReadOnly is bit 1.
	FlagShareReadOnly = 1 << 1
	// FlagShareBits is a mask covering the low eight bits.
	FlagShareBits = 0x000000ff
)
// Sentinel errors of this package.
var (
	// ErrClusterHash reports a mismatched cluster hash.
	ErrClusterHash = errors.New("configuration error: mismatched cluster hash")
	// ErrClosed is returned by operations attempted on a closed connection.
	ErrClosed = errors.New("connection closed")
)
- type Model interface {
- // An index was received from the peer node
- Index(nodeID string, repo string, files []FileInfo)
- // An index update was received from the peer node
- IndexUpdate(nodeID string, repo string, files []FileInfo)
- // A request was made by the peer node
- Request(nodeID string, repo string, name string, offset int64, size int) ([]byte, error)
- // A cluster configuration message was received
- ClusterConfig(nodeID string, config ClusterConfigMessage)
- // The peer node closed the connection
- Close(nodeID string, err error)
- }
- type Connection interface {
- ID() string
- Index(repo string, files []FileInfo)
- Request(repo string, name string, offset int64, size int) ([]byte, error)
- ClusterConfig(config ClusterConfigMessage)
- Statistics() Statistics
- }
- type rawConnection struct {
- id string
- receiver Model
- reader io.ReadCloser
- cr *countingReader
- xr *xdr.Reader
- writer io.WriteCloser
- cw *countingWriter
- wb *bufio.Writer
- xw *xdr.Writer
- wmut sync.Mutex
- indexSent map[string]map[string]uint64
- awaiting []chan asyncResult
- imut sync.Mutex
- nextID chan int
- outbox chan []encodable
- closed chan struct{}
- }
// asyncResult is the value delivered on a reply channel stored in
// rawConnection.awaiting: the payload of a response, or an error.
type asyncResult struct {
	val []byte // response payload
	err error  // error, if any
}
// Keep-alive timing used by pingerLoop.
const (
	// pingTimeout is how long pingerLoop waits for a ping to complete
	// before closing the connection.
	pingTimeout = 30 * time.Second
	// pingIdleTime is how long both the read and the write side must have
	// been idle before a ping is sent; the check runs every pingIdleTime/2.
	pingIdleTime = 60 * time.Second
)
- func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) Connection {
- cr := &countingReader{Reader: reader}
- cw := &countingWriter{Writer: writer}
- flrd := flate.NewReader(cr)
- flwr, err := flate.NewWriter(cw, flate.BestSpeed)
- if err != nil {
- panic(err)
- }
- wb := bufio.NewWriter(flwr)
- c := rawConnection{
- id: nodeID,
- receiver: nativeModel{receiver},
- reader: flrd,
- cr: cr,
- xr: xdr.NewReader(flrd),
- writer: flwr,
- cw: cw,
- wb: wb,
- xw: xdr.NewWriter(wb),
- awaiting: make([]chan asyncResult, 0x1000),
- indexSent: make(map[string]map[string]uint64),
- outbox: make(chan []encodable),
- nextID: make(chan int),
- closed: make(chan struct{}),
- }
- go c.indexSerializerLoop()
- go c.readerLoop()
- go c.writerLoop()
- go c.pingerLoop()
- go c.idGenerator()
- return wireFormatConnection{&c}
- }
- func (c *rawConnection) ID() string {
- return c.id
- }
- // Index writes the list of file information to the connected peer node
- func (c *rawConnection) Index(repo string, idx []FileInfo) {
- c.imut.Lock()
- var msgType int
- if c.indexSent[repo] == nil {
- // This is the first time we send an index.
- msgType = messageTypeIndex
- c.indexSent[repo] = make(map[string]uint64)
- for _, f := range idx {
- c.indexSent[repo][f.Name] = f.Version
- }
- } else {
- // We have sent one full index. Only send updates now.
- msgType = messageTypeIndexUpdate
- var diff []FileInfo
- for _, f := range idx {
- if vs, ok := c.indexSent[repo][f.Name]; !ok || f.Version != vs {
- diff = append(diff, f)
- c.indexSent[repo][f.Name] = f.Version
- }
- }
- idx = diff
- }
- if len(idx) > 0 {
- c.send(header{0, -1, msgType}, IndexMessage{repo, idx})
- }
- c.imut.Unlock()
- }
- // Request returns the bytes for the specified block after fetching them from the connected peer.
- func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) {
- var id int
- select {
- case id = <-c.nextID:
- case <-c.closed:
- return nil, ErrClosed
- }
- c.imut.Lock()
- if ch := c.awaiting[id]; ch != nil {
- panic("id taken")
- }
- rc := make(chan asyncResult)
- c.awaiting[id] = rc
- c.imut.Unlock()
- ok := c.send(header{0, id, messageTypeRequest},
- RequestMessage{repo, name, uint64(offset), uint32(size)})
- if !ok {
- return nil, ErrClosed
- }
- res, ok := <-rc
- if !ok {
- return nil, ErrClosed
- }
- return res.val, res.err
- }
- // ClusterConfig send the cluster configuration message to the peer and returns any error
- func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
- c.send(header{0, -1, messageTypeClusterConfig}, config)
- }
- func (c *rawConnection) ping() bool {
- var id int
- select {
- case id = <-c.nextID:
- case <-c.closed:
- return false
- }
- rc := make(chan asyncResult, 1)
- c.imut.Lock()
- c.awaiting[id] = rc
- c.imut.Unlock()
- ok := c.send(header{0, id, messageTypePing})
- if !ok {
- return false
- }
- res, ok := <-rc
- return ok && res.err == nil
- }
- func (c *rawConnection) readerLoop() (err error) {
- defer func() {
- c.close(err)
- }()
- for {
- select {
- case <-c.closed:
- return ErrClosed
- default:
- }
- var hdr header
- hdr.decodeXDR(c.xr)
- if err := c.xr.Error(); err != nil {
- return err
- }
- if hdr.version != 0 {
- return fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version)
- }
- switch hdr.msgType {
- case messageTypeIndex:
- if err := c.handleIndex(); err != nil {
- return err
- }
- case messageTypeIndexUpdate:
- if err := c.handleIndexUpdate(); err != nil {
- return err
- }
- case messageTypeRequest:
- if err := c.handleRequest(hdr); err != nil {
- return err
- }
- case messageTypeResponse:
- if err := c.handleResponse(hdr); err != nil {
- return err
- }
- case messageTypePing:
- c.send(header{0, hdr.msgID, messageTypePong})
- case messageTypePong:
- c.handlePong(hdr)
- case messageTypeClusterConfig:
- if err := c.handleClusterConfig(); err != nil {
- return err
- }
- default:
- return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
- }
- }
- }
- type incomingIndex struct {
- update bool
- id string
- repo string
- files []FileInfo
- }
- var incomingIndexes = make(chan incomingIndex, 100) // should be enough for anyone, right?
- func (c *rawConnection) indexSerializerLoop() {
- // We must avoid blocking the reader loop when processing large indexes.
- // There is otherwise a potential deadlock where both sides has the model
- // locked because it's sending a large index update and can't receive the
- // large index update from the other side. But we must also ensure to
- // process the indexes in the order they are received, hence the separate
- // routine and buffered channel.
- for ii := range incomingIndexes {
- if ii.update {
- c.receiver.IndexUpdate(ii.id, ii.repo, ii.files)
- } else {
- c.receiver.Index(ii.id, ii.repo, ii.files)
- }
- }
- }
- func (c *rawConnection) handleIndex() error {
- var im IndexMessage
- im.decodeXDR(c.xr)
- if err := c.xr.Error(); err != nil {
- return err
- } else {
- // We run this (and the corresponding one for update, below)
- // in a separate goroutine to avoid blocking the read loop.
- // There is otherwise a potential deadlock where both sides
- // has the model locked because it's sending a large index
- // update and can't receive the large index update from the
- // other side.
- incomingIndexes <- incomingIndex{false, c.id, im.Repository, im.Files}
- }
- return nil
- }
- func (c *rawConnection) handleIndexUpdate() error {
- var im IndexMessage
- im.decodeXDR(c.xr)
- if err := c.xr.Error(); err != nil {
- return err
- } else {
- incomingIndexes <- incomingIndex{true, c.id, im.Repository, im.Files}
- }
- return nil
- }
- func (c *rawConnection) handleRequest(hdr header) error {
- var req RequestMessage
- req.decodeXDR(c.xr)
- if err := c.xr.Error(); err != nil {
- return err
- }
- go c.processRequest(hdr.msgID, req)
- return nil
- }
- func (c *rawConnection) handleResponse(hdr header) error {
- data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size
- if err := c.xr.Error(); err != nil {
- return err
- }
- go func(hdr header, err error) {
- c.imut.Lock()
- rc := c.awaiting[hdr.msgID]
- c.awaiting[hdr.msgID] = nil
- c.imut.Unlock()
- if rc != nil {
- rc <- asyncResult{data, err}
- close(rc)
- }
- }(hdr, c.xr.Error())
- return nil
- }
- func (c *rawConnection) handlePong(hdr header) {
- c.imut.Lock()
- if rc := c.awaiting[hdr.msgID]; rc != nil {
- go func() {
- rc <- asyncResult{}
- close(rc)
- }()
- c.awaiting[hdr.msgID] = nil
- }
- c.imut.Unlock()
- }
- func (c *rawConnection) handleClusterConfig() error {
- var cm ClusterConfigMessage
- cm.decodeXDR(c.xr)
- if err := c.xr.Error(); err != nil {
- return err
- } else {
- go c.receiver.ClusterConfig(c.id, cm)
- }
- return nil
- }
- type encodable interface {
- encodeXDR(*xdr.Writer) (int, error)
- }
- type encodableBytes []byte
- func (e encodableBytes) encodeXDR(xw *xdr.Writer) (int, error) {
- return xw.WriteBytes(e)
- }
- func (c *rawConnection) send(h header, es ...encodable) bool {
- if h.msgID < 0 {
- select {
- case id := <-c.nextID:
- h.msgID = id
- case <-c.closed:
- return false
- }
- }
- msg := append([]encodable{h}, es...)
- select {
- case c.outbox <- msg:
- return true
- case <-c.closed:
- return false
- }
- }
- func (c *rawConnection) writerLoop() {
- var err error
- for es := range c.outbox {
- c.wmut.Lock()
- for _, e := range es {
- e.encodeXDR(c.xw)
- }
- if err = c.flush(); err != nil {
- c.wmut.Unlock()
- c.close(err)
- return
- }
- c.wmut.Unlock()
- }
- }
// flusher is implemented by writers that buffer data and can be asked to
// push it onward. flush uses it to test whether the underlying writer needs
// flushing.
type flusher interface {
	// Flush writes out any buffered data.
	Flush() error
}
- func (c *rawConnection) flush() error {
- if err := c.xw.Error(); err != nil {
- return err
- }
- if err := c.wb.Flush(); err != nil {
- return err
- }
- if f, ok := c.writer.(flusher); ok {
- return f.Flush()
- }
- return nil
- }
- func (c *rawConnection) close(err error) {
- c.imut.Lock()
- c.wmut.Lock()
- defer c.imut.Unlock()
- defer c.wmut.Unlock()
- select {
- case <-c.closed:
- return
- default:
- close(c.closed)
- for i, ch := range c.awaiting {
- if ch != nil {
- close(ch)
- c.awaiting[i] = nil
- }
- }
- c.writer.Close()
- c.reader.Close()
- go c.receiver.Close(c.id, err)
- }
- }
- func (c *rawConnection) idGenerator() {
- nextID := 0
- for {
- nextID = (nextID + 1) & 0xfff
- select {
- case c.nextID <- nextID:
- case <-c.closed:
- return
- }
- }
- }
- func (c *rawConnection) pingerLoop() {
- var rc = make(chan bool, 1)
- ticker := time.Tick(pingIdleTime / 2)
- for {
- select {
- case <-ticker:
- if d := time.Since(c.xr.LastRead()); d < pingIdleTime {
- if debug {
- l.Debugln(c.id, "ping skipped after rd", d)
- }
- continue
- }
- if d := time.Since(c.xw.LastWrite()); d < pingIdleTime {
- if debug {
- l.Debugln(c.id, "ping skipped after wr", d)
- }
- continue
- }
- go func() {
- if debug {
- l.Debugln(c.id, "ping ->")
- }
- rc <- c.ping()
- }()
- select {
- case ok := <-rc:
- if debug {
- l.Debugln(c.id, "<- pong")
- }
- if !ok {
- c.close(fmt.Errorf("ping failure"))
- }
- case <-time.After(pingTimeout):
- c.close(fmt.Errorf("ping timeout"))
- case <-c.closed:
- return
- }
- case <-c.closed:
- return
- }
- }
- }
- func (c *rawConnection) processRequest(msgID int, req RequestMessage) {
- data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size))
- c.send(header{0, msgID, messageTypeResponse},
- encodableBytes(data))
- }
// Statistics is a snapshot of a connection's traffic counters.
type Statistics struct {
	// At is the time the snapshot was taken.
	At time.Time
	// InBytesTotal is the byte total reported by the connection's
	// counting reader.
	InBytesTotal uint64
	// OutBytesTotal is the byte total reported by the connection's
	// counting writer.
	OutBytesTotal uint64
}
- func (c *rawConnection) Statistics() Statistics {
- return Statistics{
- At: time.Now(),
- InBytesTotal: c.cr.Tot(),
- OutBytesTotal: c.cw.Tot(),
- }
- }
- func IsDeleted(bits uint32) bool {
- return bits&FlagDeleted != 0
- }
- func IsInvalid(bits uint32) bool {
- return bits&FlagInvalid != 0
- }
- func IsDirectory(bits uint32) bool {
- return bits&FlagDirectory != 0
- }
- func HasPermissionBits(bits uint32) bool {
- return bits&FlagNoPermBits == 0
- }
|