model.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. package main
  2. /*
  3. Locking
  4. =======
  5. The model has read and write locks. These must be acquired as appropriate by
  6. public methods. To prevent deadlock situations, private methods should never
  7. acquire locks, but document what locks they require.
  8. */
  9. import (
  10. "fmt"
  11. "io"
  12. "net"
  13. "os"
  14. "path"
  15. "sync"
  16. "time"
  17. "github.com/calmh/syncthing/buffers"
  18. "github.com/calmh/syncthing/protocol"
  19. )
  20. type Model struct {
  21. sync.RWMutex
  22. dir string
  23. global map[string]File // the latest version of each file as it exists in the cluster
  24. local map[string]File // the files we currently have locally on disk
  25. remote map[string]map[string]File
  26. need map[string]bool // the files we need to update
  27. nodes map[string]*protocol.Connection
  28. rawConn map[string]io.ReadWriteCloser
  29. updatedLocal int64 // timestamp of last update to local
  30. updateGlobal int64 // timestamp of last update to remote
  31. lastIdxBcast time.Time
  32. lastIdxBcastRequest time.Time
  33. }
  34. const (
  35. FlagDeleted = 1 << 12
  36. idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
  37. idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
  38. )
  39. func NewModel(dir string) *Model {
  40. m := &Model{
  41. dir: dir,
  42. global: make(map[string]File),
  43. local: make(map[string]File),
  44. remote: make(map[string]map[string]File),
  45. need: make(map[string]bool),
  46. nodes: make(map[string]*protocol.Connection),
  47. rawConn: make(map[string]io.ReadWriteCloser),
  48. lastIdxBcast: time.Now(),
  49. }
  50. go m.broadcastIndexLoop()
  51. return m
  52. }
  53. func (m *Model) Start() {
  54. go m.puller()
  55. }
  56. func (m *Model) Generation() int64 {
  57. m.RLock()
  58. defer m.RUnlock()
  59. return m.updatedLocal + m.updateGlobal
  60. }
  61. type ConnectionInfo struct {
  62. protocol.Statistics
  63. Address string
  64. }
  65. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  66. type remoteAddrer interface {
  67. RemoteAddr() net.Addr
  68. }
  69. m.RLock()
  70. defer m.RUnlock()
  71. var res = make(map[string]ConnectionInfo)
  72. for node, conn := range m.nodes {
  73. ci := ConnectionInfo{
  74. Statistics: conn.Statistics(),
  75. }
  76. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  77. ci.Address = nc.RemoteAddr().String()
  78. }
  79. res[node] = ci
  80. }
  81. return res
  82. }
  83. func (m *Model) GlobalSize() (files, deleted, bytes int) {
  84. m.RLock()
  85. defer m.RUnlock()
  86. for _, f := range m.global {
  87. if f.Flags&FlagDeleted == 0 {
  88. files++
  89. bytes += f.Size()
  90. } else {
  91. deleted++
  92. }
  93. }
  94. return
  95. }
  96. func (m *Model) LocalSize() (files, deleted, bytes int) {
  97. m.RLock()
  98. defer m.RUnlock()
  99. for _, f := range m.local {
  100. if f.Flags&FlagDeleted == 0 {
  101. files++
  102. bytes += f.Size()
  103. } else {
  104. deleted++
  105. }
  106. }
  107. return
  108. }
  109. type FileInfo struct {
  110. Name string
  111. Size int
  112. }
  113. func (m *Model) NeedFiles() (files []FileInfo, bytes int) {
  114. m.RLock()
  115. defer m.RUnlock()
  116. for n := range m.need {
  117. f := m.global[n]
  118. s := f.Size()
  119. files = append(files, FileInfo{f.Name, s})
  120. bytes += s
  121. }
  122. return
  123. }
  124. // Index is called when a new node is connected and we receive their full index.
  125. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
  126. m.Lock()
  127. defer m.Unlock()
  128. if opts.Debug.TraceNet {
  129. debugf("NET IDX(in): %s: %d files", nodeID, len(fs))
  130. }
  131. m.remote[nodeID] = make(map[string]File)
  132. for _, f := range fs {
  133. if f.Flags&FlagDeleted != 0 && !opts.Delete {
  134. // Files marked as deleted do not even enter the model
  135. continue
  136. }
  137. m.remote[nodeID][f.Name] = fileFromFileInfo(f)
  138. }
  139. m.recomputeGlobal()
  140. m.recomputeNeed()
  141. }
  142. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  143. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
  144. m.Lock()
  145. defer m.Unlock()
  146. if opts.Debug.TraceNet {
  147. debugf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
  148. }
  149. repo, ok := m.remote[nodeID]
  150. if !ok {
  151. return
  152. }
  153. for _, f := range fs {
  154. if f.Flags&FlagDeleted != 0 && !opts.Delete {
  155. // Files marked as deleted do not even enter the model
  156. continue
  157. }
  158. repo[f.Name] = fileFromFileInfo(f)
  159. }
  160. m.recomputeGlobal()
  161. m.recomputeNeed()
  162. }
  163. // SeedIndex is called when our previously cached index is loaded from disk at startup.
  164. func (m *Model) SeedIndex(fs []protocol.FileInfo) {
  165. m.Lock()
  166. defer m.Unlock()
  167. m.local = make(map[string]File)
  168. for _, f := range fs {
  169. m.local[f.Name] = fileFromFileInfo(f)
  170. }
  171. m.recomputeGlobal()
  172. m.recomputeNeed()
  173. }
  174. func (m *Model) Close(node string, err error) {
  175. m.Lock()
  176. defer m.Unlock()
  177. conn, ok := m.rawConn[node]
  178. if ok {
  179. conn.Close()
  180. } else {
  181. warnln("Close on unknown connection for node", node)
  182. }
  183. if err != nil {
  184. warnf("Disconnected from node %s: %v", node, err)
  185. } else {
  186. infoln("Disconnected from node", node)
  187. }
  188. delete(m.remote, node)
  189. delete(m.nodes, node)
  190. delete(m.rawConn, node)
  191. m.recomputeGlobal()
  192. m.recomputeNeed()
  193. }
  194. func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  195. if opts.Debug.TraceNet && nodeID != "<local>" {
  196. debugf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  197. }
  198. fn := path.Join(m.dir, name)
  199. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  200. if err != nil {
  201. return nil, err
  202. }
  203. defer fd.Close()
  204. buf := buffers.Get(int(size))
  205. _, err = fd.ReadAt(buf, int64(offset))
  206. if err != nil {
  207. return nil, err
  208. }
  209. return buf, nil
  210. }
  211. func (m *Model) RequestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  212. m.RLock()
  213. nc, ok := m.nodes[nodeID]
  214. m.RUnlock()
  215. if !ok {
  216. return nil, fmt.Errorf("RequestGlobal: no such node: %s", nodeID)
  217. }
  218. if opts.Debug.TraceNet {
  219. debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  220. }
  221. return nc.Request(name, offset, size, hash)
  222. }
  223. func (m *Model) ReplaceLocal(fs []File) {
  224. m.Lock()
  225. defer m.Unlock()
  226. var updated bool
  227. var newLocal = make(map[string]File)
  228. for _, f := range fs {
  229. newLocal[f.Name] = f
  230. if ef := m.local[f.Name]; ef.Modified != f.Modified {
  231. updated = true
  232. }
  233. }
  234. if m.markDeletedLocals(newLocal) {
  235. updated = true
  236. }
  237. if len(newLocal) != len(m.local) {
  238. updated = true
  239. }
  240. if updated {
  241. m.local = newLocal
  242. m.recomputeGlobal()
  243. m.recomputeNeed()
  244. m.updatedLocal = time.Now().Unix()
  245. m.lastIdxBcastRequest = time.Now()
  246. }
  247. }
  248. func (m *Model) broadcastIndexLoop() {
  249. for {
  250. m.RLock()
  251. bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
  252. holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
  253. m.RUnlock()
  254. maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
  255. if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
  256. m.Lock()
  257. var indexWg sync.WaitGroup
  258. indexWg.Add(len(m.nodes))
  259. idx := m.protocolIndex()
  260. m.lastIdxBcast = time.Now()
  261. for _, node := range m.nodes {
  262. node := node
  263. if opts.Debug.TraceNet {
  264. debugf("NET IDX(out/loop): %s: %d files", node.ID, len(idx))
  265. }
  266. go func() {
  267. node.Index(idx)
  268. indexWg.Done()
  269. }()
  270. }
  271. m.Unlock()
  272. indexWg.Wait()
  273. }
  274. time.Sleep(idxBcastHoldtime)
  275. }
  276. }
  277. // markDeletedLocals sets the deleted flag on files that have gone missing locally.
  278. // Must be called with the write lock held.
  279. func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
  280. // For every file in the existing local table, check if they are also
  281. // present in the new local table. If they are not, check that we already
  282. // had the newest version available according to the global table and if so
  283. // note the file as having been deleted.
  284. var updated bool
  285. for n, f := range m.local {
  286. if _, ok := newLocal[n]; !ok {
  287. if gf := m.global[n]; gf.Modified <= f.Modified {
  288. if f.Flags&FlagDeleted == 0 {
  289. f.Flags = FlagDeleted
  290. f.Modified = f.Modified + 1
  291. f.Blocks = nil
  292. updated = true
  293. }
  294. newLocal[n] = f
  295. }
  296. }
  297. }
  298. return updated
  299. }
  300. func (m *Model) UpdateLocal(f File) {
  301. m.Lock()
  302. defer m.Unlock()
  303. if ef, ok := m.local[f.Name]; !ok || ef.Modified != f.Modified {
  304. m.local[f.Name] = f
  305. m.recomputeGlobal()
  306. m.recomputeNeed()
  307. m.updatedLocal = time.Now().Unix()
  308. m.lastIdxBcastRequest = time.Now()
  309. }
  310. }
  311. func (m *Model) Dir() string {
  312. m.RLock()
  313. defer m.RUnlock()
  314. return m.dir
  315. }
  316. func (m *Model) HaveFiles() []File {
  317. m.RLock()
  318. defer m.RUnlock()
  319. var files []File
  320. for _, file := range m.local {
  321. files = append(files, file)
  322. }
  323. return files
  324. }
  325. func (m *Model) LocalFile(name string) (File, bool) {
  326. m.RLock()
  327. defer m.RUnlock()
  328. f, ok := m.local[name]
  329. return f, ok
  330. }
  331. func (m *Model) GlobalFile(name string) (File, bool) {
  332. m.RLock()
  333. defer m.RUnlock()
  334. f, ok := m.global[name]
  335. return f, ok
  336. }
  337. // Must be called with the write lock held.
  338. func (m *Model) recomputeGlobal() {
  339. var newGlobal = make(map[string]File)
  340. for n, f := range m.local {
  341. newGlobal[n] = f
  342. }
  343. for _, fs := range m.remote {
  344. for n, f := range fs {
  345. if cf, ok := newGlobal[n]; !ok || cf.Modified < f.Modified {
  346. newGlobal[n] = f
  347. }
  348. }
  349. }
  350. // Figure out if anything actually changed
  351. var updated bool
  352. if len(newGlobal) != len(m.global) {
  353. updated = true
  354. } else {
  355. for n, f0 := range newGlobal {
  356. if f1, ok := m.global[n]; !ok || f0.Modified != f1.Modified {
  357. updated = true
  358. break
  359. }
  360. }
  361. }
  362. if updated {
  363. m.updateGlobal = time.Now().Unix()
  364. m.global = newGlobal
  365. }
  366. }
  367. // Must be called with the write lock held.
  368. func (m *Model) recomputeNeed() {
  369. m.need = make(map[string]bool)
  370. for n, f := range m.global {
  371. hf, ok := m.local[n]
  372. if !ok || f.Modified > hf.Modified {
  373. m.need[n] = true
  374. }
  375. }
  376. }
  377. // Must be called with the read lock held.
  378. func (m *Model) whoHas(name string) []string {
  379. var remote []string
  380. gf := m.global[name]
  381. for node, files := range m.remote {
  382. if file, ok := files[name]; ok && file.Modified == gf.Modified {
  383. remote = append(remote, node)
  384. }
  385. }
  386. return remote
  387. }
  388. func (m *Model) ConnectedTo(nodeID string) bool {
  389. m.RLock()
  390. defer m.RUnlock()
  391. _, ok := m.nodes[nodeID]
  392. return ok
  393. }
  394. func (m *Model) ProtocolIndex() []protocol.FileInfo {
  395. m.RLock()
  396. defer m.RUnlock()
  397. return m.protocolIndex()
  398. }
  399. // Must be called with the read lock held.
  400. func (m *Model) protocolIndex() []protocol.FileInfo {
  401. var index []protocol.FileInfo
  402. for _, f := range m.local {
  403. mf := fileInfoFromFile(f)
  404. if opts.Debug.TraceIdx {
  405. var flagComment string
  406. if mf.Flags&FlagDeleted != 0 {
  407. flagComment = " (deleted)"
  408. }
  409. debugf("IDX: %q m=%d f=%o%s (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, len(mf.Blocks))
  410. }
  411. index = append(index, mf)
  412. }
  413. return index
  414. }
  415. func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) {
  416. node := protocol.NewConnection(nodeID, conn, conn, m)
  417. m.Lock()
  418. m.nodes[nodeID] = node
  419. m.rawConn[nodeID] = conn
  420. m.Unlock()
  421. infoln("Connected to node", nodeID)
  422. m.RLock()
  423. idx := m.protocolIndex()
  424. m.RUnlock()
  425. go func() {
  426. node.Index(idx)
  427. infoln("Sent initial index to node", nodeID)
  428. }()
  429. }
  430. func fileFromFileInfo(f protocol.FileInfo) File {
  431. var blocks []Block
  432. var offset uint64
  433. for _, b := range f.Blocks {
  434. blocks = append(blocks, Block{
  435. Offset: offset,
  436. Length: b.Length,
  437. Hash: b.Hash,
  438. })
  439. offset += uint64(b.Length)
  440. }
  441. return File{
  442. Name: f.Name,
  443. Flags: f.Flags,
  444. Modified: int64(f.Modified),
  445. Blocks: blocks,
  446. }
  447. }
  448. func fileInfoFromFile(f File) protocol.FileInfo {
  449. var blocks []protocol.BlockInfo
  450. for _, b := range f.Blocks {
  451. blocks = append(blocks, protocol.BlockInfo{
  452. Length: b.Length,
  453. Hash: b.Hash,
  454. })
  455. }
  456. return protocol.FileInfo{
  457. Name: f.Name,
  458. Flags: f.Flags,
  459. Modified: int64(f.Modified),
  460. Blocks: blocks,
  461. }
  462. }