protocol.go 29 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090
  1. // Copyright (C) 2014 The Protocol Authors.
  2. // Prevents import loop, for internal testing
  3. //go:generate counterfeiter -o mocked_connection_info_test.go --fake-name mockedConnectionInfo . ConnectionInfo
  4. //go:generate go run ../../script/prune_mocks.go -t mocked_connection_info_test.go
  5. //go:generate counterfeiter -o mocks/connection_info.go --fake-name ConnectionInfo . ConnectionInfo
  6. //go:generate counterfeiter -o mocks/connection.go --fake-name Connection . Connection
  7. package protocol
  8. import (
  9. "context"
  10. "crypto/sha256"
  11. "encoding/binary"
  12. "fmt"
  13. "io"
  14. "net"
  15. "path"
  16. "strings"
  17. "sync"
  18. "time"
  19. lz4 "github.com/bkaradzic/go-lz4"
  20. "github.com/pkg/errors"
  21. )
  22. const (
  23. // Shifts
  24. KiB = 10
  25. MiB = 20
  26. GiB = 30
  27. )
  28. const (
  29. // MaxMessageLen is the largest message size allowed on the wire. (500 MB)
  30. MaxMessageLen = 500 * 1000 * 1000
  31. // MinBlockSize is the minimum block size allowed
  32. MinBlockSize = 128 << KiB
  33. // MaxBlockSize is the maximum block size allowed
  34. MaxBlockSize = 16 << MiB
  35. // DesiredPerFileBlocks is the number of blocks we aim for per file
  36. DesiredPerFileBlocks = 2000
  37. )
  38. // BlockSizes is the list of valid block sizes, from min to max
  39. var BlockSizes []int
  40. // For each block size, the hash of a block of all zeroes
  41. var sha256OfEmptyBlock = map[int][sha256.Size]byte{
  42. 128 << KiB: {0xfa, 0x43, 0x23, 0x9b, 0xce, 0xe7, 0xb9, 0x7c, 0xa6, 0x2f, 0x0, 0x7c, 0xc6, 0x84, 0x87, 0x56, 0xa, 0x39, 0xe1, 0x9f, 0x74, 0xf3, 0xdd, 0xe7, 0x48, 0x6d, 0xb3, 0xf9, 0x8d, 0xf8, 0xe4, 0x71},
  43. 256 << KiB: {0x8a, 0x39, 0xd2, 0xab, 0xd3, 0x99, 0x9a, 0xb7, 0x3c, 0x34, 0xdb, 0x24, 0x76, 0x84, 0x9c, 0xdd, 0xf3, 0x3, 0xce, 0x38, 0x9b, 0x35, 0x82, 0x68, 0x50, 0xf9, 0xa7, 0x0, 0x58, 0x9b, 0x4a, 0x90},
  44. 512 << KiB: {0x7, 0x85, 0x4d, 0x2f, 0xef, 0x29, 0x7a, 0x6, 0xba, 0x81, 0x68, 0x5e, 0x66, 0xc, 0x33, 0x2d, 0xe3, 0x6d, 0x5d, 0x18, 0xd5, 0x46, 0x92, 0x7d, 0x30, 0xda, 0xad, 0x6d, 0x7f, 0xda, 0x15, 0x41},
  45. 1 << MiB: {0x30, 0xe1, 0x49, 0x55, 0xeb, 0xf1, 0x35, 0x22, 0x66, 0xdc, 0x2f, 0xf8, 0x6, 0x7e, 0x68, 0x10, 0x46, 0x7, 0xe7, 0x50, 0xab, 0xb9, 0xd3, 0xb3, 0x65, 0x82, 0xb8, 0xaf, 0x90, 0x9f, 0xcb, 0x58},
  46. 2 << MiB: {0x56, 0x47, 0xf0, 0x5e, 0xc1, 0x89, 0x58, 0x94, 0x7d, 0x32, 0x87, 0x4e, 0xeb, 0x78, 0x8f, 0xa3, 0x96, 0xa0, 0x5d, 0xb, 0xab, 0x7c, 0x1b, 0x71, 0xf1, 0x12, 0xce, 0xb7, 0xe9, 0xb3, 0x1e, 0xee},
  47. 4 << MiB: {0xbb, 0x9f, 0x8d, 0xf6, 0x14, 0x74, 0xd2, 0x5e, 0x71, 0xfa, 0x0, 0x72, 0x23, 0x18, 0xcd, 0x38, 0x73, 0x96, 0xca, 0x17, 0x36, 0x60, 0x5e, 0x12, 0x48, 0x82, 0x1c, 0xc0, 0xde, 0x3d, 0x3a, 0xf8},
  48. 8 << MiB: {0x2d, 0xae, 0xb1, 0xf3, 0x60, 0x95, 0xb4, 0x4b, 0x31, 0x84, 0x10, 0xb3, 0xf4, 0xe8, 0xb5, 0xd9, 0x89, 0xdc, 0xc7, 0xbb, 0x2, 0x3d, 0x14, 0x26, 0xc4, 0x92, 0xda, 0xb0, 0xa3, 0x5, 0x3e, 0x74},
  49. 16 << MiB: {0x8, 0xa, 0xcf, 0x35, 0xa5, 0x7, 0xac, 0x98, 0x49, 0xcf, 0xcb, 0xa4, 0x7d, 0xc2, 0xad, 0x83, 0xe0, 0x1b, 0x75, 0x66, 0x3a, 0x51, 0x62, 0x79, 0xc8, 0xb9, 0xd2, 0x43, 0xb7, 0x19, 0x64, 0x3e},
  50. }
  51. func init() {
  52. for blockSize := MinBlockSize; blockSize <= MaxBlockSize; blockSize *= 2 {
  53. BlockSizes = append(BlockSizes, blockSize)
  54. if _, ok := sha256OfEmptyBlock[blockSize]; !ok {
  55. panic("missing hard coded value for sha256 of empty block")
  56. }
  57. }
  58. BufferPool = newBufferPool()
  59. }
  60. // BlockSize returns the block size to use for the given file size
  61. func BlockSize(fileSize int64) int {
  62. var blockSize int
  63. for _, blockSize = range BlockSizes {
  64. if fileSize < DesiredPerFileBlocks*int64(blockSize) {
  65. break
  66. }
  67. }
  68. return blockSize
  69. }
  70. const (
  71. stateInitial = iota
  72. stateReady
  73. )
  74. // FileInfo.LocalFlags flags
  75. const (
  76. FlagLocalUnsupported = 1 << 0 // The kind is unsupported, e.g. symlinks on Windows
  77. FlagLocalIgnored = 1 << 1 // Matches local ignore patterns
  78. FlagLocalMustRescan = 1 << 2 // Doesn't match content on disk, must be rechecked fully
  79. FlagLocalReceiveOnly = 1 << 3 // Change detected on receive only folder
  80. // Flags that should result in the Invalid bit on outgoing updates
  81. LocalInvalidFlags = FlagLocalUnsupported | FlagLocalIgnored | FlagLocalMustRescan | FlagLocalReceiveOnly
  82. // Flags that should result in a file being in conflict with its
  83. // successor, due to us not having an up to date picture of its state on
  84. // disk.
  85. LocalConflictFlags = FlagLocalUnsupported | FlagLocalIgnored | FlagLocalReceiveOnly
  86. LocalAllFlags = FlagLocalUnsupported | FlagLocalIgnored | FlagLocalMustRescan | FlagLocalReceiveOnly
  87. )
  88. var (
  89. ErrClosed = errors.New("connection closed")
  90. ErrTimeout = errors.New("read timeout")
  91. errUnknownMessage = errors.New("unknown message")
  92. errInvalidFilename = errors.New("filename is invalid")
  93. errUncleanFilename = errors.New("filename not in canonical format")
  94. errDeletedHasBlocks = errors.New("deleted file with non-empty block list")
  95. errDirectoryHasBlocks = errors.New("directory with non-empty block list")
  96. errFileHasNoBlocks = errors.New("file with empty block list")
  97. )
  98. type Model interface {
  99. // An index was received from the peer device
  100. Index(deviceID DeviceID, folder string, files []FileInfo) error
  101. // An index update was received from the peer device
  102. IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) error
  103. // A request was made by the peer device
  104. Request(deviceID DeviceID, folder, name string, blockNo, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error)
  105. // A cluster configuration message was received
  106. ClusterConfig(deviceID DeviceID, config ClusterConfig) error
  107. // The peer device closed the connection
  108. Closed(conn Connection, err error)
  109. // The peer device sent progress updates for the files it is currently downloading
  110. DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate) error
  111. }
  112. type RequestResponse interface {
  113. Data() []byte
  114. Close() // Must always be called once the byte slice is no longer in use
  115. Wait() // Blocks until Close is called
  116. }
  117. type Connection interface {
  118. Start()
  119. Close(err error)
  120. ID() DeviceID
  121. Index(ctx context.Context, folder string, files []FileInfo) error
  122. IndexUpdate(ctx context.Context, folder string, files []FileInfo) error
  123. Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error)
  124. ClusterConfig(config ClusterConfig)
  125. DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate)
  126. Statistics() Statistics
  127. Closed() bool
  128. ConnectionInfo
  129. }
  130. type ConnectionInfo interface {
  131. Type() string
  132. Transport() string
  133. RemoteAddr() net.Addr
  134. Priority() int
  135. String() string
  136. Crypto() string
  137. EstablishedAt() time.Time
  138. }
  139. type rawConnection struct {
  140. ConnectionInfo
  141. id DeviceID
  142. receiver Model
  143. startTime time.Time
  144. cr *countingReader
  145. cw *countingWriter
  146. closer io.Closer // Closing the underlying connection and thus cr and cw
  147. awaiting map[int]chan asyncResult
  148. awaitingMut sync.Mutex
  149. idxMut sync.Mutex // ensures serialization of Index calls
  150. nextID int
  151. nextIDMut sync.Mutex
  152. inbox chan message
  153. outbox chan asyncMessage
  154. closeBox chan asyncMessage
  155. clusterConfigBox chan *ClusterConfig
  156. dispatcherLoopStopped chan struct{}
  157. closed chan struct{}
  158. closeOnce sync.Once
  159. sendCloseOnce sync.Once
  160. compression Compression
  161. loopWG sync.WaitGroup // Need to ensure no leftover routines in testing
  162. }
  163. type asyncResult struct {
  164. val []byte
  165. err error
  166. }
  167. type message interface {
  168. ProtoSize() int
  169. Marshal() ([]byte, error)
  170. MarshalTo([]byte) (int, error)
  171. Unmarshal([]byte) error
  172. }
  173. type asyncMessage struct {
  174. msg message
  175. done chan struct{} // done closes when we're done sending the message
  176. }
  177. const (
  178. // PingSendInterval is how often we make sure to send a message, by
  179. // triggering pings if necessary.
  180. PingSendInterval = 90 * time.Second
  181. // ReceiveTimeout is the longest we'll wait for a message from the other
  182. // side before closing the connection.
  183. ReceiveTimeout = 300 * time.Second
  184. )
  185. // CloseTimeout is the longest we'll wait when trying to send the close
  186. // message before just closing the connection.
  187. // Should not be modified in production code, just for testing.
  188. var CloseTimeout = 10 * time.Second
  189. func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, receiver Model, connInfo ConnectionInfo, compress Compression) Connection {
  190. receiver = nativeModel{receiver}
  191. rc := newRawConnection(deviceID, reader, writer, closer, receiver, connInfo, compress)
  192. return wireFormatConnection{rc}
  193. }
  194. func NewEncryptedConnection(passwords map[string]string, deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, receiver Model, connInfo ConnectionInfo, compress Compression) Connection {
  195. keys := keysFromPasswords(passwords)
  196. // Encryption / decryption is first (outermost) before conversion to
  197. // native path formats.
  198. nm := nativeModel{receiver}
  199. em := encryptedModel{model: nm, folderKeys: keys}
  200. // We do the wire format conversion first (outermost) so that the
  201. // metadata is in wire format when it reaches the encryption step.
  202. rc := newRawConnection(deviceID, reader, writer, closer, em, connInfo, compress)
  203. ec := encryptedConnection{ConnectionInfo: rc, conn: rc, folderKeys: keys}
  204. wc := wireFormatConnection{ec}
  205. return wc
  206. }
  207. func newRawConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, receiver Model, connInfo ConnectionInfo, compress Compression) *rawConnection {
  208. cr := &countingReader{Reader: reader}
  209. cw := &countingWriter{Writer: writer}
  210. return &rawConnection{
  211. ConnectionInfo: connInfo,
  212. id: deviceID,
  213. receiver: receiver,
  214. cr: cr,
  215. cw: cw,
  216. closer: closer,
  217. awaiting: make(map[int]chan asyncResult),
  218. inbox: make(chan message),
  219. outbox: make(chan asyncMessage),
  220. closeBox: make(chan asyncMessage),
  221. clusterConfigBox: make(chan *ClusterConfig),
  222. dispatcherLoopStopped: make(chan struct{}),
  223. closed: make(chan struct{}),
  224. compression: compress,
  225. loopWG: sync.WaitGroup{},
  226. }
  227. }
  228. // Start creates the goroutines for sending and receiving of messages. It must
  229. // be called exactly once after creating a connection.
  230. func (c *rawConnection) Start() {
  231. c.loopWG.Add(5)
  232. go func() {
  233. c.readerLoop()
  234. c.loopWG.Done()
  235. }()
  236. go func() {
  237. err := c.dispatcherLoop()
  238. c.Close(err)
  239. c.loopWG.Done()
  240. }()
  241. go func() {
  242. c.writerLoop()
  243. c.loopWG.Done()
  244. }()
  245. go func() {
  246. c.pingSender()
  247. c.loopWG.Done()
  248. }()
  249. go func() {
  250. c.pingReceiver()
  251. c.loopWG.Done()
  252. }()
  253. c.startTime = time.Now().Truncate(time.Second)
  254. }
  255. func (c *rawConnection) ID() DeviceID {
  256. return c.id
  257. }
  258. // Index writes the list of file information to the connected peer device
  259. func (c *rawConnection) Index(ctx context.Context, folder string, idx []FileInfo) error {
  260. select {
  261. case <-c.closed:
  262. return ErrClosed
  263. default:
  264. }
  265. c.idxMut.Lock()
  266. c.send(ctx, &Index{
  267. Folder: folder,
  268. Files: idx,
  269. }, nil)
  270. c.idxMut.Unlock()
  271. return nil
  272. }
  273. // IndexUpdate writes the list of file information to the connected peer device as an update
  274. func (c *rawConnection) IndexUpdate(ctx context.Context, folder string, idx []FileInfo) error {
  275. select {
  276. case <-c.closed:
  277. return ErrClosed
  278. default:
  279. }
  280. c.idxMut.Lock()
  281. c.send(ctx, &IndexUpdate{
  282. Folder: folder,
  283. Files: idx,
  284. }, nil)
  285. c.idxMut.Unlock()
  286. return nil
  287. }
  288. // Request returns the bytes for the specified block after fetching them from the connected peer.
  289. func (c *rawConnection) Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
  290. c.nextIDMut.Lock()
  291. id := c.nextID
  292. c.nextID++
  293. c.nextIDMut.Unlock()
  294. c.awaitingMut.Lock()
  295. if _, ok := c.awaiting[id]; ok {
  296. c.awaitingMut.Unlock()
  297. panic("id taken")
  298. }
  299. rc := make(chan asyncResult, 1)
  300. c.awaiting[id] = rc
  301. c.awaitingMut.Unlock()
  302. ok := c.send(ctx, &Request{
  303. ID: id,
  304. Folder: folder,
  305. Name: name,
  306. Offset: offset,
  307. Size: size,
  308. BlockNo: blockNo,
  309. Hash: hash,
  310. WeakHash: weakHash,
  311. FromTemporary: fromTemporary,
  312. }, nil)
  313. if !ok {
  314. return nil, ErrClosed
  315. }
  316. select {
  317. case res, ok := <-rc:
  318. if !ok {
  319. return nil, ErrClosed
  320. }
  321. return res.val, res.err
  322. case <-ctx.Done():
  323. return nil, ctx.Err()
  324. }
  325. }
  326. // ClusterConfig sends the cluster configuration message to the peer.
  327. func (c *rawConnection) ClusterConfig(config ClusterConfig) {
  328. select {
  329. case c.clusterConfigBox <- &config:
  330. case <-c.closed:
  331. }
  332. }
  333. func (c *rawConnection) Closed() bool {
  334. select {
  335. case <-c.closed:
  336. return true
  337. default:
  338. return false
  339. }
  340. }
  341. // DownloadProgress sends the progress updates for the files that are currently being downloaded.
  342. func (c *rawConnection) DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate) {
  343. c.send(ctx, &DownloadProgress{
  344. Folder: folder,
  345. Updates: updates,
  346. }, nil)
  347. }
  348. func (c *rawConnection) ping() bool {
  349. return c.send(context.Background(), &Ping{}, nil)
  350. }
  351. func (c *rawConnection) readerLoop() {
  352. fourByteBuf := make([]byte, 4)
  353. for {
  354. msg, err := c.readMessage(fourByteBuf)
  355. if err != nil {
  356. if err == errUnknownMessage {
  357. // Unknown message types are skipped, for future extensibility.
  358. continue
  359. }
  360. c.internalClose(err)
  361. return
  362. }
  363. select {
  364. case c.inbox <- msg:
  365. case <-c.closed:
  366. return
  367. }
  368. }
  369. }
  370. func (c *rawConnection) dispatcherLoop() (err error) {
  371. defer close(c.dispatcherLoopStopped)
  372. var msg message
  373. state := stateInitial
  374. for {
  375. select {
  376. case msg = <-c.inbox:
  377. case <-c.closed:
  378. return ErrClosed
  379. }
  380. msgContext, err := messageContext(msg)
  381. if err != nil {
  382. return fmt.Errorf("protocol error: %w", err)
  383. }
  384. l.Debugf("handle %v message", msgContext)
  385. switch msg := msg.(type) {
  386. case *ClusterConfig:
  387. if state == stateInitial {
  388. state = stateReady
  389. }
  390. case *Close:
  391. return fmt.Errorf("closed by remote: %v", msg.Reason)
  392. default:
  393. if state != stateReady {
  394. return newProtocolError(fmt.Errorf("invalid state %d", state), msgContext)
  395. }
  396. }
  397. switch msg := msg.(type) {
  398. case *Index:
  399. err = checkIndexConsistency(msg.Files)
  400. case *IndexUpdate:
  401. err = checkIndexConsistency(msg.Files)
  402. case *Request:
  403. err = checkFilename(msg.Name)
  404. }
  405. if err != nil {
  406. return newProtocolError(err, msgContext)
  407. }
  408. switch msg := msg.(type) {
  409. case *ClusterConfig:
  410. err = c.receiver.ClusterConfig(c.id, *msg)
  411. case *Index:
  412. err = c.handleIndex(*msg)
  413. case *IndexUpdate:
  414. err = c.handleIndexUpdate(*msg)
  415. case *Request:
  416. go c.handleRequest(*msg)
  417. case *Response:
  418. c.handleResponse(*msg)
  419. case *DownloadProgress:
  420. err = c.receiver.DownloadProgress(c.id, msg.Folder, msg.Updates)
  421. }
  422. if err != nil {
  423. return newHandleError(err, msgContext)
  424. }
  425. }
  426. }
  427. func (c *rawConnection) readMessage(fourByteBuf []byte) (message, error) {
  428. hdr, err := c.readHeader(fourByteBuf)
  429. if err != nil {
  430. return nil, err
  431. }
  432. return c.readMessageAfterHeader(hdr, fourByteBuf)
  433. }
  434. func (c *rawConnection) readMessageAfterHeader(hdr Header, fourByteBuf []byte) (message, error) {
  435. // First comes a 4 byte message length
  436. if _, err := io.ReadFull(c.cr, fourByteBuf[:4]); err != nil {
  437. return nil, errors.Wrap(err, "reading message length")
  438. }
  439. msgLen := int32(binary.BigEndian.Uint32(fourByteBuf))
  440. if msgLen < 0 {
  441. return nil, fmt.Errorf("negative message length %d", msgLen)
  442. } else if msgLen > MaxMessageLen {
  443. return nil, fmt.Errorf("message length %d exceeds maximum %d", msgLen, MaxMessageLen)
  444. }
  445. // Then comes the message
  446. buf := BufferPool.Get(int(msgLen))
  447. if _, err := io.ReadFull(c.cr, buf); err != nil {
  448. BufferPool.Put(buf)
  449. return nil, errors.Wrap(err, "reading message")
  450. }
  451. // ... which might be compressed
  452. switch hdr.Compression {
  453. case MessageCompressionNone:
  454. // Nothing
  455. case MessageCompressionLZ4:
  456. decomp, err := c.lz4Decompress(buf)
  457. BufferPool.Put(buf)
  458. if err != nil {
  459. return nil, errors.Wrap(err, "decompressing message")
  460. }
  461. buf = decomp
  462. default:
  463. return nil, fmt.Errorf("unknown message compression %d", hdr.Compression)
  464. }
  465. // ... and is then unmarshalled
  466. msg, err := c.newMessage(hdr.Type)
  467. if err != nil {
  468. BufferPool.Put(buf)
  469. return nil, err
  470. }
  471. if err := msg.Unmarshal(buf); err != nil {
  472. BufferPool.Put(buf)
  473. return nil, errors.Wrap(err, "unmarshalling message")
  474. }
  475. BufferPool.Put(buf)
  476. return msg, nil
  477. }
  478. func (c *rawConnection) readHeader(fourByteBuf []byte) (Header, error) {
  479. // First comes a 2 byte header length
  480. if _, err := io.ReadFull(c.cr, fourByteBuf[:2]); err != nil {
  481. return Header{}, errors.Wrap(err, "reading length")
  482. }
  483. hdrLen := int16(binary.BigEndian.Uint16(fourByteBuf))
  484. if hdrLen < 0 {
  485. return Header{}, fmt.Errorf("negative header length %d", hdrLen)
  486. }
  487. // Then comes the header
  488. buf := BufferPool.Get(int(hdrLen))
  489. if _, err := io.ReadFull(c.cr, buf); err != nil {
  490. BufferPool.Put(buf)
  491. return Header{}, errors.Wrap(err, "reading header")
  492. }
  493. var hdr Header
  494. err := hdr.Unmarshal(buf)
  495. BufferPool.Put(buf)
  496. if err != nil {
  497. return Header{}, errors.Wrap(err, "unmarshalling header")
  498. }
  499. return hdr, nil
  500. }
  501. func (c *rawConnection) handleIndex(im Index) error {
  502. l.Debugf("Index(%v, %v, %d file)", c.id, im.Folder, len(im.Files))
  503. return c.receiver.Index(c.id, im.Folder, im.Files)
  504. }
  505. func (c *rawConnection) handleIndexUpdate(im IndexUpdate) error {
  506. l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Folder, len(im.Files))
  507. return c.receiver.IndexUpdate(c.id, im.Folder, im.Files)
  508. }
  509. // checkIndexConsistency verifies a number of invariants on FileInfos received in
  510. // index messages.
  511. func checkIndexConsistency(fs []FileInfo) error {
  512. for _, f := range fs {
  513. if err := checkFileInfoConsistency(f); err != nil {
  514. return errors.Wrapf(err, "%q", f.Name)
  515. }
  516. }
  517. return nil
  518. }
  519. // checkFileInfoConsistency verifies a number of invariants on the given FileInfo
  520. func checkFileInfoConsistency(f FileInfo) error {
  521. if err := checkFilename(f.Name); err != nil {
  522. return err
  523. }
  524. switch {
  525. case f.Deleted && len(f.Blocks) != 0:
  526. // Deleted files should have no blocks
  527. return errDeletedHasBlocks
  528. case f.Type == FileInfoTypeDirectory && len(f.Blocks) != 0:
  529. // Directories should have no blocks
  530. return errDirectoryHasBlocks
  531. case !f.Deleted && !f.IsInvalid() && f.Type == FileInfoTypeFile && len(f.Blocks) == 0:
  532. // Non-deleted, non-invalid files should have at least one block
  533. return errFileHasNoBlocks
  534. }
  535. return nil
  536. }
  537. // checkFilename verifies that the given filename is valid according to the
  538. // spec on what's allowed over the wire. A filename failing this test is
  539. // grounds for disconnecting the device.
  540. func checkFilename(name string) error {
  541. cleanedName := path.Clean(name)
  542. if cleanedName != name {
  543. // The filename on the wire should be in canonical format. If
  544. // Clean() managed to clean it up, there was something wrong with
  545. // it.
  546. return errUncleanFilename
  547. }
  548. switch name {
  549. case "", ".", "..":
  550. // These names are always invalid.
  551. return errInvalidFilename
  552. }
  553. if strings.HasPrefix(name, "/") {
  554. // Names are folder relative, not absolute.
  555. return errInvalidFilename
  556. }
  557. if strings.HasPrefix(name, "../") {
  558. // Starting with a dotdot is not allowed. Any other dotdots would
  559. // have been handled by the Clean() call at the top.
  560. return errInvalidFilename
  561. }
  562. return nil
  563. }
  564. func (c *rawConnection) handleRequest(req Request) {
  565. res, err := c.receiver.Request(c.id, req.Folder, req.Name, int32(req.BlockNo), int32(req.Size), req.Offset, req.Hash, req.WeakHash, req.FromTemporary)
  566. if err != nil {
  567. c.send(context.Background(), &Response{
  568. ID: req.ID,
  569. Code: errorToCode(err),
  570. }, nil)
  571. return
  572. }
  573. done := make(chan struct{})
  574. c.send(context.Background(), &Response{
  575. ID: req.ID,
  576. Data: res.Data(),
  577. Code: errorToCode(nil),
  578. }, done)
  579. <-done
  580. res.Close()
  581. }
  582. func (c *rawConnection) handleResponse(resp Response) {
  583. c.awaitingMut.Lock()
  584. if rc := c.awaiting[resp.ID]; rc != nil {
  585. delete(c.awaiting, resp.ID)
  586. rc <- asyncResult{resp.Data, codeToError(resp.Code)}
  587. close(rc)
  588. }
  589. c.awaitingMut.Unlock()
  590. }
  591. func (c *rawConnection) send(ctx context.Context, msg message, done chan struct{}) bool {
  592. select {
  593. case c.outbox <- asyncMessage{msg, done}:
  594. return true
  595. case <-c.closed:
  596. case <-ctx.Done():
  597. }
  598. if done != nil {
  599. close(done)
  600. }
  601. return false
  602. }
  603. func (c *rawConnection) writerLoop() {
  604. select {
  605. case cc := <-c.clusterConfigBox:
  606. err := c.writeMessage(cc)
  607. if err != nil {
  608. c.internalClose(err)
  609. return
  610. }
  611. case hm := <-c.closeBox:
  612. _ = c.writeMessage(hm.msg)
  613. close(hm.done)
  614. return
  615. case <-c.closed:
  616. return
  617. }
  618. for {
  619. select {
  620. case cc := <-c.clusterConfigBox:
  621. err := c.writeMessage(cc)
  622. if err != nil {
  623. c.internalClose(err)
  624. return
  625. }
  626. case hm := <-c.outbox:
  627. err := c.writeMessage(hm.msg)
  628. if hm.done != nil {
  629. close(hm.done)
  630. }
  631. if err != nil {
  632. c.internalClose(err)
  633. return
  634. }
  635. case hm := <-c.closeBox:
  636. _ = c.writeMessage(hm.msg)
  637. close(hm.done)
  638. return
  639. case <-c.closed:
  640. return
  641. }
  642. }
  643. }
  644. func (c *rawConnection) writeMessage(msg message) error {
  645. if c.shouldCompressMessage(msg) {
  646. return c.writeCompressedMessage(msg)
  647. }
  648. return c.writeUncompressedMessage(msg)
  649. }
  650. func (c *rawConnection) writeCompressedMessage(msg message) error {
  651. size := msg.ProtoSize()
  652. buf := BufferPool.Get(size)
  653. if _, err := msg.MarshalTo(buf); err != nil {
  654. BufferPool.Put(buf)
  655. return errors.Wrap(err, "marshalling message")
  656. }
  657. compressed, err := c.lz4Compress(buf)
  658. if err != nil {
  659. BufferPool.Put(buf)
  660. return errors.Wrap(err, "compressing message")
  661. }
  662. hdr := Header{
  663. Type: c.typeOf(msg),
  664. Compression: MessageCompressionLZ4,
  665. }
  666. hdrSize := hdr.ProtoSize()
  667. if hdrSize > 1<<16-1 {
  668. panic("impossibly large header")
  669. }
  670. compressedSize := len(compressed)
  671. totSize := 2 + hdrSize + 4 + compressedSize
  672. buf = BufferPool.Upgrade(buf, totSize)
  673. // Header length
  674. binary.BigEndian.PutUint16(buf, uint16(hdrSize))
  675. // Header
  676. if _, err := hdr.MarshalTo(buf[2:]); err != nil {
  677. BufferPool.Put(buf)
  678. BufferPool.Put(compressed)
  679. return errors.Wrap(err, "marshalling header")
  680. }
  681. // Message length
  682. binary.BigEndian.PutUint32(buf[2+hdrSize:], uint32(compressedSize))
  683. // Message
  684. copy(buf[2+hdrSize+4:], compressed)
  685. BufferPool.Put(compressed)
  686. n, err := c.cw.Write(buf)
  687. BufferPool.Put(buf)
  688. l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message (%d uncompressed)), err=%v", n, hdrSize, compressedSize, size, err)
  689. if err != nil {
  690. return errors.Wrap(err, "writing message")
  691. }
  692. return nil
  693. }
  694. func (c *rawConnection) writeUncompressedMessage(msg message) error {
  695. size := msg.ProtoSize()
  696. hdr := Header{
  697. Type: c.typeOf(msg),
  698. }
  699. hdrSize := hdr.ProtoSize()
  700. if hdrSize > 1<<16-1 {
  701. panic("impossibly large header")
  702. }
  703. totSize := 2 + hdrSize + 4 + size
  704. buf := BufferPool.Get(totSize)
  705. // Header length
  706. binary.BigEndian.PutUint16(buf, uint16(hdrSize))
  707. // Header
  708. if _, err := hdr.MarshalTo(buf[2:]); err != nil {
  709. BufferPool.Put(buf)
  710. return errors.Wrap(err, "marshalling header")
  711. }
  712. // Message length
  713. binary.BigEndian.PutUint32(buf[2+hdrSize:], uint32(size))
  714. // Message
  715. if _, err := msg.MarshalTo(buf[2+hdrSize+4:]); err != nil {
  716. BufferPool.Put(buf)
  717. return errors.Wrap(err, "marshalling message")
  718. }
  719. n, err := c.cw.Write(buf[:totSize])
  720. BufferPool.Put(buf)
  721. l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message), err=%v", n, hdrSize, size, err)
  722. if err != nil {
  723. return errors.Wrap(err, "writing message")
  724. }
  725. return nil
  726. }
  727. func (c *rawConnection) typeOf(msg message) MessageType {
  728. switch msg.(type) {
  729. case *ClusterConfig:
  730. return MessageTypeClusterConfig
  731. case *Index:
  732. return MessageTypeIndex
  733. case *IndexUpdate:
  734. return MessageTypeIndexUpdate
  735. case *Request:
  736. return MessageTypeRequest
  737. case *Response:
  738. return MessageTypeResponse
  739. case *DownloadProgress:
  740. return MessageTypeDownloadProgress
  741. case *Ping:
  742. return MessageTypePing
  743. case *Close:
  744. return MessageTypeClose
  745. default:
  746. panic("bug: unknown message type")
  747. }
  748. }
  749. func (c *rawConnection) newMessage(t MessageType) (message, error) {
  750. switch t {
  751. case MessageTypeClusterConfig:
  752. return new(ClusterConfig), nil
  753. case MessageTypeIndex:
  754. return new(Index), nil
  755. case MessageTypeIndexUpdate:
  756. return new(IndexUpdate), nil
  757. case MessageTypeRequest:
  758. return new(Request), nil
  759. case MessageTypeResponse:
  760. return new(Response), nil
  761. case MessageTypeDownloadProgress:
  762. return new(DownloadProgress), nil
  763. case MessageTypePing:
  764. return new(Ping), nil
  765. case MessageTypeClose:
  766. return new(Close), nil
  767. default:
  768. return nil, errUnknownMessage
  769. }
  770. }
  771. func (c *rawConnection) shouldCompressMessage(msg message) bool {
  772. switch c.compression {
  773. case CompressionNever:
  774. return false
  775. case CompressionAlways:
  776. // Use compression for large enough messages
  777. return msg.ProtoSize() >= compressionThreshold
  778. case CompressionMetadata:
  779. _, isResponse := msg.(*Response)
  780. // Compress if it's large enough and not a response message
  781. return !isResponse && msg.ProtoSize() >= compressionThreshold
  782. default:
  783. panic("unknown compression setting")
  784. }
  785. }
  786. // Close is called when the connection is regularely closed and thus the Close
  787. // BEP message is sent before terminating the actual connection. The error
  788. // argument specifies the reason for closing the connection.
  789. func (c *rawConnection) Close(err error) {
  790. c.sendCloseOnce.Do(func() {
  791. done := make(chan struct{})
  792. timeout := time.NewTimer(CloseTimeout)
  793. select {
  794. case c.closeBox <- asyncMessage{&Close{err.Error()}, done}:
  795. select {
  796. case <-done:
  797. case <-timeout.C:
  798. case <-c.closed:
  799. }
  800. case <-timeout.C:
  801. case <-c.closed:
  802. }
  803. })
  804. // Close might be called from a method that is called from within
  805. // dispatcherLoop, resulting in a deadlock.
  806. // The sending above must happen before spawning the routine, to prevent
  807. // the underlying connection from terminating before sending the close msg.
  808. go c.internalClose(err)
  809. }
  810. // internalClose is called if there is an unexpected error during normal operation.
  811. func (c *rawConnection) internalClose(err error) {
  812. c.closeOnce.Do(func() {
  813. l.Debugln("close due to", err)
  814. if cerr := c.closer.Close(); cerr != nil {
  815. l.Debugln(c.id, "failed to close underlying conn:", cerr)
  816. }
  817. close(c.closed)
  818. c.awaitingMut.Lock()
  819. for i, ch := range c.awaiting {
  820. if ch != nil {
  821. close(ch)
  822. delete(c.awaiting, i)
  823. }
  824. }
  825. c.awaitingMut.Unlock()
  826. <-c.dispatcherLoopStopped
  827. c.receiver.Closed(c, err)
  828. })
  829. }
  830. // The pingSender makes sure that we've sent a message within the last
  831. // PingSendInterval. If we already have something sent in the last
  832. // PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
  833. // results in an effecting ping interval of somewhere between
  834. // PingSendInterval/2 and PingSendInterval.
  835. func (c *rawConnection) pingSender() {
  836. ticker := time.NewTicker(PingSendInterval / 2)
  837. defer ticker.Stop()
  838. for {
  839. select {
  840. case <-ticker.C:
  841. d := time.Since(c.cw.Last())
  842. if d < PingSendInterval/2 {
  843. l.Debugln(c.id, "ping skipped after wr", d)
  844. continue
  845. }
  846. l.Debugln(c.id, "ping -> after", d)
  847. c.ping()
  848. case <-c.closed:
  849. return
  850. }
  851. }
  852. }
  853. // The pingReceiver checks that we've received a message (any message will do,
  854. // but we expect pings in the absence of other messages) within the last
  855. // ReceiveTimeout. If not, we close the connection with an ErrTimeout.
  856. func (c *rawConnection) pingReceiver() {
  857. ticker := time.NewTicker(ReceiveTimeout / 2)
  858. defer ticker.Stop()
  859. for {
  860. select {
  861. case <-ticker.C:
  862. d := time.Since(c.cr.Last())
  863. if d > ReceiveTimeout {
  864. l.Debugln(c.id, "ping timeout", d)
  865. c.internalClose(ErrTimeout)
  866. }
  867. l.Debugln(c.id, "last read within", d)
  868. case <-c.closed:
  869. return
  870. }
  871. }
  872. }
  873. type Statistics struct {
  874. At time.Time
  875. InBytesTotal int64
  876. OutBytesTotal int64
  877. StartedAt time.Time
  878. }
  879. func (c *rawConnection) Statistics() Statistics {
  880. return Statistics{
  881. At: time.Now().Truncate(time.Second),
  882. InBytesTotal: c.cr.Tot(),
  883. OutBytesTotal: c.cw.Tot(),
  884. StartedAt: c.startTime,
  885. }
  886. }
  887. func (c *rawConnection) lz4Compress(src []byte) ([]byte, error) {
  888. var err error
  889. buf := BufferPool.Get(lz4.CompressBound(len(src)))
  890. compressed, err := lz4.Encode(buf, src)
  891. if err != nil {
  892. BufferPool.Put(buf)
  893. return nil, err
  894. }
  895. if &compressed[0] != &buf[0] {
  896. panic("bug: lz4.Compress allocated, which it must not (should use buffer pool)")
  897. }
  898. binary.BigEndian.PutUint32(compressed, binary.LittleEndian.Uint32(compressed))
  899. return compressed, nil
  900. }
  901. func (c *rawConnection) lz4Decompress(src []byte) ([]byte, error) {
  902. size := binary.BigEndian.Uint32(src)
  903. binary.LittleEndian.PutUint32(src, size)
  904. var err error
  905. buf := BufferPool.Get(int(size))
  906. decoded, err := lz4.Decode(buf, src)
  907. if err != nil {
  908. BufferPool.Put(buf)
  909. return nil, err
  910. }
  911. if &decoded[0] != &buf[0] {
  912. panic("bug: lz4.Decode allocated, which it must not (should use buffer pool)")
  913. }
  914. return decoded, nil
  915. }
  916. func newProtocolError(err error, msgContext string) error {
  917. return fmt.Errorf("protocol error on %v: %w", msgContext, err)
  918. }
  919. func newHandleError(err error, msgContext string) error {
  920. return fmt.Errorf("handling %v: %w", msgContext, err)
  921. }
  922. func messageContext(msg message) (string, error) {
  923. switch msg := msg.(type) {
  924. case *ClusterConfig:
  925. return "cluster-config", nil
  926. case *Index:
  927. return fmt.Sprintf("index for %v", msg.Folder), nil
  928. case *IndexUpdate:
  929. return fmt.Sprintf("index-update for %v", msg.Folder), nil
  930. case *Request:
  931. return fmt.Sprintf(`request for "%v" in %v`, msg.Name, msg.Folder), nil
  932. case *Response:
  933. return "response", nil
  934. case *DownloadProgress:
  935. return fmt.Sprintf("download-progress for %v", msg.Folder), nil
  936. case *Ping:
  937. return "ping", nil
  938. case *Close:
  939. return "close", nil
  940. default:
  941. return "", errors.New("unknown or empty message")
  942. }
  943. }