model.go 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. package main
  2. /*
  3. Locking
  4. =======
  5. The model has read and write locks. These must be acquired as appropriate by
  6. public methods. To prevent deadlock situations, private methods should never
  7. acquire locks, but document what locks they require.
  8. TODO(jb): Keep global and per node transfer and performance statistics.
  9. */
  10. import (
  11. "fmt"
  12. "os"
  13. "path"
  14. "sync"
  15. "time"
  16. "github.com/calmh/syncthing/buffers"
  17. "github.com/calmh/syncthing/protocol"
  18. )
  19. type Model struct {
  20. sync.RWMutex
  21. dir string
  22. updated int64
  23. global map[string]File // the latest version of each file as it exists in the cluster
  24. local map[string]File // the files we currently have locally on disk
  25. remote map[string]map[string]File
  26. need map[string]bool // the files we need to update
  27. nodes map[string]*protocol.Connection
  28. }
  29. const (
  30. RemoteFetchers = 4
  31. FlagDeleted = 1 << 12
  32. )
  33. func NewModel(dir string) *Model {
  34. m := &Model{
  35. dir: dir,
  36. global: make(map[string]File),
  37. local: make(map[string]File),
  38. remote: make(map[string]map[string]File),
  39. need: make(map[string]bool),
  40. nodes: make(map[string]*protocol.Connection),
  41. }
  42. go m.printStats()
  43. return m
  44. }
  45. func (m *Model) Start() {
  46. go m.puller()
  47. }
  48. func (m *Model) printStats() {
  49. for {
  50. time.Sleep(60 * time.Second)
  51. m.RLock()
  52. for node, conn := range m.nodes {
  53. stats := conn.Statistics()
  54. if (stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0) && stats.Latency > 0 {
  55. infof("%s: %sB/s in, %sB/s out, %0.02f ms", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec), stats.Latency.Seconds()*1000)
  56. } else if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 {
  57. infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec))
  58. }
  59. }
  60. m.RUnlock()
  61. }
  62. }
  63. func toSI(n int) string {
  64. if n > 1<<30 {
  65. return fmt.Sprintf("%.02f G", float64(n)/(1<<30))
  66. }
  67. if n > 1<<20 {
  68. return fmt.Sprintf("%.02f M", float64(n)/(1<<20))
  69. }
  70. if n > 1<<10 {
  71. return fmt.Sprintf("%.01f K", float64(n)/(1<<10))
  72. }
  73. return fmt.Sprintf("%d ", n)
  74. }
  75. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
  76. m.Lock()
  77. defer m.Unlock()
  78. if opts.Debug.TraceNet {
  79. debugf("NET IDX(in): %s: %d files", nodeID, len(fs))
  80. }
  81. m.remote[nodeID] = make(map[string]File)
  82. for _, f := range fs {
  83. if f.Flags&FlagDeleted != 0 && !opts.Delete {
  84. // Files marked as deleted do not even enter the model
  85. continue
  86. }
  87. mf := File{
  88. Name: f.Name,
  89. Flags: f.Flags,
  90. Modified: int64(f.Modified),
  91. }
  92. var offset uint64
  93. for _, b := range f.Blocks {
  94. mf.Blocks = append(mf.Blocks, Block{
  95. Offset: offset,
  96. Length: b.Length,
  97. Hash: b.Hash,
  98. })
  99. offset += uint64(b.Length)
  100. }
  101. m.remote[nodeID][f.Name] = mf
  102. }
  103. m.recomputeGlobal()
  104. m.recomputeNeed()
  105. }
  106. func (m *Model) SeedIndex(fs []protocol.FileInfo) {
  107. m.Lock()
  108. defer m.Unlock()
  109. m.local = make(map[string]File)
  110. for _, f := range fs {
  111. mf := File{
  112. Name: f.Name,
  113. Flags: f.Flags,
  114. Modified: int64(f.Modified),
  115. }
  116. var offset uint64
  117. for _, b := range f.Blocks {
  118. mf.Blocks = append(mf.Blocks, Block{
  119. Offset: offset,
  120. Length: b.Length,
  121. Hash: b.Hash,
  122. })
  123. offset += uint64(b.Length)
  124. }
  125. m.local[f.Name] = mf
  126. }
  127. m.recomputeGlobal()
  128. m.recomputeNeed()
  129. }
  130. func (m *Model) Close(node string) {
  131. m.Lock()
  132. defer m.Unlock()
  133. if opts.Debug.TraceNet {
  134. debugf("NET CLOSE: %s", node)
  135. }
  136. delete(m.remote, node)
  137. delete(m.nodes, node)
  138. m.recomputeGlobal()
  139. m.recomputeNeed()
  140. }
  141. func (m *Model) Request(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  142. if opts.Debug.TraceNet && nodeID != "<local>" {
  143. debugf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  144. }
  145. fn := path.Join(m.dir, name)
  146. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  147. if err != nil {
  148. return nil, err
  149. }
  150. defer fd.Close()
  151. buf := buffers.Get(int(size))
  152. _, err = fd.ReadAt(buf, int64(offset))
  153. if err != nil {
  154. return nil, err
  155. }
  156. return buf, nil
  157. }
  158. func (m *Model) RequestGlobal(nodeID, name string, offset uint64, size uint32, hash []byte) ([]byte, error) {
  159. m.RLock()
  160. nc := m.nodes[nodeID]
  161. m.RUnlock()
  162. if opts.Debug.TraceNet {
  163. debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  164. }
  165. return nc.Request(name, offset, size, hash)
  166. }
  167. func (m *Model) ReplaceLocal(fs []File) {
  168. m.Lock()
  169. defer m.Unlock()
  170. var updated bool
  171. var newLocal = make(map[string]File)
  172. for _, f := range fs {
  173. newLocal[f.Name] = f
  174. if ef := m.local[f.Name]; ef.Modified != f.Modified {
  175. updated = true
  176. }
  177. }
  178. if m.markDeletedLocals(newLocal) {
  179. updated = true
  180. }
  181. if len(newLocal) != len(m.local) {
  182. updated = true
  183. }
  184. if updated {
  185. m.local = newLocal
  186. m.recomputeGlobal()
  187. m.recomputeNeed()
  188. m.updated = time.Now().Unix()
  189. m.broadcastIndex()
  190. }
  191. }
  192. // Must be called with the read lock held.
  193. func (m *Model) broadcastIndex() {
  194. idx := m.protocolIndex()
  195. for _, node := range m.nodes {
  196. node := node
  197. if opts.Debug.TraceNet {
  198. debugf("NET IDX(out): %s: %d files", node.ID, len(idx))
  199. }
  200. go node.Index(idx)
  201. }
  202. }
  203. // markDeletedLocals sets the deleted flag on files that have gone missing locally.
  204. // Must be called with the write lock held.
  205. func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
  206. // For every file in the existing local table, check if they are also
  207. // present in the new local table. If they are not, check that we already
  208. // had the newest version available according to the global table and if so
  209. // note the file as having been deleted.
  210. var updated bool
  211. for n, f := range m.local {
  212. if _, ok := newLocal[n]; !ok {
  213. if gf := m.global[n]; gf.Modified <= f.Modified {
  214. if f.Flags&FlagDeleted == 0 {
  215. f.Flags = FlagDeleted
  216. f.Modified = f.Modified + 1
  217. f.Blocks = nil
  218. updated = true
  219. }
  220. newLocal[n] = f
  221. }
  222. }
  223. }
  224. return updated
  225. }
  226. func (m *Model) UpdateLocal(f File) {
  227. m.Lock()
  228. defer m.Unlock()
  229. if ef, ok := m.local[f.Name]; !ok || ef.Modified != f.Modified {
  230. m.local[f.Name] = f
  231. m.recomputeGlobal()
  232. m.recomputeNeed()
  233. m.updated = time.Now().Unix()
  234. m.broadcastIndex()
  235. }
  236. }
  237. func (m *Model) Dir() string {
  238. m.RLock()
  239. defer m.RUnlock()
  240. return m.dir
  241. }
  242. func (m *Model) HaveFiles() []File {
  243. m.RLock()
  244. defer m.RUnlock()
  245. var files []File
  246. for _, file := range m.local {
  247. files = append(files, file)
  248. }
  249. return files
  250. }
  251. func (m *Model) LocalFile(name string) (File, bool) {
  252. m.RLock()
  253. defer m.RUnlock()
  254. f, ok := m.local[name]
  255. return f, ok
  256. }
  257. func (m *Model) GlobalFile(name string) (File, bool) {
  258. m.RLock()
  259. defer m.RUnlock()
  260. f, ok := m.global[name]
  261. return f, ok
  262. }
  263. // Must be called with the write lock held.
  264. func (m *Model) recomputeGlobal() {
  265. var newGlobal = make(map[string]File)
  266. for n, f := range m.local {
  267. newGlobal[n] = f
  268. }
  269. for _, fs := range m.remote {
  270. for n, f := range fs {
  271. if cf, ok := newGlobal[n]; !ok || cf.Modified < f.Modified {
  272. newGlobal[n] = f
  273. }
  274. }
  275. }
  276. m.global = newGlobal
  277. }
  278. // Must be called with the write lock held.
  279. func (m *Model) recomputeNeed() {
  280. m.need = make(map[string]bool)
  281. for n, f := range m.global {
  282. hf, ok := m.local[n]
  283. if !ok || f.Modified > hf.Modified {
  284. m.need[n] = true
  285. }
  286. }
  287. }
  288. // Must be called with the read lock held.
  289. func (m *Model) whoHas(name string) []string {
  290. var remote []string
  291. gf := m.global[name]
  292. for node, files := range m.remote {
  293. if file, ok := files[name]; ok && file.Modified == gf.Modified {
  294. remote = append(remote, node)
  295. }
  296. }
  297. return remote
  298. }
  299. func (m *Model) ConnectedTo(nodeID string) bool {
  300. m.RLock()
  301. defer m.RUnlock()
  302. _, ok := m.nodes[nodeID]
  303. return ok
  304. }
  305. func (m *Model) ProtocolIndex() []protocol.FileInfo {
  306. m.RLock()
  307. defer m.RUnlock()
  308. return m.protocolIndex()
  309. }
  310. // Must be called with the read lock held.
  311. func (m *Model) protocolIndex() []protocol.FileInfo {
  312. var index []protocol.FileInfo
  313. for _, f := range m.local {
  314. mf := protocol.FileInfo{
  315. Name: f.Name,
  316. Flags: f.Flags,
  317. Modified: int64(f.Modified),
  318. }
  319. for _, b := range f.Blocks {
  320. mf.Blocks = append(mf.Blocks, protocol.BlockInfo{
  321. Length: b.Length,
  322. Hash: b.Hash,
  323. })
  324. }
  325. if opts.Debug.TraceIdx {
  326. var flagComment string
  327. if mf.Flags&FlagDeleted != 0 {
  328. flagComment = " (deleted)"
  329. }
  330. debugf("IDX: %q m=%d f=%o%s (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, len(mf.Blocks))
  331. }
  332. index = append(index, mf)
  333. }
  334. return index
  335. }
  336. func (m *Model) AddNode(node *protocol.Connection) {
  337. m.Lock()
  338. m.nodes[node.ID] = node
  339. m.Unlock()
  340. m.RLock()
  341. idx := m.protocolIndex()
  342. m.RUnlock()
  343. if opts.Debug.TraceNet {
  344. debugf("NET IDX(out): %s: %d files", node.ID, len(idx))
  345. }
  346. // Sending the index might take a while if we have many files and a slow
  347. // uplink. Return from AddNode in the meantime.
  348. go node.Index(idx)
  349. }