model.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
// Package model maintains the local, per-node remote and merged global file
// indexes for a repository and computes which files need to be synchronized.
package model

/*
Locking
=======
The model has read and write locks. These must be acquired as appropriate by
public methods. To prevent deadlock situations, private methods should never
acquire locks, but document what locks they require. The exceptions are
private methods that run as their own goroutine or are called with no lock
held (for example broadcastIndexLoop and requestGlobal); those acquire the
lock themselves and must not be called with it held.
*/
  9. import (
  10. "crypto/sha1"
  11. "errors"
  12. "fmt"
  13. "io"
  14. "log"
  15. "net"
  16. "os"
  17. "path"
  18. "sync"
  19. "time"
  20. "github.com/calmh/syncthing/buffers"
  21. "github.com/calmh/syncthing/protocol"
  22. )
  23. type Model struct {
  24. sync.RWMutex
  25. dir string
  26. global map[string]File // the latest version of each file as it exists in the cluster
  27. local map[string]File // the files we currently have locally on disk
  28. remote map[string]map[string]File
  29. need map[string]bool // the files we need to update
  30. nodes map[string]*protocol.Connection
  31. rawConn map[string]io.ReadWriteCloser
  32. updatedLocal int64 // timestamp of last update to local
  33. updateGlobal int64 // timestamp of last update to remote
  34. lastIdxBcast time.Time
  35. lastIdxBcastRequest time.Time
  36. rwRunning bool
  37. parallellFiles int
  38. paralllelReqs int
  39. delete bool
  40. trace map[string]bool
  41. fileLastChanged map[string]time.Time
  42. fileWasSuppressed map[string]int
  43. }
  44. const (
  45. idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
  46. idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
  47. minFileHoldTimeS = 60 // Never allow file changes more often than this
  48. maxFileHoldTimeS = 600 // Always allow file changes at least this often
  49. )
  50. var (
  51. ErrNoSuchFile = errors.New("no such file")
  52. ErrInvalid = errors.New("file is invalid")
  53. )
  54. // NewModel creates and starts a new model. The model starts in read-only mode,
  55. // where it sends index information to connected peers and responds to requests
  56. // for file data without altering the local repository in any way.
  57. func NewModel(dir string) *Model {
  58. m := &Model{
  59. dir: dir,
  60. global: make(map[string]File),
  61. local: make(map[string]File),
  62. remote: make(map[string]map[string]File),
  63. need: make(map[string]bool),
  64. nodes: make(map[string]*protocol.Connection),
  65. rawConn: make(map[string]io.ReadWriteCloser),
  66. lastIdxBcast: time.Now(),
  67. trace: make(map[string]bool),
  68. fileLastChanged: make(map[string]time.Time),
  69. fileWasSuppressed: make(map[string]int),
  70. }
  71. go m.broadcastIndexLoop()
  72. return m
  73. }
  74. // Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace.
  75. func (m *Model) Trace(t string) {
  76. m.Lock()
  77. defer m.Unlock()
  78. m.trace[t] = true
  79. }
  80. // StartRW starts read/write processing on the current model. When in
  81. // read/write mode the model will attempt to keep in sync with the cluster by
  82. // pulling needed files from peer nodes.
  83. func (m *Model) StartRW(del bool, pfiles, preqs int) {
  84. m.Lock()
  85. defer m.Unlock()
  86. if m.rwRunning {
  87. panic("starting started model")
  88. }
  89. m.rwRunning = true
  90. m.delete = del
  91. m.parallellFiles = pfiles
  92. m.paralllelReqs = preqs
  93. go m.cleanTempFiles()
  94. go m.puller()
  95. }
  96. // Generation returns an opaque integer that is guaranteed to increment on
  97. // every change to the local repository or global model.
  98. func (m *Model) Generation() int64 {
  99. m.RLock()
  100. defer m.RUnlock()
  101. return m.updatedLocal + m.updateGlobal
  102. }
  103. type ConnectionInfo struct {
  104. protocol.Statistics
  105. Address string
  106. }
  107. // ConnectionStats returns a map with connection statistics for each connected node.
  108. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  109. type remoteAddrer interface {
  110. RemoteAddr() net.Addr
  111. }
  112. m.RLock()
  113. defer m.RUnlock()
  114. var res = make(map[string]ConnectionInfo)
  115. for node, conn := range m.nodes {
  116. ci := ConnectionInfo{
  117. Statistics: conn.Statistics(),
  118. }
  119. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  120. ci.Address = nc.RemoteAddr().String()
  121. }
  122. res[node] = ci
  123. }
  124. return res
  125. }
  126. // LocalSize returns the number of files, deleted files and total bytes for all
  127. // files in the global model.
  128. func (m *Model) GlobalSize() (files, deleted, bytes int) {
  129. m.RLock()
  130. defer m.RUnlock()
  131. for _, f := range m.global {
  132. if f.Flags&protocol.FlagDeleted == 0 {
  133. files++
  134. bytes += f.Size()
  135. } else {
  136. deleted++
  137. }
  138. }
  139. return
  140. }
  141. // LocalSize returns the number of files, deleted files and total bytes for all
  142. // files in the local repository.
  143. func (m *Model) LocalSize() (files, deleted, bytes int) {
  144. m.RLock()
  145. defer m.RUnlock()
  146. for _, f := range m.local {
  147. if f.Flags&protocol.FlagDeleted == 0 {
  148. files++
  149. bytes += f.Size()
  150. } else {
  151. deleted++
  152. }
  153. }
  154. return
  155. }
  156. // InSyncSize returns the number and total byte size of the local files that
  157. // are in sync with the global model.
  158. func (m *Model) InSyncSize() (files, bytes int) {
  159. m.RLock()
  160. defer m.RUnlock()
  161. for n, f := range m.local {
  162. if gf, ok := m.global[n]; ok && f.Equals(gf) {
  163. files++
  164. bytes += f.Size()
  165. }
  166. }
  167. return
  168. }
  169. // NeedFiles returns the list of currently needed files and the total size.
  170. func (m *Model) NeedFiles() (files []File, bytes int) {
  171. m.RLock()
  172. defer m.RUnlock()
  173. for n := range m.need {
  174. f := m.global[n]
  175. files = append(files, f)
  176. bytes += f.Size()
  177. }
  178. return
  179. }
  180. // Index is called when a new node is connected and we receive their full index.
  181. // Implements the protocol.Model interface.
  182. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
  183. m.Lock()
  184. defer m.Unlock()
  185. if m.trace["net"] {
  186. log.Printf("NET IDX(in): %s: %d files", nodeID, len(fs))
  187. }
  188. m.remote[nodeID] = make(map[string]File)
  189. for _, f := range fs {
  190. m.remote[nodeID][f.Name] = fileFromFileInfo(f)
  191. if m.trace["idx"] {
  192. var flagComment string
  193. if f.Flags&protocol.FlagDeleted != 0 {
  194. flagComment = " (deleted)"
  195. }
  196. log.Printf("IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  197. }
  198. }
  199. m.recomputeGlobal()
  200. m.recomputeNeed()
  201. }
  202. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  203. // Implements the protocol.Model interface.
  204. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
  205. m.Lock()
  206. defer m.Unlock()
  207. if m.trace["net"] {
  208. log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
  209. }
  210. repo, ok := m.remote[nodeID]
  211. if !ok {
  212. return
  213. }
  214. for _, f := range fs {
  215. repo[f.Name] = fileFromFileInfo(f)
  216. if m.trace["idx"] {
  217. var flagComment string
  218. if f.Flags&protocol.FlagDeleted != 0 {
  219. flagComment = " (deleted)"
  220. }
  221. log.Printf("IDX(in-up): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  222. }
  223. }
  224. m.recomputeGlobal()
  225. m.recomputeNeed()
  226. }
  227. // Close removes the peer from the model and closes the underlyign connection if possible.
  228. // Implements the protocol.Model interface.
  229. func (m *Model) Close(node string, err error) {
  230. m.Lock()
  231. defer m.Unlock()
  232. conn, ok := m.rawConn[node]
  233. if ok {
  234. conn.Close()
  235. }
  236. delete(m.remote, node)
  237. delete(m.nodes, node)
  238. delete(m.rawConn, node)
  239. m.recomputeGlobal()
  240. m.recomputeNeed()
  241. }
  242. // Request returns the specified data segment by reading it from local disk.
  243. // Implements the protocol.Model interface.
  244. func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  245. // Verify that the requested file exists in the local and global model.
  246. m.RLock()
  247. lf, localOk := m.local[name]
  248. _, globalOk := m.global[name]
  249. m.RUnlock()
  250. if !localOk || !globalOk {
  251. log.Printf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  252. return nil, ErrNoSuchFile
  253. }
  254. if lf.Flags&protocol.FlagInvalid != 0 {
  255. return nil, ErrInvalid
  256. }
  257. if m.trace["net"] && nodeID != "<local>" {
  258. log.Printf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  259. }
  260. fn := path.Join(m.dir, name)
  261. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  262. if err != nil {
  263. return nil, err
  264. }
  265. defer fd.Close()
  266. buf := buffers.Get(int(size))
  267. _, err = fd.ReadAt(buf, int64(offset))
  268. if err != nil {
  269. return nil, err
  270. }
  271. return buf, nil
  272. }
  273. // ReplaceLocal replaces the local repository index with the given list of files.
  274. // Change suppression is applied to files changing too often.
  275. func (m *Model) ReplaceLocal(fs []File) {
  276. m.Lock()
  277. defer m.Unlock()
  278. var updated bool
  279. var newLocal = make(map[string]File)
  280. for _, f := range fs {
  281. newLocal[f.Name] = f
  282. if ef := m.local[f.Name]; !ef.Equals(f) {
  283. updated = true
  284. }
  285. }
  286. if m.markDeletedLocals(newLocal) {
  287. updated = true
  288. }
  289. if len(newLocal) != len(m.local) {
  290. updated = true
  291. }
  292. if updated {
  293. m.local = newLocal
  294. m.recomputeGlobal()
  295. m.recomputeNeed()
  296. m.updatedLocal = time.Now().Unix()
  297. m.lastIdxBcastRequest = time.Now()
  298. }
  299. }
  300. // SeedLocal replaces the local repository index with the given list of files,
  301. // in protocol data types. Does not track deletes, should only be used to seed
  302. // the local index from a cache file at startup.
  303. func (m *Model) SeedLocal(fs []protocol.FileInfo) {
  304. m.Lock()
  305. defer m.Unlock()
  306. m.local = make(map[string]File)
  307. for _, f := range fs {
  308. m.local[f.Name] = fileFromFileInfo(f)
  309. }
  310. m.recomputeGlobal()
  311. m.recomputeNeed()
  312. }
  313. // ConnectedTo returns true if we are connected to the named node.
  314. func (m *Model) ConnectedTo(nodeID string) bool {
  315. m.RLock()
  316. defer m.RUnlock()
  317. _, ok := m.nodes[nodeID]
  318. return ok
  319. }
  320. // ProtocolIndex returns the current local index in protocol data types.
  321. func (m *Model) ProtocolIndex() []protocol.FileInfo {
  322. m.RLock()
  323. defer m.RUnlock()
  324. return m.protocolIndex()
  325. }
  326. // RepoID returns a unique ID representing the current repository location.
  327. func (m *Model) RepoID() string {
  328. return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir)))
  329. }
  330. // AddConnection adds a new peer connection to the model. An initial index will
  331. // be sent to the connected peer, thereafter index updates whenever the local
  332. // repository changes.
  333. func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) {
  334. node := protocol.NewConnection(nodeID, conn, conn, m)
  335. m.Lock()
  336. m.nodes[nodeID] = node
  337. m.rawConn[nodeID] = conn
  338. m.Unlock()
  339. m.RLock()
  340. idx := m.protocolIndex()
  341. m.RUnlock()
  342. go func() {
  343. node.Index(idx)
  344. }()
  345. }
  346. func (m *Model) shouldSuppressChange(name string) bool {
  347. sup := shouldSuppressChange(m.fileLastChanged[name], m.fileWasSuppressed[name])
  348. if sup {
  349. m.fileWasSuppressed[name]++
  350. } else {
  351. m.fileWasSuppressed[name] = 0
  352. m.fileLastChanged[name] = time.Now()
  353. }
  354. return sup
  355. }
  356. func shouldSuppressChange(lastChange time.Time, numChanges int) bool {
  357. sinceLast := time.Since(lastChange)
  358. if sinceLast > maxFileHoldTimeS*time.Second {
  359. return false
  360. }
  361. if sinceLast < time.Duration((numChanges+2)*minFileHoldTimeS)*time.Second {
  362. return true
  363. }
  364. return false
  365. }
  366. // protocolIndex returns the current local index in protocol data types.
  367. // Must be called with the read lock held.
  368. func (m *Model) protocolIndex() []protocol.FileInfo {
  369. var index []protocol.FileInfo
  370. for _, f := range m.local {
  371. mf := fileInfoFromFile(f)
  372. if m.trace["idx"] {
  373. var flagComment string
  374. if mf.Flags&protocol.FlagDeleted != 0 {
  375. flagComment = " (deleted)"
  376. }
  377. log.Printf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
  378. }
  379. index = append(index, mf)
  380. }
  381. return index
  382. }
  383. func (m *Model) requestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  384. m.RLock()
  385. nc, ok := m.nodes[nodeID]
  386. m.RUnlock()
  387. if !ok {
  388. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  389. }
  390. if m.trace["net"] {
  391. log.Printf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  392. }
  393. return nc.Request(name, offset, size, hash)
  394. }
  395. func (m *Model) broadcastIndexLoop() {
  396. for {
  397. m.RLock()
  398. bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
  399. holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
  400. m.RUnlock()
  401. maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
  402. if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
  403. m.Lock()
  404. var indexWg sync.WaitGroup
  405. indexWg.Add(len(m.nodes))
  406. idx := m.protocolIndex()
  407. m.lastIdxBcast = time.Now()
  408. for _, node := range m.nodes {
  409. node := node
  410. if m.trace["net"] {
  411. log.Printf("NET IDX(out/loop): %s: %d files", node.ID, len(idx))
  412. }
  413. go func() {
  414. node.Index(idx)
  415. indexWg.Done()
  416. }()
  417. }
  418. m.Unlock()
  419. indexWg.Wait()
  420. }
  421. time.Sleep(idxBcastHoldtime)
  422. }
  423. }
  424. // markDeletedLocals sets the deleted flag on files that have gone missing locally.
  425. // Must be called with the write lock held.
  426. func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
  427. // For every file in the existing local table, check if they are also
  428. // present in the new local table. If they are not, check that we already
  429. // had the newest version available according to the global table and if so
  430. // note the file as having been deleted.
  431. var updated bool
  432. for n, f := range m.local {
  433. if _, ok := newLocal[n]; !ok {
  434. if gf := m.global[n]; !gf.NewerThan(f) {
  435. if f.Flags&protocol.FlagDeleted == 0 {
  436. f.Flags = protocol.FlagDeleted
  437. f.Version++
  438. f.Blocks = nil
  439. updated = true
  440. }
  441. newLocal[n] = f
  442. }
  443. }
  444. }
  445. return updated
  446. }
  447. func (m *Model) updateLocal(f File) {
  448. if ef, ok := m.local[f.Name]; !ok || !ef.Equals(f) {
  449. m.local[f.Name] = f
  450. m.recomputeGlobal()
  451. m.recomputeNeed()
  452. m.updatedLocal = time.Now().Unix()
  453. m.lastIdxBcastRequest = time.Now()
  454. }
  455. }
  456. // Must be called with the write lock held.
  457. func (m *Model) recomputeGlobal() {
  458. var newGlobal = make(map[string]File)
  459. for n, f := range m.local {
  460. newGlobal[n] = f
  461. }
  462. for _, fs := range m.remote {
  463. for n, nf := range fs {
  464. if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) {
  465. newGlobal[n] = nf
  466. }
  467. }
  468. }
  469. // Figure out if anything actually changed
  470. var updated bool
  471. if len(newGlobal) != len(m.global) {
  472. updated = true
  473. } else {
  474. for n, f0 := range newGlobal {
  475. if f1, ok := m.global[n]; !ok || !f0.Equals(f1) {
  476. updated = true
  477. break
  478. }
  479. }
  480. }
  481. if updated {
  482. m.updateGlobal = time.Now().Unix()
  483. m.global = newGlobal
  484. }
  485. }
  486. // Must be called with the write lock held.
  487. func (m *Model) recomputeNeed() {
  488. m.need = make(map[string]bool)
  489. for n, gf := range m.global {
  490. lf, ok := m.local[n]
  491. if !ok || gf.NewerThan(lf) {
  492. if gf.Flags&protocol.FlagInvalid != 0 {
  493. // Never attempt to sync invalid files
  494. continue
  495. }
  496. if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
  497. // Don't want to delete files, so forget this need
  498. continue
  499. }
  500. if gf.Flags&protocol.FlagDeleted != 0 && !ok {
  501. // Don't have the file, so don't need to delete it
  502. continue
  503. }
  504. if m.trace["need"] {
  505. log.Println("NEED:", ok, lf, gf)
  506. }
  507. m.need[n] = true
  508. }
  509. }
  510. }
  511. // Must be called with the read lock held.
  512. func (m *Model) whoHas(name string) []string {
  513. var remote []string
  514. gf := m.global[name]
  515. for node, files := range m.remote {
  516. if file, ok := files[name]; ok && file.Equals(gf) {
  517. remote = append(remote, node)
  518. }
  519. }
  520. return remote
  521. }
  522. func fileFromFileInfo(f protocol.FileInfo) File {
  523. var blocks []Block
  524. var offset uint64
  525. for _, b := range f.Blocks {
  526. blocks = append(blocks, Block{
  527. Offset: offset,
  528. Length: b.Length,
  529. Hash: b.Hash,
  530. })
  531. offset += uint64(b.Length)
  532. }
  533. return File{
  534. Name: f.Name,
  535. Flags: f.Flags,
  536. Modified: int64(f.Modified),
  537. Version: f.Version,
  538. Blocks: blocks,
  539. }
  540. }
  541. func fileInfoFromFile(f File) protocol.FileInfo {
  542. var blocks []protocol.BlockInfo
  543. for _, b := range f.Blocks {
  544. blocks = append(blocks, protocol.BlockInfo{
  545. Length: b.Length,
  546. Hash: b.Hash,
  547. })
  548. }
  549. return protocol.FileInfo{
  550. Name: f.Name,
  551. Flags: f.Flags,
  552. Modified: int64(f.Modified),
  553. Version: f.Version,
  554. Blocks: blocks,
  555. }
  556. }