protocol.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003
  1. // Copyright (C) 2014 The Protocol Authors.
  2. package protocol
  3. import (
  4. "crypto/sha256"
  5. "encoding/binary"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "path"
  10. "strings"
  11. "sync"
  12. "time"
  13. lz4 "github.com/bkaradzic/go-lz4"
  14. )
  // Shift widths used to express sizes as powers of two: "128 << KiB" is
  // 128 * 1024 bytes.
  const (
  	KiB = 10
  	MiB = 20
  	GiB = 30
  )

  const (
  	// MaxMessageLen is the largest message size allowed on the wire
  	// (500 MB, decimal megabytes).
  	MaxMessageLen = 500 * 1000 * 1000

  	// MinBlockSize is the smallest block size allowed (128 KiB).
  	MinBlockSize = 128 << KiB

  	// MaxBlockSize is the largest block size allowed (16 MiB).
  	MaxBlockSize = 16 << MiB

  	// DesiredPerFileBlocks is the number of blocks we aim for per file.
  	DesiredPerFileBlocks = 2000
  )
  // BlockSizes lists every valid block size in ascending order, from
  // MinBlockSize to MaxBlockSize. It is populated by init.
  var BlockSizes []int
  33. // sha256OfEmptyBlock maps each valid block size to the SHA-256 hash of a block of that size consisting entirely of zero bytes.
  34. var sha256OfEmptyBlock = map[int][sha256.Size]byte{
  35. 128 << KiB: {0xfa, 0x43, 0x23, 0x9b, 0xce, 0xe7, 0xb9, 0x7c, 0xa6, 0x2f, 0x0, 0x7c, 0xc6, 0x84, 0x87, 0x56, 0xa, 0x39, 0xe1, 0x9f, 0x74, 0xf3, 0xdd, 0xe7, 0x48, 0x6d, 0xb3, 0xf9, 0x8d, 0xf8, 0xe4, 0x71},
  36. 256 << KiB: {0x8a, 0x39, 0xd2, 0xab, 0xd3, 0x99, 0x9a, 0xb7, 0x3c, 0x34, 0xdb, 0x24, 0x76, 0x84, 0x9c, 0xdd, 0xf3, 0x3, 0xce, 0x38, 0x9b, 0x35, 0x82, 0x68, 0x50, 0xf9, 0xa7, 0x0, 0x58, 0x9b, 0x4a, 0x90},
  37. 512 << KiB: {0x7, 0x85, 0x4d, 0x2f, 0xef, 0x29, 0x7a, 0x6, 0xba, 0x81, 0x68, 0x5e, 0x66, 0xc, 0x33, 0x2d, 0xe3, 0x6d, 0x5d, 0x18, 0xd5, 0x46, 0x92, 0x7d, 0x30, 0xda, 0xad, 0x6d, 0x7f, 0xda, 0x15, 0x41},
  38. 1 << MiB: {0x30, 0xe1, 0x49, 0x55, 0xeb, 0xf1, 0x35, 0x22, 0x66, 0xdc, 0x2f, 0xf8, 0x6, 0x7e, 0x68, 0x10, 0x46, 0x7, 0xe7, 0x50, 0xab, 0xb9, 0xd3, 0xb3, 0x65, 0x82, 0xb8, 0xaf, 0x90, 0x9f, 0xcb, 0x58},
  39. 2 << MiB: {0x56, 0x47, 0xf0, 0x5e, 0xc1, 0x89, 0x58, 0x94, 0x7d, 0x32, 0x87, 0x4e, 0xeb, 0x78, 0x8f, 0xa3, 0x96, 0xa0, 0x5d, 0xb, 0xab, 0x7c, 0x1b, 0x71, 0xf1, 0x12, 0xce, 0xb7, 0xe9, 0xb3, 0x1e, 0xee},
  40. 4 << MiB: {0xbb, 0x9f, 0x8d, 0xf6, 0x14, 0x74, 0xd2, 0x5e, 0x71, 0xfa, 0x0, 0x72, 0x23, 0x18, 0xcd, 0x38, 0x73, 0x96, 0xca, 0x17, 0x36, 0x60, 0x5e, 0x12, 0x48, 0x82, 0x1c, 0xc0, 0xde, 0x3d, 0x3a, 0xf8},
  41. 8 << MiB: {0x2d, 0xae, 0xb1, 0xf3, 0x60, 0x95, 0xb4, 0x4b, 0x31, 0x84, 0x10, 0xb3, 0xf4, 0xe8, 0xb5, 0xd9, 0x89, 0xdc, 0xc7, 0xbb, 0x2, 0x3d, 0x14, 0x26, 0xc4, 0x92, 0xda, 0xb0, 0xa3, 0x5, 0x3e, 0x74},
  42. 16 << MiB: {0x8, 0xa, 0xcf, 0x35, 0xa5, 0x7, 0xac, 0x98, 0x49, 0xcf, 0xcb, 0xa4, 0x7d, 0xc2, 0xad, 0x83, 0xe0, 0x1b, 0x75, 0x66, 0x3a, 0x51, 0x62, 0x79, 0xc8, 0xb9, 0xd2, 0x43, 0xb7, 0x19, 0x64, 0x3e},
  43. }
  44. func init() {
  45. for blockSize := MinBlockSize; blockSize <= MaxBlockSize; blockSize *= 2 {
  46. BlockSizes = append(BlockSizes, blockSize)
  47. if _, ok := sha256OfEmptyBlock[blockSize]; !ok {
  48. panic("missing hard coded value for sha256 of empty block")
  49. }
  50. }
  51. BufferPool = newBufferPool()
  52. }
  53. // BlockSize returns the block size to use for the given file size
  54. func BlockSize(fileSize int64) int {
  55. var blockSize int
  56. for _, blockSize = range BlockSizes {
  57. if fileSize < DesiredPerFileBlocks*int64(blockSize) {
  58. break
  59. }
  60. }
  61. return blockSize
  62. }
  // Dispatcher states. A connection is in stateInitial until the peer's
  // ClusterConfig message has been handled, and in stateReady afterwards.
  const (
  	stateInitial = iota
  	stateReady
  )
  // Flags carried in Request messages.
  const (
  	FlagFromTemporary uint32 = 1 << iota
  )

  // Flags carried in ClusterConfigMessage.Folders.
  //
  // Only the first constant is typed uint32; the remaining ones have explicit
  // expressions and are therefore untyped constants.
  const (
  	FlagFolderReadOnly            uint32 = 1 << 0
  	FlagFolderIgnorePerms                = 1 << 1
  	FlagFolderIgnoreDelete               = 1 << 2
  	FlagFolderDisabledTempIndexes        = 1 << 3
  	// FlagFolderAll is the mask of the four folder flag bits above.
  	FlagFolderAll = 1<<4 - 1
  )

  // Flags carried in ClusterConfigMessage.Folders.Devices.
  //
  // As above, only the first constant is typed uint32.
  const (
  	FlagShareTrusted  uint32 = 1 << 0
  	FlagShareReadOnly        = 1 << 1
  	FlagIntroducer           = 1 << 2
  	// FlagShareBits is a mask covering the low eight bits.
  	FlagShareBits = 0x000000ff
  )
  // Flags stored in FileInfo.LocalFlags.
  const (
  	// FlagLocalUnsupported: the kind is unsupported, e.g. symlinks on Windows.
  	FlagLocalUnsupported = 1 << 0
  	// FlagLocalIgnored: matches local ignore patterns.
  	FlagLocalIgnored = 1 << 1
  	// FlagLocalMustRescan: doesn't match content on disk, must be rechecked
  	// fully.
  	FlagLocalMustRescan = 1 << 2
  	// FlagLocalReceiveOnly: change detected on receive only folder.
  	FlagLocalReceiveOnly = 1 << 3

  	// LocalInvalidFlags are the flags that should result in the Invalid bit
  	// on outgoing updates.
  	LocalInvalidFlags = FlagLocalUnsupported | FlagLocalIgnored | FlagLocalMustRescan | FlagLocalReceiveOnly

  	// LocalConflictFlags are the flags that should result in a file being in
  	// conflict with its successor, due to us not having an up to date
  	// picture of its state on disk.
  	LocalConflictFlags = FlagLocalUnsupported | FlagLocalIgnored | FlagLocalReceiveOnly

  	// LocalAllFlags is the union of every local flag defined above.
  	LocalAllFlags = FlagLocalUnsupported | FlagLocalIgnored | FlagLocalMustRescan | FlagLocalReceiveOnly
  )
  // Errors reported to users of the package.
  var (
  	// ErrClosed is returned by operations attempted on a closed connection.
  	ErrClosed = errors.New("connection closed")
  	// ErrTimeout is the close reason used when nothing was received within
  	// ReceiveTimeout.
  	ErrTimeout              = errors.New("read timeout")
  	ErrSwitchingConnections = errors.New("switching connections")
  )

  // Internal sentinel errors used while reading and validating messages.
  var (
  	errUnknownMessage     = errors.New("unknown message")
  	errInvalidFilename    = errors.New("filename is invalid")
  	errUncleanFilename    = errors.New("filename not in canonical format")
  	errDeletedHasBlocks   = errors.New("deleted file with non-empty block list")
  	errDirectoryHasBlocks = errors.New("directory with non-empty block list")
  	errFileHasNoBlocks    = errors.New("file with empty block list")
  )
  111. type Model interface {
  112. // An index was received from the peer device
  113. Index(deviceID DeviceID, folder string, files []FileInfo)
  114. // An index update was received from the peer device
  115. IndexUpdate(deviceID DeviceID, folder string, files []FileInfo)
  116. // A request was made by the peer device
  117. Request(deviceID DeviceID, folder, name string, size int32, offset int64, hash []byte, weakHash uint32, fromTemporary bool) (RequestResponse, error)
  118. // A cluster configuration message was received
  119. ClusterConfig(deviceID DeviceID, config ClusterConfig)
  120. // The peer device closed the connection
  121. Closed(conn Connection, err error)
  122. // The peer device sent progress updates for the files it is currently downloading
  123. DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate)
  124. }
  // RequestResponse is what Model.Request hands back: the requested bytes plus
  // a way to release them once they have been sent.
  type RequestResponse interface {
  	// Data returns the response payload.
  	Data() []byte
  	// Close must always be called once the byte slice is no longer in use.
  	Close()
  	// Wait blocks until Close is called.
  	Wait()
  }
  130. type Connection interface {
  131. Start()
  132. Close(err error)
  133. ID() DeviceID
  134. Name() string
  135. Index(folder string, files []FileInfo) error
  136. IndexUpdate(folder string, files []FileInfo) error
  137. Request(folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error)
  138. ClusterConfig(config ClusterConfig)
  139. DownloadProgress(folder string, updates []FileDownloadProgressUpdate)
  140. Statistics() Statistics
  141. Closed() bool
  142. }
  143. type rawConnection struct {
  144. id DeviceID
  145. name string
  146. receiver Model
  147. cr *countingReader
  148. cw *countingWriter
  149. awaiting map[int32]chan asyncResult
  150. awaitingMut sync.Mutex
  151. idxMut sync.Mutex // ensures serialization of Index calls
  152. nextID int32
  153. nextIDMut sync.Mutex
  154. inbox chan message
  155. outbox chan asyncMessage
  156. closeBox chan asyncMessage
  157. clusterConfigBox chan *ClusterConfig
  158. dispatcherLoopStopped chan struct{}
  159. preventSends chan struct{}
  160. closed chan struct{}
  161. closeOnce sync.Once
  162. sendCloseOnce sync.Once
  163. compression Compression
  164. }
  // asyncResult is the outcome of a request as delivered to the waiting
  // Request call: the response data and the error decoded from its code.
  type asyncResult struct {
  	val []byte
  	err error
  }
  // message is the set of serialization methods every wire message type
  // provides.
  type message interface {
  	// ProtoSize reports the size in bytes of the marshalled message.
  	ProtoSize() int
  	Marshal() ([]byte, error)
  	// MarshalTo marshals into the given buffer and reports the bytes written.
  	MarshalTo([]byte) (int, error)
  	Unmarshal([]byte) error
  }
  175. type asyncMessage struct {
  176. msg message
  177. done chan struct{} // done closes when we're done sending the message
  178. }
  const (
  	// PingSendInterval is how often we make sure to send a message, by
  	// triggering pings if necessary.
  	PingSendInterval = 90 * time.Second

  	// ReceiveTimeout is the longest we'll wait for a message from the other
  	// side before closing the connection.
  	ReceiveTimeout = 300 * time.Second
  )
  // CloseTimeout bounds how long Close waits while trying to send the close
  // message before it just closes the connection.
  //
  // It is a variable only so that tests can shorten it; production code
  // should not modify it.
  var CloseTimeout = 10 * time.Second
  191. func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiver Model, name string, compress Compression) Connection {
  192. cr := &countingReader{Reader: reader}
  193. cw := &countingWriter{Writer: writer}
  194. c := rawConnection{
  195. id: deviceID,
  196. name: name,
  197. receiver: nativeModel{receiver},
  198. cr: cr,
  199. cw: cw,
  200. awaiting: make(map[int32]chan asyncResult),
  201. inbox: make(chan message),
  202. outbox: make(chan asyncMessage),
  203. closeBox: make(chan asyncMessage),
  204. clusterConfigBox: make(chan *ClusterConfig),
  205. dispatcherLoopStopped: make(chan struct{}),
  206. preventSends: make(chan struct{}),
  207. closed: make(chan struct{}),
  208. compression: compress,
  209. }
  210. return wireFormatConnection{&c}
  211. }
  212. // Start creates the goroutines for sending and receiving of messages. It must
  213. // be called exactly once after creating a connection.
  214. func (c *rawConnection) Start() {
  215. go c.readerLoop()
  216. go func() {
  217. err := c.dispatcherLoop()
  218. c.internalClose(err)
  219. }()
  220. go c.writerLoop()
  221. go c.pingSender()
  222. go c.pingReceiver()
  223. }
  224. func (c *rawConnection) ID() DeviceID {
  225. return c.id
  226. }
  227. func (c *rawConnection) Name() string {
  228. return c.name
  229. }
  230. // Index writes the list of file information to the connected peer device
  231. func (c *rawConnection) Index(folder string, idx []FileInfo) error {
  232. select {
  233. case <-c.closed:
  234. return ErrClosed
  235. default:
  236. }
  237. c.idxMut.Lock()
  238. c.send(&Index{
  239. Folder: folder,
  240. Files: idx,
  241. }, nil)
  242. c.idxMut.Unlock()
  243. return nil
  244. }
  245. // IndexUpdate writes the list of file information to the connected peer device as an update
  246. func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo) error {
  247. select {
  248. case <-c.closed:
  249. return ErrClosed
  250. default:
  251. }
  252. c.idxMut.Lock()
  253. c.send(&IndexUpdate{
  254. Folder: folder,
  255. Files: idx,
  256. }, nil)
  257. c.idxMut.Unlock()
  258. return nil
  259. }
  260. // Request returns the bytes for the specified block after fetching them from the connected peer.
  261. func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, weakHash uint32, fromTemporary bool) ([]byte, error) {
  262. c.nextIDMut.Lock()
  263. id := c.nextID
  264. c.nextID++
  265. c.nextIDMut.Unlock()
  266. c.awaitingMut.Lock()
  267. if _, ok := c.awaiting[id]; ok {
  268. panic("id taken")
  269. }
  270. rc := make(chan asyncResult, 1)
  271. c.awaiting[id] = rc
  272. c.awaitingMut.Unlock()
  273. ok := c.send(&Request{
  274. ID: id,
  275. Folder: folder,
  276. Name: name,
  277. Offset: offset,
  278. Size: int32(size),
  279. Hash: hash,
  280. WeakHash: weakHash,
  281. FromTemporary: fromTemporary,
  282. }, nil)
  283. if !ok {
  284. return nil, ErrClosed
  285. }
  286. res, ok := <-rc
  287. if !ok {
  288. return nil, ErrClosed
  289. }
  290. return res.val, res.err
  291. }
  292. // ClusterConfig sends the cluster configuration message to the peer.
  293. // It must be called just once (as per BEP), otherwise it will panic.
  294. func (c *rawConnection) ClusterConfig(config ClusterConfig) {
  295. select {
  296. case c.clusterConfigBox <- &config:
  297. close(c.clusterConfigBox)
  298. case <-c.closed:
  299. }
  300. }
  301. func (c *rawConnection) Closed() bool {
  302. select {
  303. case <-c.closed:
  304. return true
  305. default:
  306. return false
  307. }
  308. }
  309. // DownloadProgress sends the progress updates for the files that are currently being downloaded.
  310. func (c *rawConnection) DownloadProgress(folder string, updates []FileDownloadProgressUpdate) {
  311. c.send(&DownloadProgress{
  312. Folder: folder,
  313. Updates: updates,
  314. }, nil)
  315. }
  316. func (c *rawConnection) ping() bool {
  317. return c.send(&Ping{}, nil)
  318. }
  319. func (c *rawConnection) readerLoop() {
  320. fourByteBuf := make([]byte, 4)
  321. for {
  322. msg, err := c.readMessage(fourByteBuf)
  323. if err != nil {
  324. if err == errUnknownMessage {
  325. // Unknown message types are skipped, for future extensibility.
  326. continue
  327. }
  328. c.internalClose(err)
  329. return
  330. }
  331. select {
  332. case c.inbox <- msg:
  333. case <-c.closed:
  334. return
  335. }
  336. }
  337. }
  338. func (c *rawConnection) dispatcherLoop() (err error) {
  339. defer close(c.dispatcherLoopStopped)
  340. var msg message
  341. state := stateInitial
  342. for {
  343. select {
  344. case msg = <-c.inbox:
  345. case <-c.closed:
  346. return ErrClosed
  347. }
  348. switch msg := msg.(type) {
  349. case *ClusterConfig:
  350. l.Debugln("read ClusterConfig message")
  351. if state != stateInitial {
  352. return fmt.Errorf("protocol error: cluster config message in state %d", state)
  353. }
  354. c.receiver.ClusterConfig(c.id, *msg)
  355. state = stateReady
  356. case *Index:
  357. l.Debugln("read Index message")
  358. if state != stateReady {
  359. return fmt.Errorf("protocol error: index message in state %d", state)
  360. }
  361. if err := checkIndexConsistency(msg.Files); err != nil {
  362. return fmt.Errorf("protocol error: index: %v", err)
  363. }
  364. c.handleIndex(*msg)
  365. state = stateReady
  366. case *IndexUpdate:
  367. l.Debugln("read IndexUpdate message")
  368. if state != stateReady {
  369. return fmt.Errorf("protocol error: index update message in state %d", state)
  370. }
  371. if err := checkIndexConsistency(msg.Files); err != nil {
  372. return fmt.Errorf("protocol error: index update: %v", err)
  373. }
  374. c.handleIndexUpdate(*msg)
  375. state = stateReady
  376. case *Request:
  377. l.Debugln("read Request message")
  378. if state != stateReady {
  379. return fmt.Errorf("protocol error: request message in state %d", state)
  380. }
  381. if err := checkFilename(msg.Name); err != nil {
  382. return fmt.Errorf("protocol error: request: %q: %v", msg.Name, err)
  383. }
  384. go c.handleRequest(*msg)
  385. case *Response:
  386. l.Debugln("read Response message")
  387. if state != stateReady {
  388. return fmt.Errorf("protocol error: response message in state %d", state)
  389. }
  390. c.handleResponse(*msg)
  391. case *DownloadProgress:
  392. l.Debugln("read DownloadProgress message")
  393. if state != stateReady {
  394. return fmt.Errorf("protocol error: response message in state %d", state)
  395. }
  396. c.receiver.DownloadProgress(c.id, msg.Folder, msg.Updates)
  397. case *Ping:
  398. l.Debugln("read Ping message")
  399. if state != stateReady {
  400. return fmt.Errorf("protocol error: ping message in state %d", state)
  401. }
  402. // Nothing
  403. case *Close:
  404. l.Debugln("read Close message")
  405. return errors.New(msg.Reason)
  406. default:
  407. l.Debugf("read unknown message: %+T", msg)
  408. return fmt.Errorf("protocol error: %s: unknown or empty message", c.id)
  409. }
  410. }
  411. }
  412. func (c *rawConnection) readMessage(fourByteBuf []byte) (message, error) {
  413. hdr, err := c.readHeader(fourByteBuf)
  414. if err != nil {
  415. return nil, err
  416. }
  417. return c.readMessageAfterHeader(hdr, fourByteBuf)
  418. }
  419. func (c *rawConnection) readMessageAfterHeader(hdr Header, fourByteBuf []byte) (message, error) {
  420. // First comes a 4 byte message length
  421. if _, err := io.ReadFull(c.cr, fourByteBuf[:4]); err != nil {
  422. return nil, fmt.Errorf("reading message length: %v", err)
  423. }
  424. msgLen := int32(binary.BigEndian.Uint32(fourByteBuf))
  425. if msgLen < 0 {
  426. return nil, fmt.Errorf("negative message length %d", msgLen)
  427. }
  428. // Then comes the message
  429. buf := BufferPool.Get(int(msgLen))
  430. if _, err := io.ReadFull(c.cr, buf); err != nil {
  431. return nil, fmt.Errorf("reading message: %v", err)
  432. }
  433. // ... which might be compressed
  434. switch hdr.Compression {
  435. case MessageCompressionNone:
  436. // Nothing
  437. case MessageCompressionLZ4:
  438. decomp, err := c.lz4Decompress(buf)
  439. BufferPool.Put(buf)
  440. if err != nil {
  441. return nil, fmt.Errorf("decompressing message: %v", err)
  442. }
  443. buf = decomp
  444. default:
  445. return nil, fmt.Errorf("unknown message compression %d", hdr.Compression)
  446. }
  447. // ... and is then unmarshalled
  448. msg, err := c.newMessage(hdr.Type)
  449. if err != nil {
  450. return nil, err
  451. }
  452. if err := msg.Unmarshal(buf); err != nil {
  453. return nil, fmt.Errorf("unmarshalling message: %v", err)
  454. }
  455. BufferPool.Put(buf)
  456. return msg, nil
  457. }
  458. func (c *rawConnection) readHeader(fourByteBuf []byte) (Header, error) {
  459. // First comes a 2 byte header length
  460. if _, err := io.ReadFull(c.cr, fourByteBuf[:2]); err != nil {
  461. return Header{}, fmt.Errorf("reading length: %v", err)
  462. }
  463. hdrLen := int16(binary.BigEndian.Uint16(fourByteBuf))
  464. if hdrLen < 0 {
  465. return Header{}, fmt.Errorf("negative header length %d", hdrLen)
  466. }
  467. // Then comes the header
  468. buf := BufferPool.Get(int(hdrLen))
  469. if _, err := io.ReadFull(c.cr, buf); err != nil {
  470. return Header{}, fmt.Errorf("reading header: %v", err)
  471. }
  472. var hdr Header
  473. if err := hdr.Unmarshal(buf); err != nil {
  474. return Header{}, fmt.Errorf("unmarshalling header: %v", err)
  475. }
  476. BufferPool.Put(buf)
  477. return hdr, nil
  478. }
  479. func (c *rawConnection) handleIndex(im Index) {
  480. l.Debugf("Index(%v, %v, %d file)", c.id, im.Folder, len(im.Files))
  481. c.receiver.Index(c.id, im.Folder, im.Files)
  482. }
  483. func (c *rawConnection) handleIndexUpdate(im IndexUpdate) {
  484. l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Folder, len(im.Files))
  485. c.receiver.IndexUpdate(c.id, im.Folder, im.Files)
  486. }
  487. // checkIndexConsistency verifies a number of invariants on FileInfos received in
  488. // index messages.
  489. func checkIndexConsistency(fs []FileInfo) error {
  490. for _, f := range fs {
  491. if err := checkFileInfoConsistency(f); err != nil {
  492. return fmt.Errorf("%q: %v", f.Name, err)
  493. }
  494. }
  495. return nil
  496. }
  497. // checkFileInfoConsistency verifies a number of invariants on the given FileInfo
  498. func checkFileInfoConsistency(f FileInfo) error {
  499. if err := checkFilename(f.Name); err != nil {
  500. return err
  501. }
  502. switch {
  503. case f.Deleted && len(f.Blocks) != 0:
  504. // Deleted files should have no blocks
  505. return errDeletedHasBlocks
  506. case f.Type == FileInfoTypeDirectory && len(f.Blocks) != 0:
  507. // Directories should have no blocks
  508. return errDirectoryHasBlocks
  509. case !f.Deleted && !f.IsInvalid() && f.Type == FileInfoTypeFile && len(f.Blocks) == 0:
  510. // Non-deleted, non-invalid files should have at least one block
  511. return errFileHasNoBlocks
  512. }
  513. return nil
  514. }
  515. // checkFilename verifies that the given filename is valid according to the
  516. // spec on what's allowed over the wire. A filename failing this test is
  517. // grounds for disconnecting the device.
  518. func checkFilename(name string) error {
  519. cleanedName := path.Clean(name)
  520. if cleanedName != name {
  521. // The filename on the wire should be in canonical format. If
  522. // Clean() managed to clean it up, there was something wrong with
  523. // it.
  524. return errUncleanFilename
  525. }
  526. switch name {
  527. case "", ".", "..":
  528. // These names are always invalid.
  529. return errInvalidFilename
  530. }
  531. if strings.HasPrefix(name, "/") {
  532. // Names are folder relative, not absolute.
  533. return errInvalidFilename
  534. }
  535. if strings.HasPrefix(name, "../") {
  536. // Starting with a dotdot is not allowed. Any other dotdots would
  537. // have been handled by the Clean() call at the top.
  538. return errInvalidFilename
  539. }
  540. return nil
  541. }
  542. func (c *rawConnection) handleRequest(req Request) {
  543. res, err := c.receiver.Request(c.id, req.Folder, req.Name, req.Size, req.Offset, req.Hash, req.WeakHash, req.FromTemporary)
  544. if err != nil {
  545. c.send(&Response{
  546. ID: req.ID,
  547. Code: errorToCode(err),
  548. }, nil)
  549. return
  550. }
  551. done := make(chan struct{})
  552. c.send(&Response{
  553. ID: req.ID,
  554. Data: res.Data(),
  555. Code: errorToCode(nil),
  556. }, done)
  557. <-done
  558. res.Close()
  559. }
  560. func (c *rawConnection) handleResponse(resp Response) {
  561. c.awaitingMut.Lock()
  562. if rc := c.awaiting[resp.ID]; rc != nil {
  563. delete(c.awaiting, resp.ID)
  564. rc <- asyncResult{resp.Data, codeToError(resp.Code)}
  565. close(rc)
  566. }
  567. c.awaitingMut.Unlock()
  568. }
  569. func (c *rawConnection) send(msg message, done chan struct{}) bool {
  570. select {
  571. case c.outbox <- asyncMessage{msg, done}:
  572. return true
  573. case <-c.preventSends:
  574. case <-c.closed:
  575. }
  576. if done != nil {
  577. close(done)
  578. }
  579. return false
  580. }
  581. func (c *rawConnection) writerLoop() {
  582. select {
  583. case cc := <-c.clusterConfigBox:
  584. err := c.writeMessage(cc)
  585. if err != nil {
  586. c.internalClose(err)
  587. return
  588. }
  589. case hm := <-c.closeBox:
  590. _ = c.writeMessage(hm.msg)
  591. close(hm.done)
  592. return
  593. case <-c.closed:
  594. return
  595. }
  596. for {
  597. select {
  598. case hm := <-c.outbox:
  599. err := c.writeMessage(hm.msg)
  600. if hm.done != nil {
  601. close(hm.done)
  602. }
  603. if err != nil {
  604. c.internalClose(err)
  605. return
  606. }
  607. case hm := <-c.closeBox:
  608. _ = c.writeMessage(hm.msg)
  609. close(hm.done)
  610. return
  611. case <-c.closed:
  612. return
  613. }
  614. }
  615. }
  616. func (c *rawConnection) writeMessage(msg message) error {
  617. if c.shouldCompressMessage(msg) {
  618. return c.writeCompressedMessage(msg)
  619. }
  620. return c.writeUncompressedMessage(msg)
  621. }
  622. func (c *rawConnection) writeCompressedMessage(msg message) error {
  623. size := msg.ProtoSize()
  624. buf := BufferPool.Get(size)
  625. if _, err := msg.MarshalTo(buf); err != nil {
  626. return fmt.Errorf("marshalling message: %v", err)
  627. }
  628. compressed, err := c.lz4Compress(buf)
  629. if err != nil {
  630. return fmt.Errorf("compressing message: %v", err)
  631. }
  632. hdr := Header{
  633. Type: c.typeOf(msg),
  634. Compression: MessageCompressionLZ4,
  635. }
  636. hdrSize := hdr.ProtoSize()
  637. if hdrSize > 1<<16-1 {
  638. panic("impossibly large header")
  639. }
  640. totSize := 2 + hdrSize + 4 + len(compressed)
  641. buf = BufferPool.Upgrade(buf, totSize)
  642. // Header length
  643. binary.BigEndian.PutUint16(buf, uint16(hdrSize))
  644. // Header
  645. if _, err := hdr.MarshalTo(buf[2:]); err != nil {
  646. return fmt.Errorf("marshalling header: %v", err)
  647. }
  648. // Message length
  649. binary.BigEndian.PutUint32(buf[2+hdrSize:], uint32(len(compressed)))
  650. // Message
  651. copy(buf[2+hdrSize+4:], compressed)
  652. BufferPool.Put(compressed)
  653. n, err := c.cw.Write(buf)
  654. BufferPool.Put(buf)
  655. l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message (%d uncompressed)), err=%v", n, hdrSize, len(compressed), size, err)
  656. if err != nil {
  657. return fmt.Errorf("writing message: %v", err)
  658. }
  659. return nil
  660. }
  661. func (c *rawConnection) writeUncompressedMessage(msg message) error {
  662. size := msg.ProtoSize()
  663. hdr := Header{
  664. Type: c.typeOf(msg),
  665. }
  666. hdrSize := hdr.ProtoSize()
  667. if hdrSize > 1<<16-1 {
  668. panic("impossibly large header")
  669. }
  670. totSize := 2 + hdrSize + 4 + size
  671. buf := BufferPool.Get(totSize)
  672. // Header length
  673. binary.BigEndian.PutUint16(buf, uint16(hdrSize))
  674. // Header
  675. if _, err := hdr.MarshalTo(buf[2:]); err != nil {
  676. return fmt.Errorf("marshalling header: %v", err)
  677. }
  678. // Message length
  679. binary.BigEndian.PutUint32(buf[2+hdrSize:], uint32(size))
  680. // Message
  681. if _, err := msg.MarshalTo(buf[2+hdrSize+4:]); err != nil {
  682. return fmt.Errorf("marshalling message: %v", err)
  683. }
  684. n, err := c.cw.Write(buf[:totSize])
  685. BufferPool.Put(buf)
  686. l.Debugf("wrote %d bytes on the wire (2 bytes length, %d bytes header, 4 bytes message length, %d bytes message), err=%v", n, hdrSize, size, err)
  687. if err != nil {
  688. return fmt.Errorf("writing message: %v", err)
  689. }
  690. return nil
  691. }
  692. func (c *rawConnection) typeOf(msg message) MessageType {
  693. switch msg.(type) {
  694. case *ClusterConfig:
  695. return messageTypeClusterConfig
  696. case *Index:
  697. return messageTypeIndex
  698. case *IndexUpdate:
  699. return messageTypeIndexUpdate
  700. case *Request:
  701. return messageTypeRequest
  702. case *Response:
  703. return messageTypeResponse
  704. case *DownloadProgress:
  705. return messageTypeDownloadProgress
  706. case *Ping:
  707. return messageTypePing
  708. case *Close:
  709. return messageTypeClose
  710. default:
  711. panic("bug: unknown message type")
  712. }
  713. }
  714. func (c *rawConnection) newMessage(t MessageType) (message, error) {
  715. switch t {
  716. case messageTypeClusterConfig:
  717. return new(ClusterConfig), nil
  718. case messageTypeIndex:
  719. return new(Index), nil
  720. case messageTypeIndexUpdate:
  721. return new(IndexUpdate), nil
  722. case messageTypeRequest:
  723. return new(Request), nil
  724. case messageTypeResponse:
  725. return new(Response), nil
  726. case messageTypeDownloadProgress:
  727. return new(DownloadProgress), nil
  728. case messageTypePing:
  729. return new(Ping), nil
  730. case messageTypeClose:
  731. return new(Close), nil
  732. default:
  733. return nil, errUnknownMessage
  734. }
  735. }
  736. func (c *rawConnection) shouldCompressMessage(msg message) bool {
  737. switch c.compression {
  738. case CompressNever:
  739. return false
  740. case CompressAlways:
  741. // Use compression for large enough messages
  742. return msg.ProtoSize() >= compressionThreshold
  743. case CompressMetadata:
  744. _, isResponse := msg.(*Response)
  745. // Compress if it's large enough and not a response message
  746. return !isResponse && msg.ProtoSize() >= compressionThreshold
  747. default:
  748. panic("unknown compression setting")
  749. }
  750. }
  751. // Close is called when the connection is regularely closed and thus the Close
  752. // BEP message is sent before terminating the actual connection. The error
  753. // argument specifies the reason for closing the connection.
  754. func (c *rawConnection) Close(err error) {
  755. c.sendCloseOnce.Do(func() {
  756. done := make(chan struct{})
  757. timeout := time.NewTimer(CloseTimeout)
  758. select {
  759. case c.closeBox <- asyncMessage{&Close{err.Error()}, done}:
  760. select {
  761. case <-done:
  762. case <-timeout.C:
  763. case <-c.closed:
  764. }
  765. case <-timeout.C:
  766. case <-c.closed:
  767. }
  768. })
  769. // Close might be called from a method that is called from within
  770. // dispatcherLoop, resulting in a deadlock.
  771. // The sending above must happen before spawning the routine, to prevent
  772. // the underlying connection from terminating before sending the close msg.
  773. go c.internalClose(err)
  774. }
  775. // internalClose is called if there is an unexpected error during normal operation.
  776. func (c *rawConnection) internalClose(err error) {
  777. c.closeOnce.Do(func() {
  778. l.Debugln("close due to", err)
  779. close(c.closed)
  780. c.awaitingMut.Lock()
  781. for i, ch := range c.awaiting {
  782. if ch != nil {
  783. close(ch)
  784. delete(c.awaiting, i)
  785. }
  786. }
  787. c.awaitingMut.Unlock()
  788. <-c.dispatcherLoopStopped
  789. c.receiver.Closed(c, err)
  790. })
  791. }
  792. // The pingSender makes sure that we've sent a message within the last
  793. // PingSendInterval. If we already have something sent in the last
  794. // PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
  795. // results in an effecting ping interval of somewhere between
  796. // PingSendInterval/2 and PingSendInterval.
  797. func (c *rawConnection) pingSender() {
  798. ticker := time.NewTicker(PingSendInterval / 2)
  799. defer ticker.Stop()
  800. for {
  801. select {
  802. case <-ticker.C:
  803. d := time.Since(c.cw.Last())
  804. if d < PingSendInterval/2 {
  805. l.Debugln(c.id, "ping skipped after wr", d)
  806. continue
  807. }
  808. l.Debugln(c.id, "ping -> after", d)
  809. c.ping()
  810. case <-c.closed:
  811. return
  812. }
  813. }
  814. }
  815. // The pingReceiver checks that we've received a message (any message will do,
  816. // but we expect pings in the absence of other messages) within the last
  817. // ReceiveTimeout. If not, we close the connection with an ErrTimeout.
  818. func (c *rawConnection) pingReceiver() {
  819. ticker := time.NewTicker(ReceiveTimeout / 2)
  820. defer ticker.Stop()
  821. for {
  822. select {
  823. case <-ticker.C:
  824. d := time.Since(c.cr.Last())
  825. if d > ReceiveTimeout {
  826. l.Debugln(c.id, "ping timeout", d)
  827. c.internalClose(ErrTimeout)
  828. }
  829. l.Debugln(c.id, "last read within", d)
  830. case <-c.closed:
  831. return
  832. }
  833. }
  834. }
  // Statistics is a snapshot of a connection's traffic counters.
  type Statistics struct {
  	// At is the time the snapshot was taken.
  	At time.Time
  	// InBytesTotal is the read side's running byte total.
  	InBytesTotal int64
  	// OutBytesTotal is the write side's running byte total.
  	OutBytesTotal int64
  }
  840. func (c *rawConnection) Statistics() Statistics {
  841. return Statistics{
  842. At: time.Now(),
  843. InBytesTotal: c.cr.Tot(),
  844. OutBytesTotal: c.cw.Tot(),
  845. }
  846. }
  847. func (c *rawConnection) lz4Compress(src []byte) ([]byte, error) {
  848. var err error
  849. buf := BufferPool.Get(len(src))
  850. buf, err = lz4.Encode(buf, src)
  851. if err != nil {
  852. return nil, err
  853. }
  854. binary.BigEndian.PutUint32(buf, binary.LittleEndian.Uint32(buf))
  855. return buf, nil
  856. }
  857. func (c *rawConnection) lz4Decompress(src []byte) ([]byte, error) {
  858. size := binary.BigEndian.Uint32(src)
  859. binary.LittleEndian.PutUint32(src, size)
  860. var err error
  861. buf := BufferPool.Get(int(size))
  862. buf, err = lz4.Decode(buf, src)
  863. if err != nil {
  864. return nil, err
  865. }
  866. return buf, nil
  867. }