model.go 13 KB


  1. package model
  2. /*
  3. Locking
  4. =======
  5. The model has read and write locks. These must be acquired as appropriate by
  6. public methods. To prevent deadlock situations, private methods should never
  7. acquire locks, but document what locks they require.
  8. */
  9. import (
  10. "crypto/sha1"
  11. "fmt"
  12. "io"
  13. "log"
  14. "net"
  15. "os"
  16. "path"
  17. "sync"
  18. "time"
  19. "github.com/calmh/syncthing/buffers"
  20. "github.com/calmh/syncthing/protocol"
  21. )
  22. type Model struct {
  23. sync.RWMutex
  24. dir string
  25. global map[string]File // the latest version of each file as it exists in the cluster
  26. local map[string]File // the files we currently have locally on disk
  27. remote map[string]map[string]File
  28. need map[string]bool // the files we need to update
  29. nodes map[string]*protocol.Connection
  30. rawConn map[string]io.ReadWriteCloser
  31. updatedLocal int64 // timestamp of last update to local
  32. updateGlobal int64 // timestamp of last update to remote
  33. lastIdxBcast time.Time
  34. lastIdxBcastRequest time.Time
  35. rwRunning bool
  36. parallellFiles int
  37. paralllelReqs int
  38. delete bool
  39. trace map[string]bool
  40. }
  41. const (
  42. FlagDeleted = 1 << 12
  43. idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
  44. idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
  45. )
  46. // NewModel creates and starts a new model. The model starts in read-only mode,
  47. // where it sends index information to connected peers and responds to requests
  48. // for file data without altering the local repository in any way.
  49. func NewModel(dir string) *Model {
  50. m := &Model{
  51. dir: dir,
  52. global: make(map[string]File),
  53. local: make(map[string]File),
  54. remote: make(map[string]map[string]File),
  55. need: make(map[string]bool),
  56. nodes: make(map[string]*protocol.Connection),
  57. rawConn: make(map[string]io.ReadWriteCloser),
  58. lastIdxBcast: time.Now(),
  59. trace: make(map[string]bool),
  60. }
  61. go m.broadcastIndexLoop()
  62. return m
  63. }
  64. // Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace.
  65. func (m *Model) Trace(t string) {
  66. m.Lock()
  67. defer m.Unlock()
  68. m.trace[t] = true
  69. }
  70. // StartRW starts read/write processing on the current model. When in
  71. // read/write mode the model will attempt to keep in sync with the cluster by
  72. // pulling needed files from peer nodes.
  73. func (m *Model) StartRW(del bool, pfiles, preqs int) {
  74. m.Lock()
  75. defer m.Unlock()
  76. if m.rwRunning {
  77. panic("starting started model")
  78. }
  79. m.rwRunning = true
  80. m.delete = del
  81. m.parallellFiles = pfiles
  82. m.paralllelReqs = preqs
  83. go m.cleanTempFiles()
  84. go m.puller()
  85. }
  86. // Generation returns an opaque integer that is guaranteed to increment on
  87. // every change to the local repository or global model.
  88. func (m *Model) Generation() int64 {
  89. m.RLock()
  90. defer m.RUnlock()
  91. return m.updatedLocal + m.updateGlobal
  92. }
  93. type ConnectionInfo struct {
  94. protocol.Statistics
  95. Address string
  96. }
  97. // ConnectionStats returns a map with connection statistics for each connected node.
  98. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  99. type remoteAddrer interface {
  100. RemoteAddr() net.Addr
  101. }
  102. m.RLock()
  103. defer m.RUnlock()
  104. var res = make(map[string]ConnectionInfo)
  105. for node, conn := range m.nodes {
  106. ci := ConnectionInfo{
  107. Statistics: conn.Statistics(),
  108. }
  109. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  110. ci.Address = nc.RemoteAddr().String()
  111. }
  112. res[node] = ci
  113. }
  114. return res
  115. }
  116. // LocalSize returns the number of files, deleted files and total bytes for all
  117. // files in the global model.
  118. func (m *Model) GlobalSize() (files, deleted, bytes int) {
  119. m.RLock()
  120. defer m.RUnlock()
  121. for _, f := range m.global {
  122. if f.Flags&FlagDeleted == 0 {
  123. files++
  124. bytes += f.Size()
  125. } else {
  126. deleted++
  127. }
  128. }
  129. return
  130. }
  131. // LocalSize returns the number of files, deleted files and total bytes for all
  132. // files in the local repository.
  133. func (m *Model) LocalSize() (files, deleted, bytes int) {
  134. m.RLock()
  135. defer m.RUnlock()
  136. for _, f := range m.local {
  137. if f.Flags&FlagDeleted == 0 {
  138. files++
  139. bytes += f.Size()
  140. } else {
  141. deleted++
  142. }
  143. }
  144. return
  145. }
  146. // InSyncSize returns the number and total byte size of the local files that
  147. // are in sync with the global model.
  148. func (m *Model) InSyncSize() (files, bytes int) {
  149. m.RLock()
  150. defer m.RUnlock()
  151. for n, f := range m.local {
  152. if gf, ok := m.global[n]; ok && f.Modified == gf.Modified {
  153. files++
  154. bytes += f.Size()
  155. }
  156. }
  157. return
  158. }
  159. // NeedFiles returns the list of currently needed files and the total size.
  160. func (m *Model) NeedFiles() (files []File, bytes int) {
  161. m.RLock()
  162. defer m.RUnlock()
  163. for n := range m.need {
  164. f := m.global[n]
  165. files = append(files, f)
  166. bytes += f.Size()
  167. }
  168. return
  169. }
  170. // Index is called when a new node is connected and we receive their full index.
  171. // Implements the protocol.Model interface.
  172. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
  173. m.Lock()
  174. defer m.Unlock()
  175. if m.trace["net"] {
  176. log.Printf("NET IDX(in): %s: %d files", nodeID, len(fs))
  177. }
  178. m.remote[nodeID] = make(map[string]File)
  179. for _, f := range fs {
  180. m.remote[nodeID][f.Name] = fileFromFileInfo(f)
  181. }
  182. m.recomputeGlobal()
  183. m.recomputeNeed()
  184. }
  185. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  186. // Implements the protocol.Model interface.
  187. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
  188. m.Lock()
  189. defer m.Unlock()
  190. if m.trace["net"] {
  191. log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
  192. }
  193. repo, ok := m.remote[nodeID]
  194. if !ok {
  195. return
  196. }
  197. for _, f := range fs {
  198. if f.Flags&FlagDeleted != 0 && !m.delete {
  199. // Files marked as deleted do not even enter the model
  200. continue
  201. }
  202. repo[f.Name] = fileFromFileInfo(f)
  203. }
  204. m.recomputeGlobal()
  205. m.recomputeNeed()
  206. }
  207. // Close removes the peer from the model and closes the underlyign connection if possible.
  208. // Implements the protocol.Model interface.
  209. func (m *Model) Close(node string, err error) {
  210. m.Lock()
  211. defer m.Unlock()
  212. conn, ok := m.rawConn[node]
  213. if ok {
  214. conn.Close()
  215. }
  216. delete(m.remote, node)
  217. delete(m.nodes, node)
  218. delete(m.rawConn, node)
  219. m.recomputeGlobal()
  220. m.recomputeNeed()
  221. }
  222. // Request returns the specified data segment by reading it from local disk.
  223. // Implements the protocol.Model interface.
  224. func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  225. if m.trace["net"] && nodeID != "<local>" {
  226. log.Printf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  227. }
  228. fn := path.Join(m.dir, name)
  229. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  230. if err != nil {
  231. return nil, err
  232. }
  233. defer fd.Close()
  234. buf := buffers.Get(int(size))
  235. _, err = fd.ReadAt(buf, int64(offset))
  236. if err != nil {
  237. return nil, err
  238. }
  239. return buf, nil
  240. }
  241. // ReplaceLocal replaces the local repository index with the given list of files.
  242. func (m *Model) ReplaceLocal(fs []File) {
  243. m.Lock()
  244. defer m.Unlock()
  245. var updated bool
  246. var newLocal = make(map[string]File)
  247. for _, f := range fs {
  248. newLocal[f.Name] = f
  249. if ef := m.local[f.Name]; ef.Modified != f.Modified {
  250. updated = true
  251. }
  252. }
  253. if m.markDeletedLocals(newLocal) {
  254. updated = true
  255. }
  256. if len(newLocal) != len(m.local) {
  257. updated = true
  258. }
  259. if updated {
  260. m.local = newLocal
  261. m.recomputeGlobal()
  262. m.recomputeNeed()
  263. m.updatedLocal = time.Now().Unix()
  264. m.lastIdxBcastRequest = time.Now()
  265. }
  266. }
  267. // SeedLocal replaces the local repository index with the given list of files,
  268. // in protocol data types. Does not track deletes, should only be used to seed
  269. // the local index from a cache file at startup.
  270. func (m *Model) SeedLocal(fs []protocol.FileInfo) {
  271. m.Lock()
  272. defer m.Unlock()
  273. m.local = make(map[string]File)
  274. for _, f := range fs {
  275. m.local[f.Name] = fileFromFileInfo(f)
  276. }
  277. m.recomputeGlobal()
  278. m.recomputeNeed()
  279. }
  280. // ConnectedTo returns true if we are connected to the named node.
  281. func (m *Model) ConnectedTo(nodeID string) bool {
  282. m.RLock()
  283. defer m.RUnlock()
  284. _, ok := m.nodes[nodeID]
  285. return ok
  286. }
  287. // ProtocolIndex returns the current local index in protocol data types.
  288. func (m *Model) ProtocolIndex() []protocol.FileInfo {
  289. m.RLock()
  290. defer m.RUnlock()
  291. return m.protocolIndex()
  292. }
  293. // RepoID returns a unique ID representing the current repository location.
  294. func (m *Model) RepoID() string {
  295. return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir)))
  296. }
  297. // AddConnection adds a new peer connection to the model. An initial index will
  298. // be sent to the connected peer, thereafter index updates whenever the local
  299. // repository changes.
  300. func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) {
  301. node := protocol.NewConnection(nodeID, conn, conn, m)
  302. m.Lock()
  303. m.nodes[nodeID] = node
  304. m.rawConn[nodeID] = conn
  305. m.Unlock()
  306. m.RLock()
  307. idx := m.protocolIndex()
  308. m.RUnlock()
  309. go func() {
  310. node.Index(idx)
  311. }()
  312. }
  313. // protocolIndex returns the current local index in protocol data types.
  314. // Must be called with the read lock held.
  315. func (m *Model) protocolIndex() []protocol.FileInfo {
  316. var index []protocol.FileInfo
  317. for _, f := range m.local {
  318. mf := fileInfoFromFile(f)
  319. if m.trace["idx"] {
  320. var flagComment string
  321. if mf.Flags&FlagDeleted != 0 {
  322. flagComment = " (deleted)"
  323. }
  324. log.Printf("IDX: %q m=%d f=%o%s (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, len(mf.Blocks))
  325. }
  326. index = append(index, mf)
  327. }
  328. return index
  329. }
  330. func (m *Model) requestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  331. m.RLock()
  332. nc, ok := m.nodes[nodeID]
  333. m.RUnlock()
  334. if !ok {
  335. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  336. }
  337. if m.trace["net"] {
  338. log.Printf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  339. }
  340. return nc.Request(name, offset, size, hash)
  341. }
  342. func (m *Model) broadcastIndexLoop() {
  343. for {
  344. m.RLock()
  345. bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
  346. holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
  347. m.RUnlock()
  348. maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
  349. if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
  350. m.Lock()
  351. var indexWg sync.WaitGroup
  352. indexWg.Add(len(m.nodes))
  353. idx := m.protocolIndex()
  354. m.lastIdxBcast = time.Now()
  355. for _, node := range m.nodes {
  356. node := node
  357. if m.trace["net"] {
  358. log.Printf("NET IDX(out/loop): %s: %d files", node.ID, len(idx))
  359. }
  360. go func() {
  361. node.Index(idx)
  362. indexWg.Done()
  363. }()
  364. }
  365. m.Unlock()
  366. indexWg.Wait()
  367. }
  368. time.Sleep(idxBcastHoldtime)
  369. }
  370. }
  371. // markDeletedLocals sets the deleted flag on files that have gone missing locally.
  372. // Must be called with the write lock held.
  373. func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
  374. // For every file in the existing local table, check if they are also
  375. // present in the new local table. If they are not, check that we already
  376. // had the newest version available according to the global table and if so
  377. // note the file as having been deleted.
  378. var updated bool
  379. for n, f := range m.local {
  380. if _, ok := newLocal[n]; !ok {
  381. if gf := m.global[n]; gf.Modified <= f.Modified {
  382. if f.Flags&FlagDeleted == 0 {
  383. f.Flags = FlagDeleted
  384. f.Modified = f.Modified + 1
  385. f.Blocks = nil
  386. updated = true
  387. }
  388. newLocal[n] = f
  389. }
  390. }
  391. }
  392. return updated
  393. }
  394. func (m *Model) updateLocal(f File) {
  395. if ef, ok := m.local[f.Name]; !ok || ef.Modified != f.Modified {
  396. m.local[f.Name] = f
  397. m.recomputeGlobal()
  398. m.recomputeNeed()
  399. m.updatedLocal = time.Now().Unix()
  400. m.lastIdxBcastRequest = time.Now()
  401. }
  402. }
  403. // Must be called with the write lock held.
  404. func (m *Model) recomputeGlobal() {
  405. var newGlobal = make(map[string]File)
  406. for n, f := range m.local {
  407. newGlobal[n] = f
  408. }
  409. for _, fs := range m.remote {
  410. for n, f := range fs {
  411. if cf, ok := newGlobal[n]; !ok || cf.Modified < f.Modified {
  412. newGlobal[n] = f
  413. }
  414. }
  415. }
  416. // Figure out if anything actually changed
  417. var updated bool
  418. if len(newGlobal) != len(m.global) {
  419. updated = true
  420. } else {
  421. for n, f0 := range newGlobal {
  422. if f1, ok := m.global[n]; !ok || f0.Modified != f1.Modified {
  423. updated = true
  424. break
  425. }
  426. }
  427. }
  428. if updated {
  429. m.updateGlobal = time.Now().Unix()
  430. m.global = newGlobal
  431. }
  432. }
  433. // Must be called with the write lock held.
  434. func (m *Model) recomputeNeed() {
  435. m.need = make(map[string]bool)
  436. for n, f := range m.global {
  437. hf, ok := m.local[n]
  438. if !ok || f.Modified > hf.Modified {
  439. if f.Flags&FlagDeleted != 0 && !m.delete {
  440. // Don't want to delete files, so forget this need
  441. continue
  442. }
  443. if f.Flags&FlagDeleted != 0 && !ok {
  444. // Don't have the file, so don't need to delete it
  445. continue
  446. }
  447. if m.trace["need"] {
  448. log.Println("NEED:", ok, hf, f)
  449. }
  450. m.need[n] = true
  451. }
  452. }
  453. }
  454. // Must be called with the read lock held.
  455. func (m *Model) whoHas(name string) []string {
  456. var remote []string
  457. gf := m.global[name]
  458. for node, files := range m.remote {
  459. if file, ok := files[name]; ok && file.Modified == gf.Modified {
  460. remote = append(remote, node)
  461. }
  462. }
  463. return remote
  464. }
  465. func fileFromFileInfo(f protocol.FileInfo) File {
  466. var blocks []Block
  467. var offset uint64
  468. for _, b := range f.Blocks {
  469. blocks = append(blocks, Block{
  470. Offset: offset,
  471. Length: b.Length,
  472. Hash: b.Hash,
  473. })
  474. offset += uint64(b.Length)
  475. }
  476. return File{
  477. Name: f.Name,
  478. Flags: f.Flags,
  479. Modified: int64(f.Modified),
  480. Blocks: blocks,
  481. }
  482. }
  483. func fileInfoFromFile(f File) protocol.FileInfo {
  484. var blocks []protocol.BlockInfo
  485. for _, b := range f.Blocks {
  486. blocks = append(blocks, protocol.BlockInfo{
  487. Length: b.Length,
  488. Hash: b.Hash,
  489. })
  490. }
  491. return protocol.FileInfo{
  492. Name: f.Name,
  493. Flags: f.Flags,
  494. Modified: int64(f.Modified),
  495. Blocks: blocks,
  496. }
  497. }