protocol.go 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087
  1. // Copyright (C) 2014 The Protocol Authors.
  2. //go:generate -command counterfeiter go run github.com/maxbrunsfeld/counterfeiter/v6
  3. // Prevents import loop, for internal testing
  4. //go:generate counterfeiter -o mocked_connection_info_test.go --fake-name mockedConnectionInfo . ConnectionInfo
  5. //go:generate go run ../../script/prune_mocks.go -t mocked_connection_info_test.go
  6. //go:generate counterfeiter -o mocks/connection_info.go --fake-name ConnectionInfo . ConnectionInfo
  7. //go:generate counterfeiter -o mocks/connection.go --fake-name Connection . Connection
  8. package protocol
  9. import (
  10. "context"
  11. "crypto/sha256"
  12. "encoding/binary"
  13. "fmt"
  14. "io"
  15. "net"
  16. "path"
  17. "strings"
  18. "sync"
  19. "time"
  20. lz4 "github.com/bkaradzic/go-lz4"
  21. "github.com/pkg/errors"
  22. )
const (
	// Shifts: left-shifting a count by one of these scales it to the named
	// binary unit, e.g. 128 << KiB is 128 kibibytes.
	KiB = 10
	MiB = 20
	GiB = 30
)
const (
	// MaxMessageLen is the largest message size allowed on the wire. (500 MB)
	// Incoming messages announcing a larger length are rejected when read.
	MaxMessageLen = 500 * 1000 * 1000
	// MinBlockSize is the minimum block size allowed (128 KiB).
	MinBlockSize = 128 << KiB
	// MaxBlockSize is the maximum block size allowed (16 MiB).
	MaxBlockSize = 16 << MiB
	// DesiredPerFileBlocks is the number of blocks we aim for per file.
	// BlockSize picks the smallest block size that keeps a file below this
	// many blocks.
	DesiredPerFileBlocks = 2000
)
// BlockSizes is the list of valid block sizes, from min to max. It is filled
// in by init with MinBlockSize and each successive doubling up to and
// including MaxBlockSize.
var BlockSizes []int
// For each block size, the hash of a block of all zeroes. The values are
// hard coded; init panics at startup if any entry of BlockSizes has no
// corresponding key in this map.
var sha256OfEmptyBlock = map[int][sha256.Size]byte{
	128 << KiB: {0xfa, 0x43, 0x23, 0x9b, 0xce, 0xe7, 0xb9, 0x7c, 0xa6, 0x2f, 0x0, 0x7c, 0xc6, 0x84, 0x87, 0x56, 0xa, 0x39, 0xe1, 0x9f, 0x74, 0xf3, 0xdd, 0xe7, 0x48, 0x6d, 0xb3, 0xf9, 0x8d, 0xf8, 0xe4, 0x71},
	256 << KiB: {0x8a, 0x39, 0xd2, 0xab, 0xd3, 0x99, 0x9a, 0xb7, 0x3c, 0x34, 0xdb, 0x24, 0x76, 0x84, 0x9c, 0xdd, 0xf3, 0x3, 0xce, 0x38, 0x9b, 0x35, 0x82, 0x68, 0x50, 0xf9, 0xa7, 0x0, 0x58, 0x9b, 0x4a, 0x90},
	512 << KiB: {0x7, 0x85, 0x4d, 0x2f, 0xef, 0x29, 0x7a, 0x6, 0xba, 0x81, 0x68, 0x5e, 0x66, 0xc, 0x33, 0x2d, 0xe3, 0x6d, 0x5d, 0x18, 0xd5, 0x46, 0x92, 0x7d, 0x30, 0xda, 0xad, 0x6d, 0x7f, 0xda, 0x15, 0x41},
	1 << MiB: {0x30, 0xe1, 0x49, 0x55, 0xeb, 0xf1, 0x35, 0x22, 0x66, 0xdc, 0x2f, 0xf8, 0x6, 0x7e, 0x68, 0x10, 0x46, 0x7, 0xe7, 0x50, 0xab, 0xb9, 0xd3, 0xb3, 0x65, 0x82, 0xb8, 0xaf, 0x90, 0x9f, 0xcb, 0x58},
	2 << MiB: {0x56, 0x47, 0xf0, 0x5e, 0xc1, 0x89, 0x58, 0x94, 0x7d, 0x32, 0x87, 0x4e, 0xeb, 0x78, 0x8f, 0xa3, 0x96, 0xa0, 0x5d, 0xb, 0xab, 0x7c, 0x1b, 0x71, 0xf1, 0x12, 0xce, 0xb7, 0xe9, 0xb3, 0x1e, 0xee},
	4 << MiB: {0xbb, 0x9f, 0x8d, 0xf6, 0x14, 0x74, 0xd2, 0x5e, 0x71, 0xfa, 0x0, 0x72, 0x23, 0x18, 0xcd, 0x38, 0x73, 0x96, 0xca, 0x17, 0x36, 0x60, 0x5e, 0x12, 0x48, 0x82, 0x1c, 0xc0, 0xde, 0x3d, 0x3a, 0xf8},
	8 << MiB: {0x2d, 0xae, 0xb1, 0xf3, 0x60, 0x95, 0xb4, 0x4b, 0x31, 0x84, 0x10, 0xb3, 0xf4, 0xe8, 0xb5, 0xd9, 0x89, 0xdc, 0xc7, 0xbb, 0x2, 0x3d, 0x14, 0x26, 0xc4, 0x92, 0xda, 0xb0, 0xa3, 0x5, 0x3e, 0x74},
	16 << MiB: {0x8, 0xa, 0xcf, 0x35, 0xa5, 0x7, 0xac, 0x98, 0x49, 0xcf, 0xcb, 0xa4, 0x7d, 0xc2, 0xad, 0x83, 0xe0, 0x1b, 0x75, 0x66, 0x3a, 0x51, 0x62, 0x79, 0xc8, 0xb9, 0xd2, 0x43, 0xb7, 0x19, 0x64, 0x3e},
}
  52. func init() {
  53. for blockSize := MinBlockSize; blockSize <= MaxBlockSize; blockSize *= 2 {
  54. BlockSizes = append(BlockSizes, blockSize)
  55. if _, ok := sha256OfEmptyBlock[blockSize]; !ok {
  56. panic("missing hard coded value for sha256 of empty block")
  57. }
  58. }
  59. BufferPool = newBufferPool()
  60. }
  61. // BlockSize returns the block size to use for the given file size
  62. func BlockSize(fileSize int64) int {
  63. var blockSize int
  64. for _, blockSize = range BlockSizes {
  65. if fileSize < DesiredPerFileBlocks*int64(blockSize) {
  66. break
  67. }
  68. }
  69. return blockSize
  70. }
// Protocol states tracked by the dispatcher loop of a connection.
const (
	// stateInitial is the state before any ClusterConfig message has been
	// received; only ClusterConfig and Close messages are accepted in it.
	stateInitial = iota
	// stateReady is entered once the first ClusterConfig message arrives.
	stateReady
)
// FileInfo.LocalFlags flags. Each flag is a single bit so they can be
// combined with bitwise or.
const (
	FlagLocalUnsupported = 1 << 0 // The kind is unsupported, e.g. symlinks on Windows
	FlagLocalIgnored = 1 << 1 // Matches local ignore patterns
	FlagLocalMustRescan = 1 << 2 // Doesn't match content on disk, must be rechecked fully
	FlagLocalReceiveOnly = 1 << 3 // Change detected on receive only folder
	// Flags that should result in the Invalid bit on outgoing updates
	LocalInvalidFlags = FlagLocalUnsupported | FlagLocalIgnored | FlagLocalMustRescan | FlagLocalReceiveOnly
	// Flags that should result in a file being in conflict with its
	// successor, due to us not having an up to date picture of its state on
	// disk.
	LocalConflictFlags = FlagLocalUnsupported | FlagLocalIgnored | FlagLocalReceiveOnly
	// The union of all local flags defined above.
	LocalAllFlags = FlagLocalUnsupported | FlagLocalIgnored | FlagLocalMustRescan | FlagLocalReceiveOnly
)
// Sentinel errors used by the protocol implementation.
var (
	// ErrClosed is returned by operations attempted on, or interrupted by,
	// a closed connection.
	ErrClosed = errors.New("connection closed")
	// ErrTimeout is not used in the code visible here; presumably it is
	// reported when nothing is received within ReceiveTimeout.
	ErrTimeout = errors.New("read timeout")
	// errUnknownMessage is returned by newMessage for an unrecognised
	// message type; the reader loop skips such messages.
	errUnknownMessage = errors.New("unknown message")
	// Results of checkFilename.
	errInvalidFilename = errors.New("filename is invalid")
	errUncleanFilename = errors.New("filename not in canonical format")
	// Results of checkFileInfoConsistency.
	errDeletedHasBlocks = errors.New("deleted file with non-empty block list")
	errDirectoryHasBlocks = errors.New("directory with non-empty block list")
	errFileHasNoBlocks = errors.New("file with empty block list")
)
// Model is the receiver of events from a connection. The dispatcher loop of
// a rawConnection calls these methods as messages arrive from the peer. A
// non-nil error returned from Index, IndexUpdate, ClusterConfig or
// DownloadProgress stops the dispatcher and closes the connection.
type Model interface {
	// An index was received from the peer device
	Index(deviceID DeviceID, folder string, files []FileInfo) error
	// An index update was received from the peer device
	IndexUpdate(deviceID DeviceID, folder string, files []FileInfo) error
	// A request was made by the peer device. On error, only the error code
	// is sent back to the peer; otherwise the response data is sent and the
	// RequestResponse is closed afterwards.
	Request(deviceID DeviceID, folder, name string, blockNo, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error)
	// A cluster configuration message was received
	ClusterConfig(deviceID DeviceID, config ClusterConfig) error
	// The peer device closed the connection or an error occurred
	Closed(device DeviceID, err error)
	// The peer device sent progress updates for the files it is currently downloading
	DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate) error
}
// RequestResponse is the result of a successful Model.Request call. The
// connection sends Data to the peer and calls Close once the send has
// finished (or was abandoned).
type RequestResponse interface {
	Data() []byte
	Close() // Must always be called once the byte slice is no longer in use
	Wait() // Blocks until Close is called
}
// Connection is a protocol connection to a peer device, as returned by
// NewConnection. The per-method notes describe the rawConnection
// implementation in this file; wrappers may add further behaviour.
type Connection interface {
	// Start launches the goroutines that send and receive messages. It must
	// be called exactly once.
	Start()
	SetFolderPasswords(passwords map[string]string)
	// Close attempts to send a Close message carrying err as the reason and
	// then shuts the connection down.
	Close(err error)
	// ID returns the device ID of the peer.
	ID() DeviceID
	// Index and IndexUpdate queue file information for sending to the peer.
	Index(ctx context.Context, folder string, files []FileInfo) error
	IndexUpdate(ctx context.Context, folder string, files []FileInfo) error
	// Request fetches the data of a block from the peer, blocking until the
	// response arrives, the connection closes, or ctx is done.
	Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error)
	// ClusterConfig queues the cluster configuration for sending.
	ClusterConfig(config ClusterConfig)
	// DownloadProgress queues download progress updates for sending.
	DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate)
	Statistics() Statistics
	// Closed reports whether the connection has been closed.
	Closed() bool
	ConnectionInfo
}
// ConnectionInfo describes properties of the underlying connection. The
// implementation is supplied by the caller of NewConnection, which also
// defines the exact meaning of the returned values.
type ConnectionInfo interface {
	Type() string
	Transport() string
	RemoteAddr() net.Addr
	Priority() int
	String() string
	Crypto() string
	EstablishedAt() time.Time
}
// rawConnection implements the wire level of a connection: it frames,
// optionally compresses, sends and receives messages, and dispatches incoming
// ones to the receiver.
type rawConnection struct {
	ConnectionInfo
	id DeviceID // device ID of the peer
	receiver Model // gets the incoming messages and the Closed notification
	startTime time.Time // set by Start, truncated to whole seconds
	cr *countingReader // wraps the reader handed to newRawConnection
	cw *countingWriter // wraps the writer handed to newRawConnection
	closer io.Closer // Closing the underlying connection and thus cr and cw
	awaiting map[int]chan asyncResult // outstanding requests by ID, guarded by awaitingMut
	awaitingMut sync.Mutex
	idxMut sync.Mutex // ensures serialization of Index calls
	nextID int // ID for the next outgoing request, guarded by nextIDMut
	nextIDMut sync.Mutex
	inbox chan message // reader loop -> dispatcher loop
	outbox chan asyncMessage // send -> writer loop
	closeBox chan asyncMessage // Close -> writer loop, carries the Close message
	clusterConfigBox chan *ClusterConfig // ClusterConfig -> writer loop
	dispatcherLoopStopped chan struct{} // closed when the dispatcher loop returns
	closed chan struct{} // closed by internalClose
	closeOnce sync.Once // guards the body of internalClose
	sendCloseOnce sync.Once // guards sending the Close message in Close
	compression Compression // consulted by shouldCompressMessage
	loopWG sync.WaitGroup // Need to ensure no leftover routines in testing
}
// asyncResult is the outcome of a request, delivered from handleResponse to
// the waiting Request call.
type asyncResult struct {
	val []byte // response data
	err error // error derived from the response code
}
// message is the set of marshalling methods the connection needs from a
// protocol message (ClusterConfig, Index, IndexUpdate, Request, Response,
// DownloadProgress, Ping and Close).
type message interface {
	ProtoSize() int
	Marshal() ([]byte, error)
	MarshalTo([]byte) (int, error)
	Unmarshal([]byte) error
}
// asyncMessage is a message queued for the writer loop, together with an
// optional completion signal.
type asyncMessage struct {
	msg message
	done chan struct{} // done closes when we're done sending the message; may be nil on the outbox
}
// Timing parameters for keeping a connection alive.
const (
	// PingSendInterval is how often we make sure to send a message, by
	// triggering pings if necessary.
	PingSendInterval = 90 * time.Second
	// ReceiveTimeout is the longest we'll wait for a message from the other
	// side before closing the connection.
	ReceiveTimeout = 300 * time.Second
)
// CloseTimeout is the longest we'll wait when trying to send the close
// message before just closing the connection. Close applies it as a single
// deadline covering both handing the message to the writer loop and waiting
// for it to be written.
// Should not be modified in production code, just for testing.
var CloseTimeout = 10 * time.Second
  191. func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, receiver Model, connInfo ConnectionInfo, compress Compression, passwords map[string]string) Connection {
  192. // Encryption / decryption is first (outermost) before conversion to
  193. // native path formats.
  194. nm := nativeModel{receiver}
  195. em := &encryptedModel{model: nm, folderKeys: newFolderKeyRegistry(passwords)}
  196. // We do the wire format conversion first (outermost) so that the
  197. // metadata is in wire format when it reaches the encryption step.
  198. rc := newRawConnection(deviceID, reader, writer, closer, em, connInfo, compress)
  199. ec := encryptedConnection{ConnectionInfo: rc, conn: rc, folderKeys: em.folderKeys}
  200. wc := wireFormatConnection{ec}
  201. return wc
  202. }
  203. func newRawConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, closer io.Closer, receiver Model, connInfo ConnectionInfo, compress Compression) *rawConnection {
  204. cr := &countingReader{Reader: reader}
  205. cw := &countingWriter{Writer: writer}
  206. return &rawConnection{
  207. ConnectionInfo: connInfo,
  208. id: deviceID,
  209. receiver: receiver,
  210. cr: cr,
  211. cw: cw,
  212. closer: closer,
  213. awaiting: make(map[int]chan asyncResult),
  214. inbox: make(chan message),
  215. outbox: make(chan asyncMessage),
  216. closeBox: make(chan asyncMessage),
  217. clusterConfigBox: make(chan *ClusterConfig),
  218. dispatcherLoopStopped: make(chan struct{}),
  219. closed: make(chan struct{}),
  220. compression: compress,
  221. loopWG: sync.WaitGroup{},
  222. }
  223. }
  224. // Start creates the goroutines for sending and receiving of messages. It must
  225. // be called exactly once after creating a connection.
  226. func (c *rawConnection) Start() {
  227. c.loopWG.Add(5)
  228. go func() {
  229. c.readerLoop()
  230. c.loopWG.Done()
  231. }()
  232. go func() {
  233. err := c.dispatcherLoop()
  234. c.Close(err)
  235. c.loopWG.Done()
  236. }()
  237. go func() {
  238. c.writerLoop()
  239. c.loopWG.Done()
  240. }()
  241. go func() {
  242. c.pingSender()
  243. c.loopWG.Done()
  244. }()
  245. go func() {
  246. c.pingReceiver()
  247. c.loopWG.Done()
  248. }()
  249. c.startTime = time.Now().Truncate(time.Second)
  250. }
// ID returns the device ID of the peer, as given to newRawConnection.
func (c *rawConnection) ID() DeviceID {
	return c.id
}
  254. // Index writes the list of file information to the connected peer device
  255. func (c *rawConnection) Index(ctx context.Context, folder string, idx []FileInfo) error {
  256. select {
  257. case <-c.closed:
  258. return ErrClosed
  259. default:
  260. }
  261. c.idxMut.Lock()
  262. c.send(ctx, &Index{
  263. Folder: folder,
  264. Files: idx,
  265. }, nil)
  266. c.idxMut.Unlock()
  267. return nil
  268. }
  269. // IndexUpdate writes the list of file information to the connected peer device as an update
  270. func (c *rawConnection) IndexUpdate(ctx context.Context, folder string, idx []FileInfo) error {
  271. select {
  272. case <-c.closed:
  273. return ErrClosed
  274. default:
  275. }
  276. c.idxMut.Lock()
  277. c.send(ctx, &IndexUpdate{
  278. Folder: folder,
  279. Files: idx,
  280. }, nil)
  281. c.idxMut.Unlock()
  282. return nil
  283. }
  284. // Request returns the bytes for the specified block after fetching them from the connected peer.
  285. func (c *rawConnection) Request(ctx context.Context, folder string, name string, blockNo int, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
  286. c.nextIDMut.Lock()
  287. id := c.nextID
  288. c.nextID++
  289. c.nextIDMut.Unlock()
  290. c.awaitingMut.Lock()
  291. if _, ok := c.awaiting[id]; ok {
  292. c.awaitingMut.Unlock()
  293. panic("id taken")
  294. }
  295. rc := make(chan asyncResult, 1)
  296. c.awaiting[id] = rc
  297. c.awaitingMut.Unlock()
  298. ok := c.send(ctx, &Request{
  299. ID: id,
  300. Folder: folder,
  301. Name: name,
  302. Offset: offset,
  303. Size: size,
  304. BlockNo: blockNo,
  305. Hash: hash,
  306. WeakHash: weakHash,
  307. FromTemporary: fromTemporary,
  308. }, nil)
  309. if !ok {
  310. return nil, ErrClosed
  311. }
  312. select {
  313. case res, ok := <-rc:
  314. if !ok {
  315. return nil, ErrClosed
  316. }
  317. return res.val, res.err
  318. case <-ctx.Done():
  319. return nil, ctx.Err()
  320. }
  321. }
  322. // ClusterConfig sends the cluster configuration message to the peer.
  323. func (c *rawConnection) ClusterConfig(config ClusterConfig) {
  324. select {
  325. case c.clusterConfigBox <- &config:
  326. case <-c.closed:
  327. }
  328. }
  329. func (c *rawConnection) Closed() bool {
  330. select {
  331. case <-c.closed:
  332. return true
  333. default:
  334. return false
  335. }
  336. }
  337. // DownloadProgress sends the progress updates for the files that are currently being downloaded.
  338. func (c *rawConnection) DownloadProgress(ctx context.Context, folder string, updates []FileDownloadProgressUpdate) {
  339. c.send(ctx, &DownloadProgress{
  340. Folder: folder,
  341. Updates: updates,
  342. }, nil)
  343. }
  344. func (c *rawConnection) ping() bool {
  345. return c.send(context.Background(), &Ping{}, nil)
  346. }
  347. func (c *rawConnection) readerLoop() {
  348. fourByteBuf := make([]byte, 4)
  349. for {
  350. msg, err := c.readMessage(fourByteBuf)
  351. if err != nil {
  352. if err == errUnknownMessage {
  353. // Unknown message types are skipped, for future extensibility.
  354. continue
  355. }
  356. c.internalClose(err)
  357. return
  358. }
  359. select {
  360. case c.inbox <- msg:
  361. case <-c.closed:
  362. return
  363. }
  364. }
  365. }
// dispatcherLoop takes messages from the inbox, validates them and hands
// them to the receiver. It only returns with a non-nil error: ErrClosed when
// the connection is closed, otherwise an error describing why dispatching
// stopped. dispatcherLoopStopped is closed on return.
func (c *rawConnection) dispatcherLoop() (err error) {
	defer close(c.dispatcherLoopStopped)
	var msg message
	state := stateInitial
	for {
		select {
		case msg = <-c.inbox:
		case <-c.closed:
			return ErrClosed
		}

		// Note that this err shadows the named result within the loop body;
		// every return below is explicit, so that is harmless.
		msgContext, err := messageContext(msg)
		if err != nil {
			return fmt.Errorf("protocol error: %w", err)
		}
		l.Debugf("handle %v message", msgContext)

		// State handling: the first ClusterConfig makes the connection
		// ready, Close ends it, and anything else is only acceptable once
		// the connection is ready.
		switch msg := msg.(type) {
		case *ClusterConfig:
			if state == stateInitial {
				state = stateReady
			}
		case *Close:
			return fmt.Errorf("closed by remote: %v", msg.Reason)
		default:
			if state != stateReady {
				return newProtocolError(fmt.Errorf("invalid state %d", state), msgContext)
			}
		}

		// Validation of peer supplied file names and file infos.
		switch msg := msg.(type) {
		case *Index:
			err = checkIndexConsistency(msg.Files)
		case *IndexUpdate:
			err = checkIndexConsistency(msg.Files)
		case *Request:
			err = checkFilename(msg.Name)
		}
		if err != nil {
			return newProtocolError(err, msgContext)
		}

		// Dispatch. Requests are served on their own goroutine so they do
		// not hold up the loop; message types without a case here (such as
		// Ping) need no further handling.
		switch msg := msg.(type) {
		case *ClusterConfig:
			err = c.receiver.ClusterConfig(c.id, *msg)
		case *Index:
			err = c.handleIndex(*msg)
		case *IndexUpdate:
			err = c.handleIndexUpdate(*msg)
		case *Request:
			go c.handleRequest(*msg)
		case *Response:
			c.handleResponse(*msg)
		case *DownloadProgress:
			err = c.receiver.DownloadProgress(c.id, msg.Folder, msg.Updates)
		}
		if err != nil {
			return newHandleError(err, msgContext)
		}
	}
}
  423. func (c *rawConnection) readMessage(fourByteBuf []byte) (message, error) {
  424. hdr, err := c.readHeader(fourByteBuf)
  425. if err != nil {
  426. return nil, err
  427. }
  428. return c.readMessageAfterHeader(hdr, fourByteBuf)
  429. }
  430. func (c *rawConnection) readMessageAfterHeader(hdr Header, fourByteBuf []byte) (message, error) {
  431. // First comes a 4 byte message length
  432. if _, err := io.ReadFull(c.cr, fourByteBuf[:4]); err != nil {
  433. return nil, errors.Wrap(err, "reading message length")
  434. }
  435. msgLen := int32(binary.BigEndian.Uint32(fourByteBuf))
  436. if msgLen < 0 {
  437. return nil, fmt.Errorf("negative message length %d", msgLen)
  438. } else if msgLen > MaxMessageLen {
  439. return nil, fmt.Errorf("message length %d exceeds maximum %d", msgLen, MaxMessageLen)
  440. }
  441. // Then comes the message
  442. buf := BufferPool.Get(int(msgLen))
  443. if _, err := io.ReadFull(c.cr, buf); err != nil {
  444. BufferPool.Put(buf)
  445. return nil, errors.Wrap(err, "reading message")
  446. }
  447. // ... which might be compressed
  448. switch hdr.Compression {
  449. case MessageCompressionNone:
  450. // Nothing
  451. case MessageCompressionLZ4:
  452. decomp, err := c.lz4Decompress(buf)
  453. BufferPool.Put(buf)
  454. if err != nil {
  455. return nil, errors.Wrap(err, "decompressing message")
  456. }
  457. buf = decomp
  458. default:
  459. return nil, fmt.Errorf("unknown message compression %d", hdr.Compression)
  460. }
  461. // ... and is then unmarshalled
  462. msg, err := c.newMessage(hdr.Type)
  463. if err != nil {
  464. BufferPool.Put(buf)
  465. return nil, err
  466. }
  467. if err := msg.Unmarshal(buf); err != nil {
  468. BufferPool.Put(buf)
  469. return nil, errors.Wrap(err, "unmarshalling message")
  470. }
  471. BufferPool.Put(buf)
  472. return msg, nil
  473. }
  474. func (c *rawConnection) readHeader(fourByteBuf []byte) (Header, error) {
  475. // First comes a 2 byte header length
  476. if _, err := io.ReadFull(c.cr, fourByteBuf[:2]); err != nil {
  477. return Header{}, errors.Wrap(err, "reading length")
  478. }
  479. hdrLen := int16(binary.BigEndian.Uint16(fourByteBuf))
  480. if hdrLen < 0 {
  481. return Header{}, fmt.Errorf("negative header length %d", hdrLen)
  482. }
  483. // Then comes the header
  484. buf := BufferPool.Get(int(hdrLen))
  485. if _, err := io.ReadFull(c.cr, buf); err != nil {
  486. BufferPool.Put(buf)
  487. return Header{}, errors.Wrap(err, "reading header")
  488. }
  489. var hdr Header
  490. err := hdr.Unmarshal(buf)
  491. BufferPool.Put(buf)
  492. if err != nil {
  493. return Header{}, errors.Wrap(err, "unmarshalling header")
  494. }
  495. return hdr, nil
  496. }
  497. func (c *rawConnection) handleIndex(im Index) error {
  498. l.Debugf("Index(%v, %v, %d file)", c.id, im.Folder, len(im.Files))
  499. return c.receiver.Index(c.id, im.Folder, im.Files)
  500. }
// handleIndexUpdate passes a received IndexUpdate message on to the receiver
// and returns the receiver's error, if any.
func (c *rawConnection) handleIndexUpdate(im IndexUpdate) error {
	l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Folder, len(im.Files))
	return c.receiver.IndexUpdate(c.id, im.Folder, im.Files)
}
  505. // checkIndexConsistency verifies a number of invariants on FileInfos received in
  506. // index messages.
  507. func checkIndexConsistency(fs []FileInfo) error {
  508. for _, f := range fs {
  509. if err := checkFileInfoConsistency(f); err != nil {
  510. return errors.Wrapf(err, "%q", f.Name)
  511. }
  512. }
  513. return nil
  514. }
  515. // checkFileInfoConsistency verifies a number of invariants on the given FileInfo
  516. func checkFileInfoConsistency(f FileInfo) error {
  517. if err := checkFilename(f.Name); err != nil {
  518. return err
  519. }
  520. switch {
  521. case f.Deleted && len(f.Blocks) != 0:
  522. // Deleted files should have no blocks
  523. return errDeletedHasBlocks
  524. case f.Type == FileInfoTypeDirectory && len(f.Blocks) != 0:
  525. // Directories should have no blocks
  526. return errDirectoryHasBlocks
  527. case !f.Deleted && !f.IsInvalid() && f.Type == FileInfoTypeFile && len(f.Blocks) == 0:
  528. // Non-deleted, non-invalid files should have at least one block
  529. return errFileHasNoBlocks
  530. }
  531. return nil
  532. }
  533. // checkFilename verifies that the given filename is valid according to the
  534. // spec on what's allowed over the wire. A filename failing this test is
  535. // grounds for disconnecting the device.
  536. func checkFilename(name string) error {
  537. cleanedName := path.Clean(name)
  538. if cleanedName != name {
  539. // The filename on the wire should be in canonical format. If
  540. // Clean() managed to clean it up, there was something wrong with
  541. // it.
  542. return errUncleanFilename
  543. }
  544. switch name {
  545. case "", ".", "..":
  546. // These names are always invalid.
  547. return errInvalidFilename
  548. }
  549. if strings.HasPrefix(name, "/") {
  550. // Names are folder relative, not absolute.
  551. return errInvalidFilename
  552. }
  553. if strings.HasPrefix(name, "../") {
  554. // Starting with a dotdot is not allowed. Any other dotdots would
  555. // have been handled by the Clean() call at the top.
  556. return errInvalidFilename
  557. }
  558. return nil
  559. }
  560. func (c *rawConnection) handleRequest(req Request) {
  561. res, err := c.receiver.Request(c.id, req.Folder, req.Name, int32(req.BlockNo), int32(req.Size), req.Offset, req.Hash, req.WeakHash, req.FromTemporary)
  562. if err != nil {
  563. c.send(context.Background(), &Response{
  564. ID: req.ID,
  565. Code: errorToCode(err),
  566. }, nil)
  567. return
  568. }
  569. done := make(chan struct{})
  570. c.send(context.Background(), &Response{
  571. ID: req.ID,
  572. Data: res.Data(),
  573. Code: errorToCode(nil),
  574. }, done)
  575. <-done
  576. res.Close()
  577. }
  578. func (c *rawConnection) handleResponse(resp Response) {
  579. c.awaitingMut.Lock()
  580. if rc := c.awaiting[resp.ID]; rc != nil {
  581. delete(c.awaiting, resp.ID)
  582. rc <- asyncResult{resp.Data, codeToError(resp.Code)}
  583. close(rc)
  584. }
  585. c.awaitingMut.Unlock()
  586. }
  587. func (c *rawConnection) send(ctx context.Context, msg message, done chan struct{}) bool {
  588. select {
  589. case c.outbox <- asyncMessage{msg, done}:
  590. return true
  591. case <-c.closed:
  592. case <-ctx.Done():
  593. }
  594. if done != nil {
  595. close(done)
  596. }
  597. return false
  598. }
// writerLoop writes queued messages to the connection until the connection
// is closed, a write fails, or a Close message has been written.
func (c *rawConnection) writerLoop() {
	// The first message written must be a cluster config (or the Close
	// message): the outbox is deliberately not serviced in this first
	// select.
	select {
	case cc := <-c.clusterConfigBox:
		err := c.writeMessage(cc)
		if err != nil {
			c.internalClose(err)
			return
		}
	case hm := <-c.closeBox:
		// Best effort: the connection is going away regardless of whether
		// the Close message could be written.
		_ = c.writeMessage(hm.msg)
		close(hm.done)
		return
	case <-c.closed:
		return
	}
	for {
		select {
		case cc := <-c.clusterConfigBox:
			err := c.writeMessage(cc)
			if err != nil {
				c.internalClose(err)
				return
			}
		case hm := <-c.outbox:
			err := c.writeMessage(hm.msg)
			// Signal completion before looking at the error, so that a
			// waiting sender is released either way.
			if hm.done != nil {
				close(hm.done)
			}
			if err != nil {
				c.internalClose(err)
				return
			}
		case hm := <-c.closeBox:
			// Best effort, see above.
			_ = c.writeMessage(hm.msg)
			close(hm.done)
			return
		case <-c.closed:
			return
		}
	}
}
  640. func (c *rawConnection) writeMessage(msg message) error {
  641. msgContext, _ := messageContext(msg)
  642. l.Debugf("Writing %v", msgContext)
  643. if c.shouldCompressMessage(msg) {
  644. return c.writeCompressedMessage(msg)
  645. }
  646. return c.writeUncompressedMessage(msg)
  647. }
  648. func (c *rawConnection) writeCompressedMessage(msg message) error {
  649. size := msg.ProtoSize()
  650. buf := BufferPool.Get(size)
  651. if _, err := msg.MarshalTo(buf); err != nil {
  652. BufferPool.Put(buf)
  653. return errors.Wrap(err, "marshalling message")
  654. }
  655. compressed, err := c.lz4Compress(buf)
  656. if err != nil {
  657. BufferPool.Put(buf)
  658. return errors.Wrap(err, "compressing message")
  659. }
  660. hdr := Header{
  661. Type: c.typeOf(msg),
  662. Compression: MessageCompressionLZ4,
  663. }
  664. hdrSize := hdr.ProtoSize()
  665. if hdrSize > 1<<16-1 {
  666. panic("impossibly large header")
  667. }
  668. compressedSize := len(compressed)
  669. totSize := 2 + hdrSize + 4 + compressedSize
  670. buf = BufferPool.Upgrade(buf, totSize)
  671. // Header length
  672. binary.BigEndian.PutUint16(buf, uint16(hdrSize))
  673. // Header
  674. if _, err := hdr.MarshalTo(buf[2:]); err != nil {
  675. BufferPool.Put(buf)
  676. BufferPool.Put(compressed)
  677. return errors.Wrap(err, "marshalling header")
  678. }
  679. // Message length
  680. binary.BigEndian.PutUint32(buf[2+hdrSize:], uint32(compressedSize))
  681. // Message
  682. copy(buf[2+hdrSize+4:], compressed)
  683. BufferPool.Put(compressed)
  684. n, err := c.cw.Write(buf)
  685. BufferPool.Put(buf)
  686. l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message (%d uncompressed)), err=%v", n, hdrSize, compressedSize, size, err)
  687. if err != nil {
  688. return errors.Wrap(err, "writing message")
  689. }
  690. return nil
  691. }
  692. func (c *rawConnection) writeUncompressedMessage(msg message) error {
  693. size := msg.ProtoSize()
  694. hdr := Header{
  695. Type: c.typeOf(msg),
  696. }
  697. hdrSize := hdr.ProtoSize()
  698. if hdrSize > 1<<16-1 {
  699. panic("impossibly large header")
  700. }
  701. totSize := 2 + hdrSize + 4 + size
  702. buf := BufferPool.Get(totSize)
  703. // Header length
  704. binary.BigEndian.PutUint16(buf, uint16(hdrSize))
  705. // Header
  706. if _, err := hdr.MarshalTo(buf[2:]); err != nil {
  707. BufferPool.Put(buf)
  708. return errors.Wrap(err, "marshalling header")
  709. }
  710. // Message length
  711. binary.BigEndian.PutUint32(buf[2+hdrSize:], uint32(size))
  712. // Message
  713. if _, err := msg.MarshalTo(buf[2+hdrSize+4:]); err != nil {
  714. BufferPool.Put(buf)
  715. return errors.Wrap(err, "marshalling message")
  716. }
  717. n, err := c.cw.Write(buf[:totSize])
  718. BufferPool.Put(buf)
  719. l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message), err=%v", n, hdrSize, size, err)
  720. if err != nil {
  721. return errors.Wrap(err, "writing message")
  722. }
  723. return nil
  724. }
  725. func (c *rawConnection) typeOf(msg message) MessageType {
  726. switch msg.(type) {
  727. case *ClusterConfig:
  728. return MessageTypeClusterConfig
  729. case *Index:
  730. return MessageTypeIndex
  731. case *IndexUpdate:
  732. return MessageTypeIndexUpdate
  733. case *Request:
  734. return MessageTypeRequest
  735. case *Response:
  736. return MessageTypeResponse
  737. case *DownloadProgress:
  738. return MessageTypeDownloadProgress
  739. case *Ping:
  740. return MessageTypePing
  741. case *Close:
  742. return MessageTypeClose
  743. default:
  744. panic("bug: unknown message type")
  745. }
  746. }
  747. func (c *rawConnection) newMessage(t MessageType) (message, error) {
  748. switch t {
  749. case MessageTypeClusterConfig:
  750. return new(ClusterConfig), nil
  751. case MessageTypeIndex:
  752. return new(Index), nil
  753. case MessageTypeIndexUpdate:
  754. return new(IndexUpdate), nil
  755. case MessageTypeRequest:
  756. return new(Request), nil
  757. case MessageTypeResponse:
  758. return new(Response), nil
  759. case MessageTypeDownloadProgress:
  760. return new(DownloadProgress), nil
  761. case MessageTypePing:
  762. return new(Ping), nil
  763. case MessageTypeClose:
  764. return new(Close), nil
  765. default:
  766. return nil, errUnknownMessage
  767. }
  768. }
  769. func (c *rawConnection) shouldCompressMessage(msg message) bool {
  770. switch c.compression {
  771. case CompressionNever:
  772. return false
  773. case CompressionAlways:
  774. // Use compression for large enough messages
  775. return msg.ProtoSize() >= compressionThreshold
  776. case CompressionMetadata:
  777. _, isResponse := msg.(*Response)
  778. // Compress if it's large enough and not a response message
  779. return !isResponse && msg.ProtoSize() >= compressionThreshold
  780. default:
  781. panic("unknown compression setting")
  782. }
  783. }
  784. // Close is called when the connection is regularely closed and thus the Close
  785. // BEP message is sent before terminating the actual connection. The error
  786. // argument specifies the reason for closing the connection.
  787. func (c *rawConnection) Close(err error) {
  788. c.sendCloseOnce.Do(func() {
  789. done := make(chan struct{})
  790. timeout := time.NewTimer(CloseTimeout)
  791. select {
  792. case c.closeBox <- asyncMessage{&Close{err.Error()}, done}:
  793. select {
  794. case <-done:
  795. case <-timeout.C:
  796. case <-c.closed:
  797. }
  798. case <-timeout.C:
  799. case <-c.closed:
  800. }
  801. })
  802. // Close might be called from a method that is called from within
  803. // dispatcherLoop, resulting in a deadlock.
  804. // The sending above must happen before spawning the routine, to prevent
  805. // the underlying connection from terminating before sending the close msg.
  806. go c.internalClose(err)
  807. }
  808. // internalClose is called if there is an unexpected error during normal operation.
  809. func (c *rawConnection) internalClose(err error) {
  810. c.closeOnce.Do(func() {
  811. l.Debugln("close due to", err)
  812. if cerr := c.closer.Close(); cerr != nil {
  813. l.Debugln(c.id, "failed to close underlying conn:", cerr)
  814. }
  815. close(c.closed)
  816. c.awaitingMut.Lock()
  817. for i, ch := range c.awaiting {
  818. if ch != nil {
  819. close(ch)
  820. delete(c.awaiting, i)
  821. }
  822. }
  823. c.awaitingMut.Unlock()
  824. <-c.dispatcherLoopStopped
  825. c.receiver.Closed(c.ID(), err)
  826. })
  827. }
  828. // The pingSender makes sure that we've sent a message within the last
  829. // PingSendInterval. If we already have something sent in the last
  830. // PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
  831. // results in an effecting ping interval of somewhere between
  832. // PingSendInterval/2 and PingSendInterval.
  833. func (c *rawConnection) pingSender() {
  834. ticker := time.NewTicker(PingSendInterval / 2)
  835. defer ticker.Stop()
  836. for {
  837. select {
  838. case <-ticker.C:
  839. d := time.Since(c.cw.Last())
  840. if d < PingSendInterval/2 {
  841. l.Debugln(c.id, "ping skipped after wr", d)
  842. continue
  843. }
  844. l.Debugln(c.id, "ping -> after", d)
  845. c.ping()
  846. case <-c.closed:
  847. return
  848. }
  849. }
  850. }
  851. // The pingReceiver checks that we've received a message (any message will do,
  852. // but we expect pings in the absence of other messages) within the last
  853. // ReceiveTimeout. If not, we close the connection with an ErrTimeout.
  854. func (c *rawConnection) pingReceiver() {
  855. ticker := time.NewTicker(ReceiveTimeout / 2)
  856. defer ticker.Stop()
  857. for {
  858. select {
  859. case <-ticker.C:
  860. d := time.Since(c.cr.Last())
  861. if d > ReceiveTimeout {
  862. l.Debugln(c.id, "ping timeout", d)
  863. c.internalClose(ErrTimeout)
  864. }
  865. l.Debugln(c.id, "last read within", d)
  866. case <-c.closed:
  867. return
  868. }
  869. }
  870. }
// Statistics is a snapshot of a connection's traffic counters, as produced
// by the Statistics method of rawConnection.
type Statistics struct {
	// At is the time the snapshot was taken, truncated to whole seconds.
	At time.Time
	// InBytesTotal is the running total reported by the connection's
	// counting reader.
	InBytesTotal int64
	// OutBytesTotal is the running total reported by the connection's
	// counting writer.
	OutBytesTotal int64
	// StartedAt is the connection's recorded start time.
	StartedAt time.Time
}
  877. func (c *rawConnection) Statistics() Statistics {
  878. return Statistics{
  879. At: time.Now().Truncate(time.Second),
  880. InBytesTotal: c.cr.Tot(),
  881. OutBytesTotal: c.cw.Tot(),
  882. StartedAt: c.startTime,
  883. }
  884. }
  885. func (c *rawConnection) lz4Compress(src []byte) ([]byte, error) {
  886. var err error
  887. buf := BufferPool.Get(lz4.CompressBound(len(src)))
  888. compressed, err := lz4.Encode(buf, src)
  889. if err != nil {
  890. BufferPool.Put(buf)
  891. return nil, err
  892. }
  893. if &compressed[0] != &buf[0] {
  894. panic("bug: lz4.Compress allocated, which it must not (should use buffer pool)")
  895. }
  896. binary.BigEndian.PutUint32(compressed, binary.LittleEndian.Uint32(compressed))
  897. return compressed, nil
  898. }
  899. func (c *rawConnection) lz4Decompress(src []byte) ([]byte, error) {
  900. size := binary.BigEndian.Uint32(src)
  901. binary.LittleEndian.PutUint32(src, size)
  902. var err error
  903. buf := BufferPool.Get(int(size))
  904. decoded, err := lz4.Decode(buf, src)
  905. if err != nil {
  906. BufferPool.Put(buf)
  907. return nil, err
  908. }
  909. if &decoded[0] != &buf[0] {
  910. panic("bug: lz4.Decode allocated, which it must not (should use buffer pool)")
  911. }
  912. return decoded, nil
  913. }
  914. func newProtocolError(err error, msgContext string) error {
  915. return fmt.Errorf("protocol error on %v: %w", msgContext, err)
  916. }
  917. func newHandleError(err error, msgContext string) error {
  918. return fmt.Errorf("handling %v: %w", msgContext, err)
  919. }
  920. func messageContext(msg message) (string, error) {
  921. switch msg := msg.(type) {
  922. case *ClusterConfig:
  923. return "cluster-config", nil
  924. case *Index:
  925. return fmt.Sprintf("index for %v", msg.Folder), nil
  926. case *IndexUpdate:
  927. return fmt.Sprintf("index-update for %v", msg.Folder), nil
  928. case *Request:
  929. return fmt.Sprintf(`request for "%v" in %v`, msg.Name, msg.Folder), nil
  930. case *Response:
  931. return "response", nil
  932. case *DownloadProgress:
  933. return fmt.Sprintf("download-progress for %v", msg.Folder), nil
  934. case *Ping:
  935. return "ping", nil
  936. case *Close:
  937. return "close", nil
  938. default:
  939. return "", errors.New("unknown or empty message")
  940. }
  941. }