model.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518
  1. package main
  2. /*
  3. Locking
  4. =======
  5. The model has read and write locks. These must be acquired as appropriate by
  6. public methods. To prevent deadlock situations, private methods should never
  7. acquire locks, but document what locks they require.
  8. */
  9. import (
  10. "errors"
  11. "fmt"
  12. "os"
  13. "path"
  14. "sync"
  15. "time"
  16. "github.com/calmh/syncthing/buffers"
  17. "github.com/calmh/syncthing/protocol"
  18. )
  19. type Model struct {
  20. sync.RWMutex
  21. dir string
  22. global map[string]File // the latest version of each file as it exists in the cluster
  23. local map[string]File // the files we currently have locally on disk
  24. remote map[string]map[string]File
  25. need map[string]bool // the files we need to update
  26. nodes map[string]*protocol.Connection
  27. updatedLocal int64 // timestamp of last update to local
  28. updateGlobal int64 // timestamp of last update to remote
  29. lastIdxBcast time.Time
  30. lastIdxBcastRequest time.Time
  31. }
  32. var (
  33. errNoSuchNode = errors.New("no such node")
  34. )
  35. const (
  36. FlagDeleted = 1 << 12
  37. idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
  38. idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
  39. )
  40. func NewModel(dir string) *Model {
  41. m := &Model{
  42. dir: dir,
  43. global: make(map[string]File),
  44. local: make(map[string]File),
  45. remote: make(map[string]map[string]File),
  46. need: make(map[string]bool),
  47. nodes: make(map[string]*protocol.Connection),
  48. lastIdxBcast: time.Now(),
  49. }
  50. go m.printStatsLoop()
  51. go m.broadcastIndexLoop()
  52. return m
  53. }
  54. func (m *Model) Start() {
  55. go m.puller()
  56. }
  57. func (m *Model) printStatsLoop() {
  58. var lastUpdated int64
  59. for {
  60. time.Sleep(60 * time.Second)
  61. m.RLock()
  62. m.printConnectionStats()
  63. if m.updatedLocal+m.updateGlobal > lastUpdated {
  64. m.printModelStats()
  65. lastUpdated = m.updatedLocal + m.updateGlobal
  66. }
  67. m.RUnlock()
  68. }
  69. }
  70. func (m *Model) printConnectionStats() {
  71. for node, conn := range m.nodes {
  72. stats := conn.Statistics()
  73. if (stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0) && stats.Latency > 0 {
  74. infof("%s: %sB/s in, %sB/s out, %0.02f ms", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec), stats.Latency.Seconds()*1000)
  75. } else if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 {
  76. infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec))
  77. }
  78. }
  79. }
  80. func (m *Model) printModelStats() {
  81. var tot int
  82. for _, f := range m.global {
  83. tot += f.Size()
  84. }
  85. infof("%6d files, %8sB in cluster", len(m.global), toSI(tot))
  86. if len(m.need) > 0 {
  87. tot = 0
  88. for _, f := range m.local {
  89. tot += f.Size()
  90. }
  91. infof("%6d files, %8sB in local repo", len(m.local), toSI(tot))
  92. tot = 0
  93. for n := range m.need {
  94. tot += m.global[n].Size()
  95. }
  96. infof("%6d files, %8sB to synchronize", len(m.need), toSI(tot))
  97. }
  98. }
  99. func toSI(n int) string {
  100. if n > 1<<30 {
  101. return fmt.Sprintf("%.02f G", float64(n)/(1<<30))
  102. }
  103. if n > 1<<20 {
  104. return fmt.Sprintf("%.02f M", float64(n)/(1<<20))
  105. }
  106. if n > 1<<10 {
  107. return fmt.Sprintf("%.01f K", float64(n)/(1<<10))
  108. }
  109. return fmt.Sprintf("%d ", n)
  110. }
  111. // Index is called when a new node is connected and we receive their full index.
  112. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
  113. m.Lock()
  114. defer m.Unlock()
  115. if opts.Debug.TraceNet {
  116. debugf("NET IDX(in): %s: %d files", nodeID, len(fs))
  117. }
  118. m.remote[nodeID] = make(map[string]File)
  119. for _, f := range fs {
  120. if f.Flags&FlagDeleted != 0 && !opts.Delete {
  121. // Files marked as deleted do not even enter the model
  122. continue
  123. }
  124. m.remote[nodeID][f.Name] = fileFromFileInfo(f)
  125. }
  126. m.recomputeGlobal()
  127. m.recomputeNeed()
  128. m.printModelStats()
  129. }
  130. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  131. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
  132. m.Lock()
  133. defer m.Unlock()
  134. if opts.Debug.TraceNet {
  135. debugf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
  136. }
  137. repo, ok := m.remote[nodeID]
  138. if !ok {
  139. return
  140. }
  141. for _, f := range fs {
  142. if f.Flags&FlagDeleted != 0 && !opts.Delete {
  143. // Files marked as deleted do not even enter the model
  144. continue
  145. }
  146. repo[f.Name] = fileFromFileInfo(f)
  147. }
  148. m.recomputeGlobal()
  149. m.recomputeNeed()
  150. }
  151. // SeedIndex is called when our previously cached index is loaded from disk at startup.
  152. func (m *Model) SeedIndex(fs []protocol.FileInfo) {
  153. m.Lock()
  154. defer m.Unlock()
  155. m.local = make(map[string]File)
  156. for _, f := range fs {
  157. m.local[f.Name] = fileFromFileInfo(f)
  158. }
  159. m.recomputeGlobal()
  160. m.recomputeNeed()
  161. m.printModelStats()
  162. }
  163. func (m *Model) Close(node string) {
  164. m.Lock()
  165. defer m.Unlock()
  166. if opts.Debug.TraceNet {
  167. debugf("NET CLOSE: %s", node)
  168. }
  169. delete(m.remote, node)
  170. delete(m.nodes, node)
  171. m.recomputeGlobal()
  172. m.recomputeNeed()
  173. }
  174. func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  175. if opts.Debug.TraceNet && nodeID != "<local>" {
  176. debugf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  177. }
  178. fn := path.Join(m.dir, name)
  179. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  180. if err != nil {
  181. return nil, err
  182. }
  183. defer fd.Close()
  184. buf := buffers.Get(int(size))
  185. _, err = fd.ReadAt(buf, int64(offset))
  186. if err != nil {
  187. return nil, err
  188. }
  189. return buf, nil
  190. }
  191. func (m *Model) RequestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  192. m.RLock()
  193. nc, ok := m.nodes[nodeID]
  194. m.RUnlock()
  195. if !ok {
  196. return nil, errNoSuchNode
  197. }
  198. if opts.Debug.TraceNet {
  199. debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  200. }
  201. return nc.Request(name, offset, size, hash)
  202. }
  203. func (m *Model) ReplaceLocal(fs []File) {
  204. m.Lock()
  205. defer m.Unlock()
  206. var updated bool
  207. var newLocal = make(map[string]File)
  208. for _, f := range fs {
  209. newLocal[f.Name] = f
  210. if ef := m.local[f.Name]; ef.Modified != f.Modified {
  211. updated = true
  212. }
  213. }
  214. if m.markDeletedLocals(newLocal) {
  215. updated = true
  216. }
  217. if len(newLocal) != len(m.local) {
  218. updated = true
  219. }
  220. if updated {
  221. m.local = newLocal
  222. m.recomputeGlobal()
  223. m.recomputeNeed()
  224. m.updatedLocal = time.Now().Unix()
  225. m.lastIdxBcastRequest = time.Now()
  226. }
  227. }
  228. func (m *Model) broadcastIndexLoop() {
  229. for {
  230. m.RLock()
  231. bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
  232. holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
  233. m.RUnlock()
  234. maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
  235. if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
  236. m.Lock()
  237. var indexWg sync.WaitGroup
  238. indexWg.Add(len(m.nodes))
  239. idx := m.protocolIndex()
  240. m.lastIdxBcast = time.Now()
  241. for _, node := range m.nodes {
  242. node := node
  243. if opts.Debug.TraceNet {
  244. debugf("NET IDX(out/loop): %s: %d files", node.ID, len(idx))
  245. }
  246. go func() {
  247. node.Index(idx)
  248. indexWg.Done()
  249. }()
  250. }
  251. m.Unlock()
  252. indexWg.Wait()
  253. }
  254. time.Sleep(idxBcastHoldtime)
  255. }
  256. }
  257. // markDeletedLocals sets the deleted flag on files that have gone missing locally.
  258. // Must be called with the write lock held.
  259. func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
  260. // For every file in the existing local table, check if they are also
  261. // present in the new local table. If they are not, check that we already
  262. // had the newest version available according to the global table and if so
  263. // note the file as having been deleted.
  264. var updated bool
  265. for n, f := range m.local {
  266. if _, ok := newLocal[n]; !ok {
  267. if gf := m.global[n]; gf.Modified <= f.Modified {
  268. if f.Flags&FlagDeleted == 0 {
  269. f.Flags = FlagDeleted
  270. f.Modified = f.Modified + 1
  271. f.Blocks = nil
  272. updated = true
  273. }
  274. newLocal[n] = f
  275. }
  276. }
  277. }
  278. return updated
  279. }
  280. func (m *Model) UpdateLocal(f File) {
  281. m.Lock()
  282. defer m.Unlock()
  283. if ef, ok := m.local[f.Name]; !ok || ef.Modified != f.Modified {
  284. m.local[f.Name] = f
  285. m.recomputeGlobal()
  286. m.recomputeNeed()
  287. m.updatedLocal = time.Now().Unix()
  288. m.lastIdxBcastRequest = time.Now()
  289. }
  290. }
  291. func (m *Model) Dir() string {
  292. m.RLock()
  293. defer m.RUnlock()
  294. return m.dir
  295. }
  296. func (m *Model) HaveFiles() []File {
  297. m.RLock()
  298. defer m.RUnlock()
  299. var files []File
  300. for _, file := range m.local {
  301. files = append(files, file)
  302. }
  303. return files
  304. }
  305. func (m *Model) LocalFile(name string) (File, bool) {
  306. m.RLock()
  307. defer m.RUnlock()
  308. f, ok := m.local[name]
  309. return f, ok
  310. }
  311. func (m *Model) GlobalFile(name string) (File, bool) {
  312. m.RLock()
  313. defer m.RUnlock()
  314. f, ok := m.global[name]
  315. return f, ok
  316. }
  317. // Must be called with the write lock held.
  318. func (m *Model) recomputeGlobal() {
  319. var newGlobal = make(map[string]File)
  320. for n, f := range m.local {
  321. newGlobal[n] = f
  322. }
  323. for _, fs := range m.remote {
  324. for n, f := range fs {
  325. if cf, ok := newGlobal[n]; !ok || cf.Modified < f.Modified {
  326. newGlobal[n] = f
  327. }
  328. }
  329. }
  330. // Figure out if anything actually changed
  331. var updated bool
  332. if len(newGlobal) != len(m.global) {
  333. updated = true
  334. } else {
  335. for n, f0 := range newGlobal {
  336. if f1, ok := m.global[n]; !ok || f0.Modified != f1.Modified {
  337. updated = true
  338. break
  339. }
  340. }
  341. }
  342. if updated {
  343. m.updateGlobal = time.Now().Unix()
  344. m.global = newGlobal
  345. }
  346. }
  347. // Must be called with the write lock held.
  348. func (m *Model) recomputeNeed() {
  349. m.need = make(map[string]bool)
  350. for n, f := range m.global {
  351. hf, ok := m.local[n]
  352. if !ok || f.Modified > hf.Modified {
  353. m.need[n] = true
  354. }
  355. }
  356. }
  357. // Must be called with the read lock held.
  358. func (m *Model) whoHas(name string) []string {
  359. var remote []string
  360. gf := m.global[name]
  361. for node, files := range m.remote {
  362. if file, ok := files[name]; ok && file.Modified == gf.Modified {
  363. remote = append(remote, node)
  364. }
  365. }
  366. return remote
  367. }
  368. func (m *Model) ConnectedTo(nodeID string) bool {
  369. m.RLock()
  370. defer m.RUnlock()
  371. _, ok := m.nodes[nodeID]
  372. return ok
  373. }
  374. func (m *Model) ProtocolIndex() []protocol.FileInfo {
  375. m.RLock()
  376. defer m.RUnlock()
  377. return m.protocolIndex()
  378. }
  379. // Must be called with the read lock held.
  380. func (m *Model) protocolIndex() []protocol.FileInfo {
  381. var index []protocol.FileInfo
  382. for _, f := range m.local {
  383. mf := fileInfoFromFile(f)
  384. if opts.Debug.TraceIdx {
  385. var flagComment string
  386. if mf.Flags&FlagDeleted != 0 {
  387. flagComment = " (deleted)"
  388. }
  389. debugf("IDX: %q m=%d f=%o%s (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, len(mf.Blocks))
  390. }
  391. index = append(index, mf)
  392. }
  393. return index
  394. }
  395. func (m *Model) AddNode(node *protocol.Connection) {
  396. m.Lock()
  397. m.nodes[node.ID] = node
  398. m.Unlock()
  399. m.RLock()
  400. idx := m.protocolIndex()
  401. m.RUnlock()
  402. if opts.Debug.TraceNet {
  403. debugf("NET IDX(out/add): %s: %d files", node.ID, len(idx))
  404. }
  405. // Sending the index might take a while if we have many files and a slow
  406. // uplink. Return from AddNode in the meantime.
  407. go node.Index(idx)
  408. }
  409. func fileFromFileInfo(f protocol.FileInfo) File {
  410. var blocks []Block
  411. var offset uint64
  412. for _, b := range f.Blocks {
  413. blocks = append(blocks, Block{
  414. Offset: offset,
  415. Length: b.Length,
  416. Hash: b.Hash,
  417. })
  418. offset += uint64(b.Length)
  419. }
  420. return File{
  421. Name: f.Name,
  422. Flags: f.Flags,
  423. Modified: int64(f.Modified),
  424. Blocks: blocks,
  425. }
  426. }
  427. func fileInfoFromFile(f File) protocol.FileInfo {
  428. var blocks []protocol.BlockInfo
  429. for _, b := range f.Blocks {
  430. blocks = append(blocks, protocol.BlockInfo{
  431. Length: b.Length,
  432. Hash: b.Hash,
  433. })
  434. }
  435. return protocol.FileInfo{
  436. Name: f.Name,
  437. Flags: f.Flags,
  438. Modified: int64(f.Modified),
  439. Blocks: blocks,
  440. }
  441. }