model.go 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. package model
  2. /*
  3. Locking
  4. =======
  5. The model has read and write locks. These must be acquired as appropriate by
  6. public methods. To prevent deadlock situations, private methods should never
  7. acquire locks, but document what locks they require.
  8. */
  9. import (
  10. "crypto/sha1"
  11. "errors"
  12. "fmt"
  13. "io"
  14. "log"
  15. "net"
  16. "os"
  17. "path"
  18. "sync"
  19. "time"
  20. "github.com/calmh/syncthing/buffers"
  21. "github.com/calmh/syncthing/protocol"
  22. )
  23. type Model struct {
  24. sync.RWMutex
  25. dir string
  26. global map[string]File // the latest version of each file as it exists in the cluster
  27. local map[string]File // the files we currently have locally on disk
  28. remote map[string]map[string]File
  29. need map[string]bool // the files we need to update
  30. protoConn map[string]Connection
  31. rawConn map[string]io.Closer
  32. updatedLocal int64 // timestamp of last update to local
  33. updateGlobal int64 // timestamp of last update to remote
  34. lastIdxBcast time.Time
  35. lastIdxBcastRequest time.Time
  36. rwRunning bool
  37. parallellFiles int
  38. paralllelReqs int
  39. delete bool
  40. trace map[string]bool
  41. fileLastChanged map[string]time.Time
  42. fileWasSuppressed map[string]int
  43. }
  44. type Connection interface {
  45. ID() string
  46. Index([]protocol.FileInfo)
  47. Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error)
  48. Statistics() protocol.Statistics
  49. }
  50. const (
  51. idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
  52. idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
  53. minFileHoldTimeS = 60 // Never allow file changes more often than this
  54. maxFileHoldTimeS = 600 // Always allow file changes at least this often
  55. )
  56. var (
  57. ErrNoSuchFile = errors.New("no such file")
  58. ErrInvalid = errors.New("file is invalid")
  59. )
  60. // NewModel creates and starts a new model. The model starts in read-only mode,
  61. // where it sends index information to connected peers and responds to requests
  62. // for file data without altering the local repository in any way.
  63. func NewModel(dir string) *Model {
  64. m := &Model{
  65. dir: dir,
  66. global: make(map[string]File),
  67. local: make(map[string]File),
  68. remote: make(map[string]map[string]File),
  69. need: make(map[string]bool),
  70. protoConn: make(map[string]Connection),
  71. rawConn: make(map[string]io.Closer),
  72. lastIdxBcast: time.Now(),
  73. trace: make(map[string]bool),
  74. fileLastChanged: make(map[string]time.Time),
  75. fileWasSuppressed: make(map[string]int),
  76. }
  77. go m.broadcastIndexLoop()
  78. return m
  79. }
  80. // Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace.
  81. func (m *Model) Trace(t string) {
  82. m.Lock()
  83. defer m.Unlock()
  84. m.trace[t] = true
  85. }
  86. // StartRW starts read/write processing on the current model. When in
  87. // read/write mode the model will attempt to keep in sync with the cluster by
  88. // pulling needed files from peer nodes.
  89. func (m *Model) StartRW(del bool, pfiles, preqs int) {
  90. m.Lock()
  91. defer m.Unlock()
  92. if m.rwRunning {
  93. panic("starting started model")
  94. }
  95. m.rwRunning = true
  96. m.delete = del
  97. m.parallellFiles = pfiles
  98. m.paralllelReqs = preqs
  99. go m.cleanTempFiles()
  100. go m.puller()
  101. }
  102. // Generation returns an opaque integer that is guaranteed to increment on
  103. // every change to the local repository or global model.
  104. func (m *Model) Generation() int64 {
  105. m.RLock()
  106. defer m.RUnlock()
  107. return m.updatedLocal + m.updateGlobal
  108. }
  109. type ConnectionInfo struct {
  110. protocol.Statistics
  111. Address string
  112. }
  113. // ConnectionStats returns a map with connection statistics for each connected node.
  114. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  115. type remoteAddrer interface {
  116. RemoteAddr() net.Addr
  117. }
  118. m.RLock()
  119. defer m.RUnlock()
  120. var res = make(map[string]ConnectionInfo)
  121. for node, conn := range m.protoConn {
  122. ci := ConnectionInfo{
  123. Statistics: conn.Statistics(),
  124. }
  125. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  126. ci.Address = nc.RemoteAddr().String()
  127. }
  128. res[node] = ci
  129. }
  130. return res
  131. }
  132. // LocalSize returns the number of files, deleted files and total bytes for all
  133. // files in the global model.
  134. func (m *Model) GlobalSize() (files, deleted, bytes int) {
  135. m.RLock()
  136. defer m.RUnlock()
  137. for _, f := range m.global {
  138. if f.Flags&protocol.FlagDeleted == 0 {
  139. files++
  140. bytes += f.Size()
  141. } else {
  142. deleted++
  143. }
  144. }
  145. return
  146. }
  147. // LocalSize returns the number of files, deleted files and total bytes for all
  148. // files in the local repository.
  149. func (m *Model) LocalSize() (files, deleted, bytes int) {
  150. m.RLock()
  151. defer m.RUnlock()
  152. for _, f := range m.local {
  153. if f.Flags&protocol.FlagDeleted == 0 {
  154. files++
  155. bytes += f.Size()
  156. } else {
  157. deleted++
  158. }
  159. }
  160. return
  161. }
  162. // InSyncSize returns the number and total byte size of the local files that
  163. // are in sync with the global model.
  164. func (m *Model) InSyncSize() (files, bytes int) {
  165. m.RLock()
  166. defer m.RUnlock()
  167. for n, f := range m.local {
  168. if gf, ok := m.global[n]; ok && f.Equals(gf) {
  169. files++
  170. bytes += f.Size()
  171. }
  172. }
  173. return
  174. }
  175. // NeedFiles returns the list of currently needed files and the total size.
  176. func (m *Model) NeedFiles() (files []File, bytes int) {
  177. m.RLock()
  178. defer m.RUnlock()
  179. for n := range m.need {
  180. f := m.global[n]
  181. files = append(files, f)
  182. bytes += f.Size()
  183. }
  184. return
  185. }
  186. // Index is called when a new node is connected and we receive their full index.
  187. // Implements the protocol.Model interface.
  188. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
  189. m.Lock()
  190. defer m.Unlock()
  191. if m.trace["net"] {
  192. log.Printf("NET IDX(in): %s: %d files", nodeID, len(fs))
  193. }
  194. repo := make(map[string]File)
  195. for _, f := range fs {
  196. m.indexUpdate(repo, f)
  197. }
  198. m.remote[nodeID] = repo
  199. m.recomputeGlobal()
  200. m.recomputeNeed()
  201. }
  202. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  203. // Implements the protocol.Model interface.
  204. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
  205. m.Lock()
  206. defer m.Unlock()
  207. if m.trace["net"] {
  208. log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
  209. }
  210. repo, ok := m.remote[nodeID]
  211. if !ok {
  212. log.Printf("WARNING: Index update from node %s that does not have an index", nodeID)
  213. return
  214. }
  215. for _, f := range fs {
  216. m.indexUpdate(repo, f)
  217. }
  218. m.recomputeGlobal()
  219. m.recomputeNeed()
  220. }
  221. func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) {
  222. if m.trace["idx"] {
  223. var flagComment string
  224. if f.Flags&protocol.FlagDeleted != 0 {
  225. flagComment = " (deleted)"
  226. }
  227. log.Printf("IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  228. }
  229. if extraFlags := f.Flags &^ (protocol.FlagInvalid | protocol.FlagDeleted | 0xfff); extraFlags != 0 {
  230. log.Printf("WARNING: IDX(in): Unknown flags 0x%x in index record %+v", extraFlags, f)
  231. return
  232. }
  233. repo[f.Name] = fileFromFileInfo(f)
  234. }
  235. // Close removes the peer from the model and closes the underlyign connection if possible.
  236. // Implements the protocol.Model interface.
  237. func (m *Model) Close(node string, err error) {
  238. m.Lock()
  239. defer m.Unlock()
  240. conn, ok := m.rawConn[node]
  241. if ok {
  242. conn.Close()
  243. }
  244. delete(m.remote, node)
  245. delete(m.protoConn, node)
  246. delete(m.rawConn, node)
  247. m.recomputeGlobal()
  248. m.recomputeNeed()
  249. }
  250. // Request returns the specified data segment by reading it from local disk.
  251. // Implements the protocol.Model interface.
  252. func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  253. // Verify that the requested file exists in the local and global model.
  254. m.RLock()
  255. lf, localOk := m.local[name]
  256. _, globalOk := m.global[name]
  257. m.RUnlock()
  258. if !localOk || !globalOk {
  259. log.Printf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  260. return nil, ErrNoSuchFile
  261. }
  262. if lf.Flags&protocol.FlagInvalid != 0 {
  263. return nil, ErrInvalid
  264. }
  265. if m.trace["net"] && nodeID != "<local>" {
  266. log.Printf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  267. }
  268. fn := path.Join(m.dir, name)
  269. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  270. if err != nil {
  271. return nil, err
  272. }
  273. defer fd.Close()
  274. buf := buffers.Get(int(size))
  275. _, err = fd.ReadAt(buf, int64(offset))
  276. if err != nil {
  277. return nil, err
  278. }
  279. return buf, nil
  280. }
  281. // ReplaceLocal replaces the local repository index with the given list of files.
  282. // Change suppression is applied to files changing too often.
  283. func (m *Model) ReplaceLocal(fs []File) {
  284. m.Lock()
  285. defer m.Unlock()
  286. var updated bool
  287. var newLocal = make(map[string]File)
  288. for _, f := range fs {
  289. newLocal[f.Name] = f
  290. if ef := m.local[f.Name]; !ef.Equals(f) {
  291. updated = true
  292. }
  293. }
  294. if m.markDeletedLocals(newLocal) {
  295. updated = true
  296. }
  297. if len(newLocal) != len(m.local) {
  298. updated = true
  299. }
  300. if updated {
  301. m.local = newLocal
  302. m.recomputeGlobal()
  303. m.recomputeNeed()
  304. m.updatedLocal = time.Now().Unix()
  305. m.lastIdxBcastRequest = time.Now()
  306. }
  307. }
  308. // SeedLocal replaces the local repository index with the given list of files,
  309. // in protocol data types. Does not track deletes, should only be used to seed
  310. // the local index from a cache file at startup.
  311. func (m *Model) SeedLocal(fs []protocol.FileInfo) {
  312. m.Lock()
  313. defer m.Unlock()
  314. m.local = make(map[string]File)
  315. for _, f := range fs {
  316. m.local[f.Name] = fileFromFileInfo(f)
  317. }
  318. m.recomputeGlobal()
  319. m.recomputeNeed()
  320. }
  321. // ConnectedTo returns true if we are connected to the named node.
  322. func (m *Model) ConnectedTo(nodeID string) bool {
  323. m.RLock()
  324. defer m.RUnlock()
  325. _, ok := m.protoConn[nodeID]
  326. return ok
  327. }
  328. // ProtocolIndex returns the current local index in protocol data types.
  329. func (m *Model) ProtocolIndex() []protocol.FileInfo {
  330. m.RLock()
  331. defer m.RUnlock()
  332. return m.protocolIndex()
  333. }
  334. // RepoID returns a unique ID representing the current repository location.
  335. func (m *Model) RepoID() string {
  336. return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir)))
  337. }
  338. // AddConnection adds a new peer connection to the model. An initial index will
  339. // be sent to the connected peer, thereafter index updates whenever the local
  340. // repository changes.
  341. func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
  342. nodeID := protoConn.ID()
  343. m.Lock()
  344. m.protoConn[nodeID] = protoConn
  345. m.rawConn[nodeID] = rawConn
  346. m.Unlock()
  347. m.RLock()
  348. idx := m.protocolIndex()
  349. m.RUnlock()
  350. go func() {
  351. protoConn.Index(idx)
  352. }()
  353. }
  354. func (m *Model) shouldSuppressChange(name string) bool {
  355. sup := shouldSuppressChange(m.fileLastChanged[name], m.fileWasSuppressed[name])
  356. if sup {
  357. m.fileWasSuppressed[name]++
  358. } else {
  359. m.fileWasSuppressed[name] = 0
  360. m.fileLastChanged[name] = time.Now()
  361. }
  362. return sup
  363. }
  364. func shouldSuppressChange(lastChange time.Time, numChanges int) bool {
  365. sinceLast := time.Since(lastChange)
  366. if sinceLast > maxFileHoldTimeS*time.Second {
  367. return false
  368. }
  369. if sinceLast < time.Duration((numChanges+2)*minFileHoldTimeS)*time.Second {
  370. return true
  371. }
  372. return false
  373. }
  374. // protocolIndex returns the current local index in protocol data types.
  375. // Must be called with the read lock held.
  376. func (m *Model) protocolIndex() []protocol.FileInfo {
  377. var index []protocol.FileInfo
  378. for _, f := range m.local {
  379. mf := fileInfoFromFile(f)
  380. if m.trace["idx"] {
  381. var flagComment string
  382. if mf.Flags&protocol.FlagDeleted != 0 {
  383. flagComment = " (deleted)"
  384. }
  385. log.Printf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
  386. }
  387. index = append(index, mf)
  388. }
  389. return index
  390. }
  391. func (m *Model) requestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  392. m.RLock()
  393. nc, ok := m.protoConn[nodeID]
  394. m.RUnlock()
  395. if !ok {
  396. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  397. }
  398. if m.trace["net"] {
  399. log.Printf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  400. }
  401. return nc.Request(name, offset, size, hash)
  402. }
  403. func (m *Model) broadcastIndexLoop() {
  404. for {
  405. m.RLock()
  406. bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
  407. holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
  408. m.RUnlock()
  409. maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
  410. if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
  411. m.Lock()
  412. var indexWg sync.WaitGroup
  413. indexWg.Add(len(m.protoConn))
  414. idx := m.protocolIndex()
  415. m.lastIdxBcast = time.Now()
  416. for _, node := range m.protoConn {
  417. node := node
  418. if m.trace["net"] {
  419. log.Printf("NET IDX(out/loop): %s: %d files", node.ID, len(idx))
  420. }
  421. go func() {
  422. node.Index(idx)
  423. indexWg.Done()
  424. }()
  425. }
  426. m.Unlock()
  427. indexWg.Wait()
  428. }
  429. time.Sleep(idxBcastHoldtime)
  430. }
  431. }
  432. // markDeletedLocals sets the deleted flag on files that have gone missing locally.
  433. // Must be called with the write lock held.
  434. func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
  435. // For every file in the existing local table, check if they are also
  436. // present in the new local table. If they are not, check that we already
  437. // had the newest version available according to the global table and if so
  438. // note the file as having been deleted.
  439. var updated bool
  440. for n, f := range m.local {
  441. if _, ok := newLocal[n]; !ok {
  442. if gf := m.global[n]; !gf.NewerThan(f) {
  443. if f.Flags&protocol.FlagDeleted == 0 {
  444. f.Flags = protocol.FlagDeleted
  445. f.Version++
  446. f.Blocks = nil
  447. updated = true
  448. }
  449. newLocal[n] = f
  450. }
  451. }
  452. }
  453. return updated
  454. }
  455. func (m *Model) updateLocal(f File) {
  456. if ef, ok := m.local[f.Name]; !ok || !ef.Equals(f) {
  457. m.local[f.Name] = f
  458. m.recomputeGlobal()
  459. m.recomputeNeed()
  460. m.updatedLocal = time.Now().Unix()
  461. m.lastIdxBcastRequest = time.Now()
  462. }
  463. }
  464. // Must be called with the write lock held.
  465. func (m *Model) recomputeGlobal() {
  466. var newGlobal = make(map[string]File)
  467. for n, f := range m.local {
  468. newGlobal[n] = f
  469. }
  470. var highestMod int64
  471. for _, fs := range m.remote {
  472. for n, nf := range fs {
  473. if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) {
  474. newGlobal[n] = nf
  475. if nf.Modified > highestMod {
  476. highestMod = nf.Modified
  477. }
  478. }
  479. }
  480. }
  481. // Figure out if anything actually changed
  482. var updated bool
  483. if highestMod > m.updateGlobal || len(newGlobal) != len(m.global) {
  484. updated = true
  485. } else {
  486. for n, f0 := range newGlobal {
  487. if f1, ok := m.global[n]; !ok || !f0.Equals(f1) {
  488. updated = true
  489. break
  490. }
  491. }
  492. }
  493. if updated {
  494. m.updateGlobal = time.Now().Unix()
  495. m.global = newGlobal
  496. }
  497. }
  498. // Must be called with the write lock held.
  499. func (m *Model) recomputeNeed() {
  500. m.need = make(map[string]bool)
  501. for n, gf := range m.global {
  502. lf, ok := m.local[n]
  503. if !ok || gf.NewerThan(lf) {
  504. if gf.Flags&protocol.FlagInvalid != 0 {
  505. // Never attempt to sync invalid files
  506. continue
  507. }
  508. if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
  509. // Don't want to delete files, so forget this need
  510. continue
  511. }
  512. if gf.Flags&protocol.FlagDeleted != 0 && !ok {
  513. // Don't have the file, so don't need to delete it
  514. continue
  515. }
  516. if m.trace["need"] {
  517. log.Println("NEED:", ok, lf, gf)
  518. }
  519. m.need[n] = true
  520. }
  521. }
  522. }
  523. // Must be called with the read lock held.
  524. func (m *Model) whoHas(name string) []string {
  525. var remote []string
  526. gf := m.global[name]
  527. for node, files := range m.remote {
  528. if file, ok := files[name]; ok && file.Equals(gf) {
  529. remote = append(remote, node)
  530. }
  531. }
  532. return remote
  533. }
  534. func fileFromFileInfo(f protocol.FileInfo) File {
  535. var blocks = make([]Block, len(f.Blocks))
  536. var offset uint64
  537. for i, b := range f.Blocks {
  538. blocks[i] = Block{
  539. Offset: offset,
  540. Length: b.Length,
  541. Hash: b.Hash,
  542. }
  543. offset += uint64(b.Length)
  544. }
  545. return File{
  546. Name: f.Name,
  547. Flags: f.Flags,
  548. Modified: int64(f.Modified),
  549. Version: f.Version,
  550. Blocks: blocks,
  551. }
  552. }
  553. func fileInfoFromFile(f File) protocol.FileInfo {
  554. var blocks = make([]protocol.BlockInfo, len(f.Blocks))
  555. for i, b := range f.Blocks {
  556. blocks[i] = protocol.BlockInfo{
  557. Length: b.Length,
  558. Hash: b.Hash,
  559. }
  560. }
  561. return protocol.FileInfo{
  562. Name: f.Name,
  563. Flags: f.Flags,
  564. Modified: int64(f.Modified),
  565. Version: f.Version,
  566. Blocks: blocks,
  567. }
  568. }