| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149 | 
							- // Copyright (C) 2014 The Syncthing Authors.
 
- //
 
- // This Source Code Form is subject to the terms of the Mozilla Public
 
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
 
- // You can obtain one at https://mozilla.org/MPL/2.0/.
 
- // Prevents import loop, for internal testing
 
- //go:generate go tool counterfeiter -o mocked_connection_info_test.go --fake-name mockedConnectionInfo . ConnectionInfo
 
- //go:generate go run ../../script/prune_mocks.go -t mocked_connection_info_test.go
 
- //go:generate go tool counterfeiter -o mocks/connection_info.go --fake-name ConnectionInfo . ConnectionInfo
 
- //go:generate go tool counterfeiter -o mocks/connection.go --fake-name Connection . Connection
 
- package protocol
 
- import (
 
- 	"context"
 
- 	"encoding/binary"
 
- 	"errors"
 
- 	"fmt"
 
- 	"io"
 
- 	"net"
 
- 	"path"
 
- 	"strings"
 
- 	"sync"
 
- 	"time"
 
- 	lz4 "github.com/pierrec/lz4/v4"
 
- 	"google.golang.org/protobuf/proto"
 
- 	"github.com/syncthing/syncthing/internal/gen/bep"
 
- 	"github.com/syncthing/syncthing/internal/protoutil"
 
- )
 
- const (
 
- 	// Shifts
 
- 	KiB = 10
 
- 	MiB = 20
 
- 	GiB = 30
 
- )
 
- const (
 
- 	// MaxMessageLen is the largest message size allowed on the wire. (500 MB)
 
- 	MaxMessageLen = 500 * 1000 * 1000
 
- 	// MinBlockSize is the minimum block size allowed
 
- 	MinBlockSize = 128 << KiB
 
- 	// MaxBlockSize is the maximum block size allowed
 
- 	MaxBlockSize = 16 << MiB
 
- 	// DesiredPerFileBlocks is the number of blocks we aim for per file
 
- 	DesiredPerFileBlocks = 2000
 
- 	SyntheticDirectorySize = 128
 
- 	// don't bother compressing messages smaller than this many bytes
 
- 	compressionThreshold = 128
 
- )
 
- var errNotCompressible = errors.New("not compressible")
 
- const (
 
- 	stateInitial = iota
 
- 	stateReady
 
- )
 
- var (
 
- 	ErrClosed             = errors.New("connection closed")
 
- 	ErrTimeout            = errors.New("read timeout")
 
- 	errUnknownMessage     = errors.New("unknown message")
 
- 	errInvalidFilename    = errors.New("filename is invalid")
 
- 	errUncleanFilename    = errors.New("filename not in canonical format")
 
- 	errDeletedHasBlocks   = errors.New("deleted file with non-empty block list")
 
- 	errDirectoryHasBlocks = errors.New("directory with non-empty block list")
 
- 	errFileHasNoBlocks    = errors.New("file with empty block list")
 
- )
 
- type Model interface {
 
- 	// An index was received from the peer device
 
- 	Index(conn Connection, idx *Index) error
 
- 	// An index update was received from the peer device
 
- 	IndexUpdate(conn Connection, idxUp *IndexUpdate) error
 
- 	// A request was made by the peer device
 
- 	Request(conn Connection, req *Request) (RequestResponse, error)
 
- 	// A cluster configuration message was received
 
- 	ClusterConfig(conn Connection, config *ClusterConfig) error
 
- 	// The peer device closed the connection or an error occurred
 
- 	Closed(conn Connection, err error)
 
- 	// The peer device sent progress updates for the files it is currently downloading
 
- 	DownloadProgress(conn Connection, p *DownloadProgress) error
 
- }
 
- // rawModel is the Model interface, but without the initial Connection
 
- // parameter. Internal use only.
 
- type rawModel interface {
 
- 	Index(*Index) error
 
- 	IndexUpdate(*IndexUpdate) error
 
- 	Request(*Request) (RequestResponse, error)
 
- 	ClusterConfig(*ClusterConfig) error
 
- 	Closed(err error)
 
- 	DownloadProgress(*DownloadProgress) error
 
- }
 
- type RequestResponse interface {
 
- 	Data() []byte
 
- 	Close() // Must always be called once the byte slice is no longer in use
 
- 	Wait()  // Blocks until Close is called
 
- }
 
- type Connection interface {
 
- 	// Send an Index message to the peer device. The message in the
 
- 	// parameter may be altered by the connection and should not be used
 
- 	// further by the caller.
 
- 	Index(ctx context.Context, idx *Index) error
 
- 	// Send an Index Update message to the peer device. The message in the
 
- 	// parameter may be altered by the connection and should not be used
 
- 	// further by the caller.
 
- 	IndexUpdate(ctx context.Context, idxUp *IndexUpdate) error
 
- 	// Send a Request message to the peer device. The message in the
 
- 	// parameter may be altered by the connection and should not be used
 
- 	// further by the caller.
 
- 	Request(ctx context.Context, req *Request) ([]byte, error)
 
- 	// Send a Cluster Configuration message to the peer device. The message
 
- 	// in the parameter may be altered by the connection and should not be
 
- 	// used further by the caller.
 
- 	// For any folder that must be encrypted for the connected device, the
 
- 	// password must be provided.
 
- 	ClusterConfig(config *ClusterConfig, passwords map[string]string)
 
- 	// Send a Download Progress message to the peer device. The message in
 
- 	// the parameter may be altered by the connection and should not be used
 
- 	// further by the caller.
 
- 	DownloadProgress(ctx context.Context, dp *DownloadProgress)
 
- 	Start()
 
- 	Close(err error)
 
- 	DeviceID() DeviceID
 
- 	Statistics() Statistics
 
- 	Closed() <-chan struct{}
 
- 	ConnectionInfo
 
- }
 
- type ConnectionInfo interface {
 
- 	Type() string
 
- 	Transport() string
 
- 	IsLocal() bool
 
- 	RemoteAddr() net.Addr
 
- 	Priority() int
 
- 	String() string
 
- 	Crypto() string
 
- 	EstablishedAt() time.Time
 
- 	ConnectionID() string
 
- }
 
- type rawConnection struct {
 
- 	ConnectionInfo
 
- 	deviceID  DeviceID
 
- 	idString  string
 
- 	model     rawModel
 
- 	startTime time.Time
 
- 	started   chan struct{}
 
- 	cr     *countingReader
 
- 	cw     *countingWriter
 
- 	closer io.Closer // Closing the underlying connection and thus cr and cw
 
- 	awaitingMut sync.Mutex // Protects awaiting and nextID.
 
- 	awaiting    map[int]chan asyncResult
 
- 	nextID      int
 
- 	idxMut sync.Mutex // ensures serialization of Index calls
 
- 	inbox                 chan proto.Message
 
- 	outbox                chan asyncMessage
 
- 	closeBox              chan asyncMessage
 
- 	clusterConfigBox      chan *ClusterConfig
 
- 	dispatcherLoopStopped chan struct{}
 
- 	closed                chan struct{}
 
- 	closeOnce             sync.Once
 
- 	sendCloseOnce         sync.Once
 
- 	compression           Compression
 
- 	startStopMut          sync.Mutex // start and stop must be serialized
 
- 	loopWG sync.WaitGroup // Need to ensure no leftover routines in testing
 
- }
 
- type asyncResult struct {
 
- 	val []byte
 
- 	err error
 
- }
 
- type asyncMessage struct {
 
- 	msg  proto.Message
 
- 	done chan struct{} // done closes when we're done sending the message
 
- }
 
- const (
 
- 	// PingSendInterval is how often we make sure to send a message, by
 
- 	// triggering pings if necessary.
 
- 	PingSendInterval = 90 * time.Second
 
- 	// ReceiveTimeout is the longest we'll wait for a message from the other
 
- 	// side before closing the connection.
 
- 	ReceiveTimeout = 300 * time.Second
 
- )
 
- // CloseTimeout is the longest we'll wait when trying to send the close
 
- // message before just closing the connection.
 
- // Should not be modified in production code, just for testing.
 
- var CloseTimeout = 10 * time.Second
 
- func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, model Model, connInfo ConnectionInfo, compress Compression, keyGen *KeyGenerator) Connection {
 
- 	// We create the wrapper for the model first, as it needs to be passed
 
- 	// in at the lowest level in the stack. At the end of construction,
 
- 	// before returning, we add the connection to cwm so that it can be used
 
- 	// by the model.
 
- 	cwm := &connectionWrappingModel{model: model}
 
- 	// Encryption / decryption is first (outermost) before conversion to
 
- 	// native path formats.
 
- 	nm := makeNative(cwm)
 
- 	em := newEncryptedModel(nm, keyGen)
 
- 	// We do the wire format conversion first (outermost) so that the
 
- 	// metadata is in wire format when it reaches the encryption step.
 
- 	rc := newRawConnection(deviceID, reader, writer, closer, em, connInfo, compress)
 
- 	ec := newEncryptedConnection(rc, rc, em.folderKeys, keyGen)
 
- 	wc := wireFormatConnection{ec}
 
- 	cwm.conn = wc
 
- 	return wc
 
- }
 
- func newRawConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, receiver rawModel, connInfo ConnectionInfo, compress Compression) *rawConnection {
 
- 	idString := deviceID.String()
 
- 	cr := &countingReader{Reader: reader, idString: idString}
 
- 	cw := &countingWriter{Writer: writer, idString: idString}
 
- 	registerDeviceMetrics(idString)
 
- 	return &rawConnection{
 
- 		ConnectionInfo:        connInfo,
 
- 		deviceID:              deviceID,
 
- 		idString:              deviceID.String(),
 
- 		model:                 receiver,
 
- 		started:               make(chan struct{}),
 
- 		cr:                    cr,
 
- 		cw:                    cw,
 
- 		closer:                closer,
 
- 		awaiting:              make(map[int]chan asyncResult),
 
- 		inbox:                 make(chan proto.Message),
 
- 		outbox:                make(chan asyncMessage),
 
- 		closeBox:              make(chan asyncMessage),
 
- 		clusterConfigBox:      make(chan *ClusterConfig),
 
- 		dispatcherLoopStopped: make(chan struct{}),
 
- 		closed:                make(chan struct{}),
 
- 		compression:           compress,
 
- 		loopWG:                sync.WaitGroup{},
 
- 	}
 
- }
 
- // Start creates the goroutines for sending and receiving of messages. It must
 
- // be called once after creating a connection. It should only be called once,
 
- // subsequent calls will have no effect.
 
- func (c *rawConnection) Start() {
 
- 	c.startStopMut.Lock()
 
- 	defer c.startStopMut.Unlock()
 
- 	select {
 
- 	case <-c.started:
 
- 		return
 
- 	case <-c.closed:
 
- 		// we have already closed the connection before starting processing
 
- 		// on it.
 
- 		return
 
- 	default:
 
- 	}
 
- 	c.loopWG.Add(5)
 
- 	go func() {
 
- 		c.readerLoop()
 
- 		c.loopWG.Done()
 
- 	}()
 
- 	go func() {
 
- 		err := c.dispatcherLoop()
 
- 		c.Close(err)
 
- 		c.loopWG.Done()
 
- 	}()
 
- 	go func() {
 
- 		c.writerLoop()
 
- 		c.loopWG.Done()
 
- 	}()
 
- 	go func() {
 
- 		c.pingSender()
 
- 		c.loopWG.Done()
 
- 	}()
 
- 	go func() {
 
- 		c.pingReceiver()
 
- 		c.loopWG.Done()
 
- 	}()
 
- 	c.startTime = time.Now().Truncate(time.Second)
 
- 	close(c.started)
 
- }
 
- func (c *rawConnection) DeviceID() DeviceID {
 
- 	return c.deviceID
 
- }
 
- // Index writes the list of file information to the connected peer device
 
- func (c *rawConnection) Index(ctx context.Context, idx *Index) error {
 
- 	select {
 
- 	case <-c.closed:
 
- 		return ErrClosed
 
- 	case <-ctx.Done():
 
- 		return ctx.Err()
 
- 	default:
 
- 	}
 
- 	c.idxMut.Lock()
 
- 	c.send(ctx, idx.toWire(), nil)
 
- 	c.idxMut.Unlock()
 
- 	return nil
 
- }
 
- // IndexUpdate writes the list of file information to the connected peer device as an update
 
- func (c *rawConnection) IndexUpdate(ctx context.Context, idxUp *IndexUpdate) error {
 
- 	select {
 
- 	case <-c.closed:
 
- 		return ErrClosed
 
- 	case <-ctx.Done():
 
- 		return ctx.Err()
 
- 	default:
 
- 	}
 
- 	c.idxMut.Lock()
 
- 	c.send(ctx, idxUp.toWire(), nil)
 
- 	c.idxMut.Unlock()
 
- 	return nil
 
- }
 
- // Request returns the bytes for the specified block after fetching them from the connected peer.
 
- func (c *rawConnection) Request(ctx context.Context, req *Request) ([]byte, error) {
 
- 	select {
 
- 	case <-c.closed:
 
- 		return nil, ErrClosed
 
- 	case <-ctx.Done():
 
- 		return nil, ctx.Err()
 
- 	default:
 
- 	}
 
- 	rc := make(chan asyncResult, 1)
 
- 	c.awaitingMut.Lock()
 
- 	id := c.nextID
 
- 	c.nextID++
 
- 	if _, ok := c.awaiting[id]; ok {
 
- 		c.awaitingMut.Unlock()
 
- 		panic("id taken")
 
- 	}
 
- 	c.awaiting[id] = rc
 
- 	c.awaitingMut.Unlock()
 
- 	req.ID = id
 
- 	ok := c.send(ctx, req.toWire(), nil)
 
- 	if !ok {
 
- 		return nil, ErrClosed
 
- 	}
 
- 	select {
 
- 	case res, ok := <-rc:
 
- 		if !ok {
 
- 			return nil, ErrClosed
 
- 		}
 
- 		return res.val, res.err
 
- 	case <-ctx.Done():
 
- 		return nil, ctx.Err()
 
- 	}
 
- }
 
- // ClusterConfig sends the cluster configuration message to the peer.
 
- func (c *rawConnection) ClusterConfig(config *ClusterConfig, _ map[string]string) {
 
- 	select {
 
- 	case c.clusterConfigBox <- config:
 
- 	case <-c.closed:
 
- 	}
 
- }
 
- func (c *rawConnection) Closed() <-chan struct{} {
 
- 	return c.closed
 
- }
 
- // DownloadProgress sends the progress updates for the files that are currently being downloaded.
 
- func (c *rawConnection) DownloadProgress(ctx context.Context, dp *DownloadProgress) {
 
- 	c.send(ctx, dp.toWire(), nil)
 
- }
 
- func (c *rawConnection) ping() bool {
 
- 	return c.send(context.Background(), &bep.Ping{}, nil)
 
- }
 
- func (c *rawConnection) readerLoop() {
 
- 	fourByteBuf := make([]byte, 4)
 
- 	for {
 
- 		msg, err := c.readMessage(fourByteBuf)
 
- 		if err != nil {
 
- 			if err == errUnknownMessage {
 
- 				// Unknown message types are skipped, for future extensibility.
 
- 				continue
 
- 			}
 
- 			c.internalClose(err)
 
- 			return
 
- 		}
 
- 		select {
 
- 		case c.inbox <- msg:
 
- 		case <-c.closed:
 
- 			return
 
- 		}
 
- 	}
 
- }
 
- func (c *rawConnection) dispatcherLoop() (err error) {
 
- 	defer close(c.dispatcherLoopStopped)
 
- 	var msg proto.Message
 
- 	state := stateInitial
 
- 	for {
 
- 		select {
 
- 		case <-c.closed:
 
- 			return ErrClosed
 
- 		default:
 
- 		}
 
- 		select {
 
- 		case msg = <-c.inbox:
 
- 		case <-c.closed:
 
- 			return ErrClosed
 
- 		}
 
- 		metricDeviceRecvMessages.WithLabelValues(c.idString).Inc()
 
- 		msgContext, err := messageContext(msg)
 
- 		if err != nil {
 
- 			return fmt.Errorf("protocol error: %w", err)
 
- 		}
 
- 		l.Debugf("handle %v message", msgContext)
 
- 		switch msg := msg.(type) {
 
- 		case *bep.ClusterConfig:
 
- 			if state == stateInitial {
 
- 				state = stateReady
 
- 			}
 
- 		case *bep.Close:
 
- 			return fmt.Errorf("closed by remote: %v", msg.Reason)
 
- 		default:
 
- 			if state != stateReady {
 
- 				return newProtocolError(fmt.Errorf("invalid state %d", state), msgContext)
 
- 			}
 
- 		}
 
- 		switch msg := msg.(type) {
 
- 		case *bep.Request:
 
- 			err = checkFilename(msg.Name)
 
- 		}
 
- 		if err != nil {
 
- 			return newProtocolError(err, msgContext)
 
- 		}
 
- 		switch msg := msg.(type) {
 
- 		case *bep.ClusterConfig:
 
- 			err = c.model.ClusterConfig(clusterConfigFromWire(msg))
 
- 		case *bep.Index:
 
- 			idx := indexFromWire(msg)
 
- 			if err := checkIndexConsistency(idx.Files); err != nil {
 
- 				return newProtocolError(err, msgContext)
 
- 			}
 
- 			err = c.handleIndex(idx)
 
- 		case *bep.IndexUpdate:
 
- 			idxUp := indexUpdateFromWire(msg)
 
- 			if err := checkIndexConsistency(idxUp.Files); err != nil {
 
- 				return newProtocolError(err, msgContext)
 
- 			}
 
- 			err = c.handleIndexUpdate(idxUp)
 
- 		case *bep.Request:
 
- 			go c.handleRequest(requestFromWire(msg))
 
- 		case *bep.Response:
 
- 			c.handleResponse(responseFromWire(msg))
 
- 		case *bep.DownloadProgress:
 
- 			err = c.model.DownloadProgress(downloadProgressFromWire(msg))
 
- 		}
 
- 		if err != nil {
 
- 			return newHandleError(err, msgContext)
 
- 		}
 
- 	}
 
- }
 
- func (c *rawConnection) readMessage(fourByteBuf []byte) (proto.Message, error) {
 
- 	hdr, err := c.readHeader(fourByteBuf)
 
- 	if err != nil {
 
- 		return nil, err
 
- 	}
 
- 	return c.readMessageAfterHeader(hdr, fourByteBuf)
 
- }
 
- func (c *rawConnection) readMessageAfterHeader(hdr *bep.Header, fourByteBuf []byte) (proto.Message, error) {
 
- 	// First comes a 4 byte message length
 
- 	if _, err := io.ReadFull(c.cr, fourByteBuf[:4]); err != nil {
 
- 		return nil, fmt.Errorf("reading message length: %w", err)
 
- 	}
 
- 	msgLen := int32(binary.BigEndian.Uint32(fourByteBuf))
 
- 	if msgLen < 0 {
 
- 		return nil, fmt.Errorf("negative message length %d", msgLen)
 
- 	} else if msgLen > MaxMessageLen {
 
- 		return nil, fmt.Errorf("message length %d exceeds maximum %d", msgLen, MaxMessageLen)
 
- 	}
 
- 	// Then comes the message
 
- 	buf := BufferPool.Get(int(msgLen))
 
- 	defer BufferPool.Put(buf)
 
- 	if _, err := io.ReadFull(c.cr, buf); err != nil {
 
- 		return nil, fmt.Errorf("reading message: %w", err)
 
- 	}
 
- 	// ... which might be compressed
 
- 	switch hdr.Compression {
 
- 	case bep.MessageCompression_MESSAGE_COMPRESSION_NONE:
 
- 		// Nothing
 
- 	case bep.MessageCompression_MESSAGE_COMPRESSION_LZ4:
 
- 		decomp, err := lz4Decompress(buf)
 
- 		if err != nil {
 
- 			return nil, fmt.Errorf("decompressing message: %w", err)
 
- 		}
 
- 		buf = decomp
 
- 	default:
 
- 		return nil, fmt.Errorf("unknown message compression %d", hdr.Compression)
 
- 	}
 
- 	// ... and is then unmarshalled
 
- 	metricDeviceRecvDecompressedBytes.WithLabelValues(c.idString).Add(float64(4 + len(buf)))
 
- 	msg, err := newMessage(hdr.Type)
 
- 	if err != nil {
 
- 		return nil, err
 
- 	}
 
- 	if err := proto.Unmarshal(buf, msg); err != nil {
 
- 		return nil, fmt.Errorf("unmarshalling message: %w", err)
 
- 	}
 
- 	return msg, nil
 
- }
 
- func (c *rawConnection) readHeader(fourByteBuf []byte) (*bep.Header, error) {
 
- 	// First comes a 2 byte header length
 
- 	if _, err := io.ReadFull(c.cr, fourByteBuf[:2]); err != nil {
 
- 		return nil, fmt.Errorf("reading length: %w", err)
 
- 	}
 
- 	hdrLen := int16(binary.BigEndian.Uint16(fourByteBuf))
 
- 	if hdrLen < 0 {
 
- 		return nil, fmt.Errorf("negative header length %d", hdrLen)
 
- 	}
 
- 	// Then comes the header
 
- 	buf := BufferPool.Get(int(hdrLen))
 
- 	defer BufferPool.Put(buf)
 
- 	if _, err := io.ReadFull(c.cr, buf); err != nil {
 
- 		return nil, fmt.Errorf("reading header: %w", err)
 
- 	}
 
- 	var hdr bep.Header
 
- 	err := proto.Unmarshal(buf, &hdr)
 
- 	if err != nil {
 
- 		return nil, fmt.Errorf("unmarshalling header: %w", err)
 
- 	}
 
- 	metricDeviceRecvDecompressedBytes.WithLabelValues(c.idString).Add(float64(2 + len(buf)))
 
- 	return &hdr, nil
 
- }
 
- func (c *rawConnection) handleIndex(im *Index) error {
 
- 	l.Debugf("Index(%v, %v, %d file)", c.deviceID, im.Folder, len(im.Files))
 
- 	return c.model.Index(im)
 
- }
 
- func (c *rawConnection) handleIndexUpdate(im *IndexUpdate) error {
 
- 	l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.deviceID, im.Folder, len(im.Files))
 
- 	return c.model.IndexUpdate(im)
 
- }
 
- // checkIndexConsistency verifies a number of invariants on FileInfos received in
 
- // index messages.
 
- func checkIndexConsistency(fs []FileInfo) error {
 
- 	for _, f := range fs {
 
- 		if err := checkFileInfoConsistency(f); err != nil {
 
- 			return fmt.Errorf("%q: %w", f.Name, err)
 
- 		}
 
- 	}
 
- 	return nil
 
- }
 
- // checkFileInfoConsistency verifies a number of invariants on the given FileInfo
 
- func checkFileInfoConsistency(f FileInfo) error {
 
- 	if err := checkFilename(f.Name); err != nil {
 
- 		return err
 
- 	}
 
- 	switch {
 
- 	case f.Deleted && len(f.Blocks) != 0:
 
- 		// Deleted files should have no blocks
 
- 		return errDeletedHasBlocks
 
- 	case f.Type == FileInfoTypeDirectory && len(f.Blocks) != 0:
 
- 		// Directories should have no blocks
 
- 		return errDirectoryHasBlocks
 
- 	case !f.Deleted && !f.IsInvalid() && f.Type == FileInfoTypeFile && len(f.Blocks) == 0:
 
- 		// Non-deleted, non-invalid files should have at least one block
 
- 		return errFileHasNoBlocks
 
- 	}
 
- 	return nil
 
- }
 
- // checkFilename verifies that the given filename is valid according to the
 
- // spec on what's allowed over the wire. A filename failing this test is
 
- // grounds for disconnecting the device.
 
- func checkFilename(name string) error {
 
- 	cleanedName := path.Clean(name)
 
- 	if cleanedName != name {
 
- 		// The filename on the wire should be in canonical format. If
 
- 		// Clean() managed to clean it up, there was something wrong with
 
- 		// it.
 
- 		return errUncleanFilename
 
- 	}
 
- 	switch name {
 
- 	case "", ".", "..":
 
- 		// These names are always invalid.
 
- 		return errInvalidFilename
 
- 	}
 
- 	if strings.HasPrefix(name, "/") {
 
- 		// Names are folder relative, not absolute.
 
- 		return errInvalidFilename
 
- 	}
 
- 	if strings.HasPrefix(name, "../") {
 
- 		// Starting with a dotdot is not allowed. Any other dotdots would
 
- 		// have been handled by the Clean() call at the top.
 
- 		return errInvalidFilename
 
- 	}
 
- 	return nil
 
- }
 
- func (c *rawConnection) handleRequest(req *Request) {
 
- 	res, err := c.model.Request(req)
 
- 	if err != nil {
 
- 		resp := &Response{
 
- 			ID:   req.ID,
 
- 			Code: errorToCode(err),
 
- 		}
 
- 		c.send(context.Background(), resp.toWire(), nil)
 
- 		return
 
- 	}
 
- 	done := make(chan struct{})
 
- 	resp := &Response{
 
- 		ID:   req.ID,
 
- 		Data: res.Data(),
 
- 		Code: errorToCode(nil),
 
- 	}
 
- 	c.send(context.Background(), resp.toWire(), done)
 
- 	<-done
 
- 	res.Close()
 
- }
 
- func (c *rawConnection) handleResponse(resp *Response) {
 
- 	c.awaitingMut.Lock()
 
- 	if rc := c.awaiting[resp.ID]; rc != nil {
 
- 		delete(c.awaiting, resp.ID)
 
- 		rc <- asyncResult{resp.Data, codeToError(resp.Code)}
 
- 		close(rc)
 
- 	}
 
- 	c.awaitingMut.Unlock()
 
- }
 
- func (c *rawConnection) send(ctx context.Context, msg proto.Message, done chan struct{}) bool {
 
- 	select {
 
- 	case c.outbox <- asyncMessage{msg, done}:
 
- 		return true
 
- 	case <-c.closed:
 
- 	case <-ctx.Done():
 
- 	}
 
- 	if done != nil {
 
- 		close(done)
 
- 	}
 
- 	return false
 
- }
 
- func (c *rawConnection) writerLoop() {
 
- 	select {
 
- 	case cc := <-c.clusterConfigBox:
 
- 		err := c.writeMessage(cc.toWire())
 
- 		if err != nil {
 
- 			c.internalClose(err)
 
- 			return
 
- 		}
 
- 	case hm := <-c.closeBox:
 
- 		_ = c.writeMessage(hm.msg)
 
- 		close(hm.done)
 
- 		return
 
- 	case <-c.closed:
 
- 		return
 
- 	}
 
- 	for {
 
- 		// When the connection is closing or closed, that should happen
 
- 		// immediately, not compete with the (potentially very busy) outbox.
 
- 		select {
 
- 		case hm := <-c.closeBox:
 
- 			_ = c.writeMessage(hm.msg)
 
- 			close(hm.done)
 
- 			return
 
- 		case <-c.closed:
 
- 			return
 
- 		default:
 
- 		}
 
- 		select {
 
- 		case cc := <-c.clusterConfigBox:
 
- 			err := c.writeMessage(cc.toWire())
 
- 			if err != nil {
 
- 				c.internalClose(err)
 
- 				return
 
- 			}
 
- 		case hm := <-c.outbox:
 
- 			err := c.writeMessage(hm.msg)
 
- 			if hm.done != nil {
 
- 				close(hm.done)
 
- 			}
 
- 			if err != nil {
 
- 				c.internalClose(err)
 
- 				return
 
- 			}
 
- 		case hm := <-c.closeBox:
 
- 			_ = c.writeMessage(hm.msg)
 
- 			close(hm.done)
 
- 			return
 
- 		case <-c.closed:
 
- 			return
 
- 		}
 
- 	}
 
- }
 
- func (c *rawConnection) writeMessage(msg proto.Message) error {
 
- 	msgContext, _ := messageContext(msg)
 
- 	l.Debugf("Writing %v", msgContext)
 
- 	defer func() {
 
- 		metricDeviceSentMessages.WithLabelValues(c.idString).Inc()
 
- 	}()
 
- 	size := proto.Size(msg)
 
- 	hdr := &bep.Header{
 
- 		Type: typeOf(msg),
 
- 	}
 
- 	hdrSize := proto.Size(hdr)
 
- 	if hdrSize > 1<<16-1 {
 
- 		panic("impossibly large header")
 
- 	}
 
- 	overhead := 2 + hdrSize + 4
 
- 	totSize := overhead + size
 
- 	buf := BufferPool.Get(totSize)
 
- 	defer BufferPool.Put(buf)
 
- 	// Message
 
- 	if _, err := protoutil.MarshalTo(buf[overhead:], msg); err != nil {
 
- 		return fmt.Errorf("marshalling message: %w", err)
 
- 	}
 
- 	if c.shouldCompressMessage(msg) {
 
- 		ok, err := c.writeCompressedMessage(msg, buf[overhead:])
 
- 		if ok {
 
- 			return err
 
- 		}
 
- 	}
 
- 	metricDeviceSentUncompressedBytes.WithLabelValues(c.idString).Add(float64(totSize))
 
- 	// Header length
 
- 	binary.BigEndian.PutUint16(buf, uint16(hdrSize))
 
- 	// Header
 
- 	if _, err := protoutil.MarshalTo(buf[2:], hdr); err != nil {
 
- 		return fmt.Errorf("marshalling header: %w", err)
 
- 	}
 
- 	// Message length
 
- 	binary.BigEndian.PutUint32(buf[2+hdrSize:], uint32(size))
 
- 	n, err := c.cw.Write(buf)
 
- 	l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message), err=%v", n, hdrSize, size, err)
 
- 	if err != nil {
 
- 		return fmt.Errorf("writing message: %w", err)
 
- 	}
 
- 	return nil
 
- }
 
- // Write msg out compressed, given its uncompressed marshaled payload.
 
- //
 
- // The first return value indicates whether compression succeeded.
 
- // If not, the caller should retry without compression.
 
- func (c *rawConnection) writeCompressedMessage(msg proto.Message, marshaled []byte) (ok bool, err error) {
 
- 	hdr := &bep.Header{
 
- 		Type:        typeOf(msg),
 
- 		Compression: bep.MessageCompression_MESSAGE_COMPRESSION_LZ4,
 
- 	}
 
- 	hdrSize := proto.Size(hdr)
 
- 	if hdrSize > 1<<16-1 {
 
- 		panic("impossibly large header")
 
- 	}
 
- 	cOverhead := 2 + hdrSize + 4
 
- 	metricDeviceSentUncompressedBytes.WithLabelValues(c.idString).Add(float64(cOverhead + len(marshaled)))
 
- 	// The compressed size may be at most n-n/32 = .96875*n bytes,
 
- 	// I.e., if we can't save at least 3.125% bandwidth, we forgo compression.
 
- 	// This number is arbitrary but cheap to compute.
 
- 	maxCompressed := cOverhead + len(marshaled) - len(marshaled)/32
 
- 	buf := BufferPool.Get(maxCompressed)
 
- 	defer BufferPool.Put(buf)
 
- 	compressedSize, err := lz4Compress(marshaled, buf[cOverhead:])
 
- 	totSize := compressedSize + cOverhead
 
- 	if err != nil {
 
- 		return false, nil
 
- 	}
 
- 	// Header length
 
- 	binary.BigEndian.PutUint16(buf, uint16(hdrSize))
 
- 	// Header
 
- 	if _, err := protoutil.MarshalTo(buf[2:], hdr); err != nil {
 
- 		return true, fmt.Errorf("marshalling header: %w", err)
 
- 	}
 
- 	// Message length
 
- 	binary.BigEndian.PutUint32(buf[2+hdrSize:], uint32(compressedSize))
 
- 	n, err := c.cw.Write(buf[:totSize])
 
- 	l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message (%d uncompressed)), err=%v", n, hdrSize, compressedSize, len(marshaled), err)
 
- 	if err != nil {
 
- 		return true, fmt.Errorf("writing message: %w", err)
 
- 	}
 
- 	return true, nil
 
- }
 
- func typeOf(msg proto.Message) bep.MessageType {
 
- 	switch msg.(type) {
 
- 	case *bep.ClusterConfig:
 
- 		return bep.MessageType_MESSAGE_TYPE_CLUSTER_CONFIG
 
- 	case *bep.Index:
 
- 		return bep.MessageType_MESSAGE_TYPE_INDEX
 
- 	case *bep.IndexUpdate:
 
- 		return bep.MessageType_MESSAGE_TYPE_INDEX_UPDATE
 
- 	case *bep.Request:
 
- 		return bep.MessageType_MESSAGE_TYPE_REQUEST
 
- 	case *bep.Response:
 
- 		return bep.MessageType_MESSAGE_TYPE_RESPONSE
 
- 	case *bep.DownloadProgress:
 
- 		return bep.MessageType_MESSAGE_TYPE_DOWNLOAD_PROGRESS
 
- 	case *bep.Ping:
 
- 		return bep.MessageType_MESSAGE_TYPE_PING
 
- 	case *bep.Close:
 
- 		return bep.MessageType_MESSAGE_TYPE_CLOSE
 
- 	default:
 
- 		panic("bug: unknown message type")
 
- 	}
 
- }
 
- func newMessage(t bep.MessageType) (proto.Message, error) {
 
- 	switch t {
 
- 	case bep.MessageType_MESSAGE_TYPE_CLUSTER_CONFIG:
 
- 		return new(bep.ClusterConfig), nil
 
- 	case bep.MessageType_MESSAGE_TYPE_INDEX:
 
- 		return new(bep.Index), nil
 
- 	case bep.MessageType_MESSAGE_TYPE_INDEX_UPDATE:
 
- 		return new(bep.IndexUpdate), nil
 
- 	case bep.MessageType_MESSAGE_TYPE_REQUEST:
 
- 		return new(bep.Request), nil
 
- 	case bep.MessageType_MESSAGE_TYPE_RESPONSE:
 
- 		return new(bep.Response), nil
 
- 	case bep.MessageType_MESSAGE_TYPE_DOWNLOAD_PROGRESS:
 
- 		return new(bep.DownloadProgress), nil
 
- 	case bep.MessageType_MESSAGE_TYPE_PING:
 
- 		return new(bep.Ping), nil
 
- 	case bep.MessageType_MESSAGE_TYPE_CLOSE:
 
- 		return new(bep.Close), nil
 
- 	default:
 
- 		return nil, errUnknownMessage
 
- 	}
 
- }
 
- func (c *rawConnection) shouldCompressMessage(msg proto.Message) bool {
 
- 	switch c.compression {
 
- 	case CompressionNever:
 
- 		return false
 
- 	case CompressionAlways:
 
- 		// Use compression for large enough messages
 
- 		return proto.Size(msg) >= compressionThreshold
 
- 	case CompressionMetadata:
 
- 		_, isResponse := msg.(*bep.Response)
 
- 		// Compress if it's large enough and not a response message
 
- 		return !isResponse && proto.Size(msg) >= compressionThreshold
 
- 	default:
 
- 		panic("unknown compression setting")
 
- 	}
 
- }
 
- // Close is called when the connection is regularly closed and thus the Close
 
- // BEP message is sent before terminating the actual connection. The error
 
- // argument specifies the reason for closing the connection.
 
- func (c *rawConnection) Close(err error) {
 
- 	c.sendCloseOnce.Do(func() {
 
- 		done := make(chan struct{})
 
- 		timeout := time.NewTimer(CloseTimeout)
 
- 		select {
 
- 		case c.closeBox <- asyncMessage{&bep.Close{Reason: err.Error()}, done}:
 
- 			select {
 
- 			case <-done:
 
- 			case <-timeout.C:
 
- 			case <-c.closed:
 
- 			}
 
- 		case <-timeout.C:
 
- 		case <-c.closed:
 
- 		}
 
- 	})
 
- 	// Close might be called from a method that is called from within
 
- 	// dispatcherLoop, resulting in a deadlock.
 
- 	// The sending above must happen before spawning the routine, to prevent
 
- 	// the underlying connection from terminating before sending the close msg.
 
- 	go c.internalClose(err)
 
- }
 
- // internalClose is called if there is an unexpected error during normal operation.
 
- func (c *rawConnection) internalClose(err error) {
 
- 	c.closeOnce.Do(func() {
 
- 		c.startStopMut.Lock()
 
- 		l.Debugf("close connection to %s at %s due to %v", c.deviceID.Short(), c.ConnectionInfo, err)
 
- 		if cerr := c.closer.Close(); cerr != nil {
 
- 			l.Debugf("failed to close underlying conn %s at %s %v:", c.deviceID.Short(), c.ConnectionInfo, cerr)
 
- 		}
 
- 		close(c.closed)
 
- 		c.awaitingMut.Lock()
 
- 		for i, ch := range c.awaiting {
 
- 			if ch != nil {
 
- 				close(ch)
 
- 				delete(c.awaiting, i)
 
- 			}
 
- 		}
 
- 		c.awaitingMut.Unlock()
 
- 		if !c.startTime.IsZero() {
 
- 			// Wait for the dispatcher loop to exit, if it was started to
 
- 			// begin with.
 
- 			<-c.dispatcherLoopStopped
 
- 		}
 
- 		c.startStopMut.Unlock()
 
- 		// We don't want to call into the model while holding the
 
- 		// startStopMut.
 
- 		c.model.Closed(err)
 
- 	})
 
- }
 
- // The pingSender makes sure that we've sent a message within the last
 
- // PingSendInterval. If we already have something sent in the last
 
- // PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
 
- // results in an effecting ping interval of somewhere between
 
- // PingSendInterval/2 and PingSendInterval.
 
- func (c *rawConnection) pingSender() {
 
- 	ticker := time.NewTicker(PingSendInterval / 2)
 
- 	defer ticker.Stop()
 
- 	for {
 
- 		select {
 
- 		case <-ticker.C:
 
- 			d := time.Since(c.cw.Last())
 
- 			if d < PingSendInterval/2 {
 
- 				l.Debugln(c.deviceID, "ping skipped after wr", d)
 
- 				continue
 
- 			}
 
- 			l.Debugln(c.deviceID, "ping -> after", d)
 
- 			c.ping()
 
- 		case <-c.closed:
 
- 			return
 
- 		}
 
- 	}
 
- }
 
- // The pingReceiver checks that we've received a message (any message will do,
 
- // but we expect pings in the absence of other messages) within the last
 
- // ReceiveTimeout. If not, we close the connection with an ErrTimeout.
 
- func (c *rawConnection) pingReceiver() {
 
- 	ticker := time.NewTicker(ReceiveTimeout / 2)
 
- 	defer ticker.Stop()
 
- 	for {
 
- 		select {
 
- 		case <-ticker.C:
 
- 			d := time.Since(c.cr.Last())
 
- 			if d > ReceiveTimeout {
 
- 				l.Debugln(c.deviceID, "ping timeout", d)
 
- 				c.internalClose(ErrTimeout)
 
- 			}
 
- 			l.Debugln(c.deviceID, "last read within", d)
 
- 		case <-c.closed:
 
- 			return
 
- 		}
 
- 	}
 
- }
 
- type Statistics struct {
 
- 	At            time.Time `json:"at"`
 
- 	InBytesTotal  int64     `json:"inBytesTotal"`
 
- 	OutBytesTotal int64     `json:"outBytesTotal"`
 
- 	StartedAt     time.Time `json:"startedAt"`
 
- }
 
- func (c *rawConnection) Statistics() Statistics {
 
- 	return Statistics{
 
- 		At:            time.Now().Truncate(time.Second),
 
- 		InBytesTotal:  c.cr.Tot(),
 
- 		OutBytesTotal: c.cw.Tot(),
 
- 		StartedAt:     c.startTime,
 
- 	}
 
- }
 
- func lz4Compress(src, buf []byte) (int, error) {
 
- 	n, err := lz4.CompressBlock(src, buf[4:], nil)
 
- 	if err != nil {
 
- 		return -1, err
 
- 	} else if n == 0 {
 
- 		return -1, errNotCompressible
 
- 	}
 
- 	// The compressed block is prefixed by the size of the uncompressed data.
 
- 	binary.BigEndian.PutUint32(buf, uint32(len(src)))
 
- 	return n + 4, nil
 
- }
 
- func lz4Decompress(src []byte) ([]byte, error) {
 
- 	size := binary.BigEndian.Uint32(src)
 
- 	buf := BufferPool.Get(int(size))
 
- 	n, err := lz4.UncompressBlock(src[4:], buf)
 
- 	if err != nil {
 
- 		BufferPool.Put(buf)
 
- 		return nil, err
 
- 	}
 
- 	return buf[:n], nil
 
- }
 
- func newProtocolError(err error, msgContext string) error {
 
- 	return fmt.Errorf("protocol error on %v: %w", msgContext, err)
 
- }
 
- func newHandleError(err error, msgContext string) error {
 
- 	return fmt.Errorf("handling %v: %w", msgContext, err)
 
- }
 
- func messageContext(msg proto.Message) (string, error) {
 
- 	switch msg := msg.(type) {
 
- 	case *bep.ClusterConfig:
 
- 		return "cluster-config", nil
 
- 	case *bep.Index:
 
- 		return fmt.Sprintf("index for %v", msg.Folder), nil
 
- 	case *bep.IndexUpdate:
 
- 		return fmt.Sprintf("index-update for %v", msg.Folder), nil
 
- 	case *bep.Request:
 
- 		return fmt.Sprintf(`request for "%v" in %v`, msg.Name, msg.Folder), nil
 
- 	case *bep.Response:
 
- 		return "response", nil
 
- 	case *bep.DownloadProgress:
 
- 		return fmt.Sprintf("download-progress for %v", msg.Folder), nil
 
- 	case *bep.Ping:
 
- 		return "ping", nil
 
- 	case *bep.Close:
 
- 		return "close", nil
 
- 	default:
 
- 		return "", errors.New("unknown or empty message")
 
- 	}
 
- }
 
- // connectionWrappingModel takes the Model interface from the model package,
 
- // which expects the Connection as the first parameter in all methods, and
 
- // wraps it to conform to the rawModel interface.
 
- type connectionWrappingModel struct {
 
- 	conn  Connection
 
- 	model Model
 
- }
 
- func (c *connectionWrappingModel) Index(m *Index) error {
 
- 	return c.model.Index(c.conn, m)
 
- }
 
- func (c *connectionWrappingModel) IndexUpdate(idxUp *IndexUpdate) error {
 
- 	return c.model.IndexUpdate(c.conn, idxUp)
 
- }
 
- func (c *connectionWrappingModel) Request(req *Request) (RequestResponse, error) {
 
- 	return c.model.Request(c.conn, req)
 
- }
 
- func (c *connectionWrappingModel) ClusterConfig(config *ClusterConfig) error {
 
- 	return c.model.ClusterConfig(c.conn, config)
 
- }
 
- func (c *connectionWrappingModel) Closed(err error) {
 
- 	c.model.Closed(c.conn, err)
 
- }
 
- func (c *connectionWrappingModel) DownloadProgress(p *DownloadProgress) error {
 
- 	return c.model.DownloadProgress(c.conn, p)
 
- }
 
 
  |