model.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568
  1. package main
  2. /*
  3. Locking
  4. =======
  5. The model has read and write locks. These must be acquired as appropriate by
  6. public methods. To prevent deadlock situations, private methods should never
  7. acquire locks, but document what locks they require.
  8. */
  9. import (
  10. "fmt"
  11. "io"
  12. "net"
  13. "os"
  14. "path"
  15. "sync"
  16. "time"
  17. "github.com/calmh/syncthing/buffers"
  18. "github.com/calmh/syncthing/protocol"
  19. )
  // Model holds the file tables for one synchronized directory together with
  // the currently connected nodes. The embedded RWMutex guards the fields
  // below; public methods take it, private helpers expect it to be held.
  type Model struct {
  	sync.RWMutex
  	dir string // directory that file names are joined onto when serving requests
  	global map[string]File // the latest version of each file as it exists in the cluster
  	local map[string]File // the files we currently have locally on disk
  	remote map[string]map[string]File // per node ID, the files that node has announced, keyed by name
  	need map[string]bool // the files we need to update
  	nodes map[string]*protocol.Connection // protocol connections, keyed by node ID
  	rawConn map[string]io.ReadWriteCloser // the underlying connections, keyed by node ID
  	updatedLocal int64 // timestamp of last update to local
  	updateGlobal int64 // timestamp of last update to global
  	lastIdxBcast time.Time // when the index was last sent out by broadcastIndexLoop
  	lastIdxBcastRequest time.Time // when a change to local last asked for a broadcast
  }
  const (
  	// FlagDeleted is the bit in a file's Flags that marks the file as deleted.
  	FlagDeleted = 1 << 12
  	// The two durations below pace broadcastIndexLoop.
  	idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
  	idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
  )
  39. func NewModel(dir string) *Model {
  40. m := &Model{
  41. dir: dir,
  42. global: make(map[string]File),
  43. local: make(map[string]File),
  44. remote: make(map[string]map[string]File),
  45. need: make(map[string]bool),
  46. nodes: make(map[string]*protocol.Connection),
  47. rawConn: make(map[string]io.ReadWriteCloser),
  48. lastIdxBcast: time.Now(),
  49. }
  50. go m.broadcastIndexLoop()
  51. return m
  52. }
  53. func (m *Model) Start() {
  54. go m.puller()
  55. }
  56. func (m *Model) Generation() int64 {
  57. m.RLock()
  58. defer m.RUnlock()
  59. return m.updatedLocal + m.updateGlobal
  60. }
  // ConnectionInfo is the per-node value returned by ConnectionStats: the
  // protocol statistics of the connection plus its remote address.
  type ConnectionInfo struct {
  	protocol.Statistics
  	Address string // remote address as a string; empty when the raw connection has no RemoteAddr method
  }
  65. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  66. type remoteAddrer interface {
  67. RemoteAddr() net.Addr
  68. }
  69. m.RLock()
  70. defer m.RUnlock()
  71. var res = make(map[string]ConnectionInfo)
  72. for node, conn := range m.nodes {
  73. ci := ConnectionInfo{
  74. Statistics: conn.Statistics(),
  75. }
  76. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  77. ci.Address = nc.RemoteAddr().String()
  78. }
  79. res[node] = ci
  80. }
  81. return res
  82. }
  83. func (m *Model) GlobalSize() (files, deleted, bytes int) {
  84. m.RLock()
  85. defer m.RUnlock()
  86. for _, f := range m.global {
  87. if f.Flags&FlagDeleted == 0 {
  88. files++
  89. bytes += f.Size()
  90. } else {
  91. deleted++
  92. }
  93. }
  94. return
  95. }
  96. func (m *Model) LocalSize() (files, deleted, bytes int) {
  97. m.RLock()
  98. defer m.RUnlock()
  99. for _, f := range m.local {
  100. if f.Flags&FlagDeleted == 0 {
  101. files++
  102. bytes += f.Size()
  103. } else {
  104. deleted++
  105. }
  106. }
  107. return
  108. }
  109. func (m *Model) InSyncSize() (files, bytes int) {
  110. m.RLock()
  111. defer m.RUnlock()
  112. for n, f := range m.local {
  113. if gf, ok := m.global[n]; ok && f.Modified == gf.Modified {
  114. files++
  115. bytes += f.Size()
  116. }
  117. }
  118. return
  119. }
  // FileInfo is the name and size of a single file, as listed by NeedFiles.
  type FileInfo struct {
  	Name string
  	Size int
  }
  124. func (m *Model) NeedFiles() (files []FileInfo, bytes int) {
  125. m.RLock()
  126. defer m.RUnlock()
  127. for n := range m.need {
  128. f := m.global[n]
  129. s := f.Size()
  130. files = append(files, FileInfo{f.Name, s})
  131. bytes += s
  132. }
  133. return
  134. }
  135. // Index is called when a new node is connected and we receive their full index.
  136. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
  137. m.Lock()
  138. defer m.Unlock()
  139. if opts.Debug.TraceNet {
  140. debugf("NET IDX(in): %s: %d files", nodeID, len(fs))
  141. }
  142. m.remote[nodeID] = make(map[string]File)
  143. for _, f := range fs {
  144. m.remote[nodeID][f.Name] = fileFromFileInfo(f)
  145. }
  146. m.recomputeGlobal()
  147. m.recomputeNeed()
  148. }
  149. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  150. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
  151. m.Lock()
  152. defer m.Unlock()
  153. if opts.Debug.TraceNet {
  154. debugf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
  155. }
  156. repo, ok := m.remote[nodeID]
  157. if !ok {
  158. return
  159. }
  160. for _, f := range fs {
  161. if f.Flags&FlagDeleted != 0 && !opts.Delete {
  162. // Files marked as deleted do not even enter the model
  163. continue
  164. }
  165. repo[f.Name] = fileFromFileInfo(f)
  166. }
  167. m.recomputeGlobal()
  168. m.recomputeNeed()
  169. }
  170. // SeedIndex is called when our previously cached index is loaded from disk at startup.
  171. func (m *Model) SeedIndex(fs []protocol.FileInfo) {
  172. m.Lock()
  173. defer m.Unlock()
  174. m.local = make(map[string]File)
  175. for _, f := range fs {
  176. m.local[f.Name] = fileFromFileInfo(f)
  177. }
  178. m.recomputeGlobal()
  179. m.recomputeNeed()
  180. }
  181. func (m *Model) Close(node string, err error) {
  182. m.Lock()
  183. defer m.Unlock()
  184. conn, ok := m.rawConn[node]
  185. if ok {
  186. conn.Close()
  187. } else {
  188. warnln("Close on unknown connection for node", node)
  189. }
  190. if err != nil {
  191. warnf("Disconnected from node %s: %v", node, err)
  192. } else {
  193. infoln("Disconnected from node", node)
  194. }
  195. delete(m.remote, node)
  196. delete(m.nodes, node)
  197. delete(m.rawConn, node)
  198. m.recomputeGlobal()
  199. m.recomputeNeed()
  200. }
  201. func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  202. if opts.Debug.TraceNet && nodeID != "<local>" {
  203. debugf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  204. }
  205. fn := path.Join(m.dir, name)
  206. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  207. if err != nil {
  208. return nil, err
  209. }
  210. defer fd.Close()
  211. buf := buffers.Get(int(size))
  212. _, err = fd.ReadAt(buf, int64(offset))
  213. if err != nil {
  214. return nil, err
  215. }
  216. return buf, nil
  217. }
  218. func (m *Model) RequestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  219. m.RLock()
  220. nc, ok := m.nodes[nodeID]
  221. m.RUnlock()
  222. if !ok {
  223. return nil, fmt.Errorf("RequestGlobal: no such node: %s", nodeID)
  224. }
  225. if opts.Debug.TraceNet {
  226. debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  227. }
  228. return nc.Request(name, offset, size, hash)
  229. }
  230. func (m *Model) ReplaceLocal(fs []File) {
  231. m.Lock()
  232. defer m.Unlock()
  233. var updated bool
  234. var newLocal = make(map[string]File)
  235. for _, f := range fs {
  236. newLocal[f.Name] = f
  237. if ef := m.local[f.Name]; ef.Modified != f.Modified {
  238. updated = true
  239. }
  240. }
  241. if m.markDeletedLocals(newLocal) {
  242. updated = true
  243. }
  244. if len(newLocal) != len(m.local) {
  245. updated = true
  246. }
  247. if updated {
  248. m.local = newLocal
  249. m.recomputeGlobal()
  250. m.recomputeNeed()
  251. m.updatedLocal = time.Now().Unix()
  252. m.lastIdxBcastRequest = time.Now()
  253. }
  254. }
  255. func (m *Model) broadcastIndexLoop() {
  256. for {
  257. m.RLock()
  258. bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
  259. holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
  260. m.RUnlock()
  261. maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
  262. if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
  263. m.Lock()
  264. var indexWg sync.WaitGroup
  265. indexWg.Add(len(m.nodes))
  266. idx := m.protocolIndex()
  267. m.lastIdxBcast = time.Now()
  268. for _, node := range m.nodes {
  269. node := node
  270. if opts.Debug.TraceNet {
  271. debugf("NET IDX(out/loop): %s: %d files", node.ID, len(idx))
  272. }
  273. go func() {
  274. node.Index(idx)
  275. indexWg.Done()
  276. }()
  277. }
  278. m.Unlock()
  279. indexWg.Wait()
  280. }
  281. time.Sleep(idxBcastHoldtime)
  282. }
  283. }
  284. // markDeletedLocals sets the deleted flag on files that have gone missing locally.
  285. // Must be called with the write lock held.
  286. func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
  287. // For every file in the existing local table, check if they are also
  288. // present in the new local table. If they are not, check that we already
  289. // had the newest version available according to the global table and if so
  290. // note the file as having been deleted.
  291. var updated bool
  292. for n, f := range m.local {
  293. if _, ok := newLocal[n]; !ok {
  294. if gf := m.global[n]; gf.Modified <= f.Modified {
  295. if f.Flags&FlagDeleted == 0 {
  296. f.Flags = FlagDeleted
  297. f.Modified = f.Modified + 1
  298. f.Blocks = nil
  299. updated = true
  300. }
  301. newLocal[n] = f
  302. }
  303. }
  304. }
  305. return updated
  306. }
  307. func (m *Model) UpdateLocal(f File) {
  308. m.Lock()
  309. defer m.Unlock()
  310. if ef, ok := m.local[f.Name]; !ok || ef.Modified != f.Modified {
  311. m.local[f.Name] = f
  312. m.recomputeGlobal()
  313. m.recomputeNeed()
  314. m.updatedLocal = time.Now().Unix()
  315. m.lastIdxBcastRequest = time.Now()
  316. }
  317. }
  318. func (m *Model) Dir() string {
  319. m.RLock()
  320. defer m.RUnlock()
  321. return m.dir
  322. }
  323. func (m *Model) HaveFiles() []File {
  324. m.RLock()
  325. defer m.RUnlock()
  326. var files []File
  327. for _, file := range m.local {
  328. files = append(files, file)
  329. }
  330. return files
  331. }
  332. func (m *Model) LocalFile(name string) (File, bool) {
  333. m.RLock()
  334. defer m.RUnlock()
  335. f, ok := m.local[name]
  336. return f, ok
  337. }
  338. func (m *Model) GlobalFile(name string) (File, bool) {
  339. m.RLock()
  340. defer m.RUnlock()
  341. f, ok := m.global[name]
  342. return f, ok
  343. }
  344. // Must be called with the write lock held.
  345. func (m *Model) recomputeGlobal() {
  346. var newGlobal = make(map[string]File)
  347. for n, f := range m.local {
  348. newGlobal[n] = f
  349. }
  350. for _, fs := range m.remote {
  351. for n, f := range fs {
  352. if cf, ok := newGlobal[n]; !ok || cf.Modified < f.Modified {
  353. newGlobal[n] = f
  354. }
  355. }
  356. }
  357. // Figure out if anything actually changed
  358. var updated bool
  359. if len(newGlobal) != len(m.global) {
  360. updated = true
  361. } else {
  362. for n, f0 := range newGlobal {
  363. if f1, ok := m.global[n]; !ok || f0.Modified != f1.Modified {
  364. updated = true
  365. break
  366. }
  367. }
  368. }
  369. if updated {
  370. m.updateGlobal = time.Now().Unix()
  371. m.global = newGlobal
  372. }
  373. }
  374. // Must be called with the write lock held.
  375. func (m *Model) recomputeNeed() {
  376. m.need = make(map[string]bool)
  377. for n, f := range m.global {
  378. hf, ok := m.local[n]
  379. if !ok || f.Modified > hf.Modified {
  380. if f.Flags&FlagDeleted != 0 && !opts.Delete {
  381. // Don't want to delete files, so forget this need
  382. continue
  383. }
  384. if f.Flags&FlagDeleted != 0 && !ok {
  385. // Don't have the file, so don't need to delete it
  386. continue
  387. }
  388. if opts.Debug.TraceNeed {
  389. debugln("NEED:", ok, hf, f)
  390. }
  391. m.need[n] = true
  392. }
  393. }
  394. }
  395. // Must be called with the read lock held.
  396. func (m *Model) whoHas(name string) []string {
  397. var remote []string
  398. gf := m.global[name]
  399. for node, files := range m.remote {
  400. if file, ok := files[name]; ok && file.Modified == gf.Modified {
  401. remote = append(remote, node)
  402. }
  403. }
  404. return remote
  405. }
  406. func (m *Model) ConnectedTo(nodeID string) bool {
  407. m.RLock()
  408. defer m.RUnlock()
  409. _, ok := m.nodes[nodeID]
  410. return ok
  411. }
  412. func (m *Model) ProtocolIndex() []protocol.FileInfo {
  413. m.RLock()
  414. defer m.RUnlock()
  415. return m.protocolIndex()
  416. }
  417. // Must be called with the read lock held.
  418. func (m *Model) protocolIndex() []protocol.FileInfo {
  419. var index []protocol.FileInfo
  420. for _, f := range m.local {
  421. mf := fileInfoFromFile(f)
  422. if opts.Debug.TraceIdx {
  423. var flagComment string
  424. if mf.Flags&FlagDeleted != 0 {
  425. flagComment = " (deleted)"
  426. }
  427. debugf("IDX: %q m=%d f=%o%s (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, len(mf.Blocks))
  428. }
  429. index = append(index, mf)
  430. }
  431. return index
  432. }
  433. func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) {
  434. node := protocol.NewConnection(nodeID, conn, conn, m)
  435. m.Lock()
  436. m.nodes[nodeID] = node
  437. m.rawConn[nodeID] = conn
  438. m.Unlock()
  439. infoln("Connected to node", nodeID)
  440. m.RLock()
  441. idx := m.protocolIndex()
  442. m.RUnlock()
  443. go func() {
  444. node.Index(idx)
  445. infoln("Sent initial index to node", nodeID)
  446. }()
  447. }
  448. func fileFromFileInfo(f protocol.FileInfo) File {
  449. var blocks []Block
  450. var offset uint64
  451. for _, b := range f.Blocks {
  452. blocks = append(blocks, Block{
  453. Offset: offset,
  454. Length: b.Length,
  455. Hash: b.Hash,
  456. })
  457. offset += uint64(b.Length)
  458. }
  459. return File{
  460. Name: f.Name,
  461. Flags: f.Flags,
  462. Modified: int64(f.Modified),
  463. Blocks: blocks,
  464. }
  465. }
  466. func fileInfoFromFile(f File) protocol.FileInfo {
  467. var blocks []protocol.BlockInfo
  468. for _, b := range f.Blocks {
  469. blocks = append(blocks, protocol.BlockInfo{
  470. Length: b.Length,
  471. Hash: b.Hash,
  472. })
  473. }
  474. return protocol.FileInfo{
  475. Name: f.Name,
  476. Flags: f.Flags,
  477. Modified: int64(f.Modified),
  478. Blocks: blocks,
  479. }
  480. }