model.go 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916
  1. package main
  2. import (
  3. "crypto/sha1"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "net"
  8. "os"
  9. "path"
  10. "sync"
  11. "time"
  12. "github.com/calmh/syncthing/buffers"
  13. "github.com/calmh/syncthing/protocol"
  14. )
  15. type Model struct {
  16. dir string
  17. global map[string]File // the latest version of each file as it exists in the cluster
  18. gmut sync.RWMutex // protects global
  19. local map[string]File // the files we currently have locally on disk
  20. lmut sync.RWMutex // protects local
  21. remote map[string]map[string]File
  22. rmut sync.RWMutex // protects remote
  23. protoConn map[string]Connection
  24. rawConn map[string]io.Closer
  25. pmut sync.RWMutex // protects protoConn and rawConn
  26. // Queue for files to fetch. fq can call back into the model, so we must ensure
  27. // to hold no locks when calling methods on fq.
  28. fq *FileQueue
  29. dq chan File // queue for files to delete
  30. updatedLocal int64 // timestamp of last update to local
  31. updateGlobal int64 // timestamp of last update to remote
  32. lastIdxBcast time.Time
  33. lastIdxBcastRequest time.Time
  34. umut sync.RWMutex // provides updated* and lastIdx*
  35. rwRunning bool
  36. delete bool
  37. initmut sync.Mutex // protects rwRunning and delete
  38. trace map[string]bool
  39. sup suppressor
  40. parallelRequests int
  41. limitRequestRate chan struct{}
  42. imut sync.Mutex // protects Index
  43. }
  44. type Connection interface {
  45. ID() string
  46. Index(string, []protocol.FileInfo)
  47. Request(repo, name string, offset int64, size uint32, hash []byte) ([]byte, error)
  48. Statistics() protocol.Statistics
  49. Option(key string) string
  50. }
  51. const (
  52. idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
  53. idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
  54. minFileHoldTimeS = 60 // Never allow file changes more often than this
  55. maxFileHoldTimeS = 600 // Always allow file changes at least this often
  56. )
  57. var (
  58. ErrNoSuchFile = errors.New("no such file")
  59. ErrInvalid = errors.New("file is invalid")
  60. )
  61. // NewModel creates and starts a new model. The model starts in read-only mode,
  62. // where it sends index information to connected peers and responds to requests
  63. // for file data without altering the local repository in any way.
  64. func NewModel(dir string, maxChangeBw int) *Model {
  65. m := &Model{
  66. dir: dir,
  67. global: make(map[string]File),
  68. local: make(map[string]File),
  69. remote: make(map[string]map[string]File),
  70. protoConn: make(map[string]Connection),
  71. rawConn: make(map[string]io.Closer),
  72. lastIdxBcast: time.Now(),
  73. trace: make(map[string]bool),
  74. sup: suppressor{threshold: int64(maxChangeBw)},
  75. fq: NewFileQueue(),
  76. dq: make(chan File),
  77. }
  78. go m.broadcastIndexLoop()
  79. return m
  80. }
  81. func (m *Model) LimitRate(kbps int) {
  82. m.limitRequestRate = make(chan struct{}, kbps)
  83. n := kbps/10 + 1
  84. go func() {
  85. for {
  86. time.Sleep(100 * time.Millisecond)
  87. for i := 0; i < n; i++ {
  88. select {
  89. case m.limitRequestRate <- struct{}{}:
  90. }
  91. }
  92. }
  93. }()
  94. }
  95. // Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace.
  96. func (m *Model) Trace(t string) {
  97. m.trace[t] = true
  98. }
  99. // StartRW starts read/write processing on the current model. When in
  100. // read/write mode the model will attempt to keep in sync with the cluster by
  101. // pulling needed files from peer nodes.
  102. func (m *Model) StartRW(del bool, threads int) {
  103. m.initmut.Lock()
  104. defer m.initmut.Unlock()
  105. if m.rwRunning {
  106. panic("starting started model")
  107. }
  108. m.rwRunning = true
  109. m.delete = del
  110. m.parallelRequests = threads
  111. go m.cleanTempFiles()
  112. if del {
  113. go m.deleteLoop()
  114. }
  115. }
  116. // Generation returns an opaque integer that is guaranteed to increment on
  117. // every change to the local repository or global model.
  118. func (m *Model) Generation() int64 {
  119. m.umut.RLock()
  120. defer m.umut.RUnlock()
  121. return m.updatedLocal + m.updateGlobal
  122. }
  123. func (m *Model) LocalAge() float64 {
  124. m.umut.RLock()
  125. defer m.umut.RUnlock()
  126. return time.Since(time.Unix(m.updatedLocal, 0)).Seconds()
  127. }
  128. type ConnectionInfo struct {
  129. protocol.Statistics
  130. Address string
  131. ClientID string
  132. ClientVersion string
  133. Completion int
  134. }
  135. // ConnectionStats returns a map with connection statistics for each connected node.
  136. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  137. type remoteAddrer interface {
  138. RemoteAddr() net.Addr
  139. }
  140. m.gmut.RLock()
  141. m.pmut.RLock()
  142. m.rmut.RLock()
  143. var tot int
  144. for _, f := range m.global {
  145. tot += f.Size()
  146. }
  147. var res = make(map[string]ConnectionInfo)
  148. for node, conn := range m.protoConn {
  149. ci := ConnectionInfo{
  150. Statistics: conn.Statistics(),
  151. ClientID: conn.Option("clientId"),
  152. ClientVersion: conn.Option("clientVersion"),
  153. }
  154. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  155. ci.Address = nc.RemoteAddr().String()
  156. }
  157. var have int
  158. for _, f := range m.remote[node] {
  159. if f.Equals(m.global[f.Name]) {
  160. have += f.Size()
  161. }
  162. }
  163. ci.Completion = 100 * have / tot
  164. res[node] = ci
  165. }
  166. m.rmut.RUnlock()
  167. m.pmut.RUnlock()
  168. m.gmut.RUnlock()
  169. return res
  170. }
  171. // LocalSize returns the number of files, deleted files and total bytes for all
  172. // files in the global model.
  173. func (m *Model) GlobalSize() (files, deleted, bytes int) {
  174. m.gmut.RLock()
  175. for _, f := range m.global {
  176. if f.Flags&protocol.FlagDeleted == 0 {
  177. files++
  178. bytes += f.Size()
  179. } else {
  180. deleted++
  181. }
  182. }
  183. m.gmut.RUnlock()
  184. return
  185. }
  186. // LocalSize returns the number of files, deleted files and total bytes for all
  187. // files in the local repository.
  188. func (m *Model) LocalSize() (files, deleted, bytes int) {
  189. m.lmut.RLock()
  190. for _, f := range m.local {
  191. if f.Flags&protocol.FlagDeleted == 0 {
  192. files++
  193. bytes += f.Size()
  194. } else {
  195. deleted++
  196. }
  197. }
  198. m.lmut.RUnlock()
  199. return
  200. }
  201. // InSyncSize returns the number and total byte size of the local files that
  202. // are in sync with the global model.
  203. func (m *Model) InSyncSize() (files, bytes int) {
  204. m.gmut.RLock()
  205. m.lmut.RLock()
  206. for n, f := range m.local {
  207. if gf, ok := m.global[n]; ok && f.Equals(gf) {
  208. files++
  209. bytes += f.Size()
  210. }
  211. }
  212. m.lmut.RUnlock()
  213. m.gmut.RUnlock()
  214. return
  215. }
  216. // NeedFiles returns the list of currently needed files and the total size.
  217. func (m *Model) NeedFiles() (files []File, bytes int) {
  218. qf := m.fq.QueuedFiles()
  219. m.gmut.RLock()
  220. for _, n := range qf {
  221. f := m.global[n]
  222. files = append(files, f)
  223. bytes += f.Size()
  224. }
  225. m.gmut.RUnlock()
  226. return
  227. }
  228. // Index is called when a new node is connected and we receive their full index.
  229. // Implements the protocol.Model interface.
  230. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
  231. var files = make([]File, len(fs))
  232. for i := range fs {
  233. files[i] = fileFromFileInfo(fs[i])
  234. }
  235. m.imut.Lock()
  236. defer m.imut.Unlock()
  237. if m.trace["net"] {
  238. debugf("NET IDX(in): %s: %d files", nodeID, len(fs))
  239. }
  240. repo := make(map[string]File)
  241. for _, f := range files {
  242. m.indexUpdate(repo, f)
  243. }
  244. m.rmut.Lock()
  245. m.remote[nodeID] = repo
  246. m.rmut.Unlock()
  247. m.recomputeGlobal()
  248. m.recomputeNeedForFiles(files)
  249. }
  250. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  251. // Implements the protocol.Model interface.
  252. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
  253. var files = make([]File, len(fs))
  254. for i := range fs {
  255. files[i] = fileFromFileInfo(fs[i])
  256. }
  257. m.imut.Lock()
  258. defer m.imut.Unlock()
  259. if m.trace["net"] {
  260. debugf("NET IDXUP(in): %s: %d files", nodeID, len(files))
  261. }
  262. m.rmut.Lock()
  263. repo, ok := m.remote[nodeID]
  264. if !ok {
  265. warnf("Index update from node %s that does not have an index", nodeID)
  266. m.rmut.Unlock()
  267. return
  268. }
  269. for _, f := range files {
  270. m.indexUpdate(repo, f)
  271. }
  272. m.rmut.Unlock()
  273. m.recomputeGlobal()
  274. m.recomputeNeedForFiles(files)
  275. }
  276. func (m *Model) indexUpdate(repo map[string]File, f File) {
  277. if m.trace["idx"] {
  278. var flagComment string
  279. if f.Flags&protocol.FlagDeleted != 0 {
  280. flagComment = " (deleted)"
  281. }
  282. debugf("IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  283. }
  284. if extraFlags := f.Flags &^ (protocol.FlagInvalid | protocol.FlagDeleted | 0xfff); extraFlags != 0 {
  285. warnf("IDX(in): Unknown flags 0x%x in index record %+v", extraFlags, f)
  286. return
  287. }
  288. repo[f.Name] = f
  289. }
  290. // Close removes the peer from the model and closes the underlying connection if possible.
  291. // Implements the protocol.Model interface.
  292. func (m *Model) Close(node string, err error) {
  293. if m.trace["net"] {
  294. debugf("NET: %s: %v", node, err)
  295. }
  296. if err == protocol.ErrClusterHash {
  297. warnf("Connection to %s closed due to mismatched cluster hash. Ensure that the configured cluster members are identical on both nodes.", node)
  298. } else if err != io.EOF {
  299. warnf("Connection to %s closed: %v", node, err)
  300. }
  301. m.fq.RemoveAvailable(node)
  302. m.pmut.Lock()
  303. m.rmut.Lock()
  304. conn, ok := m.rawConn[node]
  305. if ok {
  306. conn.Close()
  307. }
  308. delete(m.remote, node)
  309. delete(m.protoConn, node)
  310. delete(m.rawConn, node)
  311. m.rmut.Unlock()
  312. m.pmut.Unlock()
  313. m.recomputeGlobal()
  314. m.recomputeNeedForGlobal()
  315. }
  316. // Request returns the specified data segment by reading it from local disk.
  317. // Implements the protocol.Model interface.
  318. func (m *Model) Request(nodeID, repo, name string, offset int64, size uint32, hash []byte) ([]byte, error) {
  319. // Verify that the requested file exists in the local and global model.
  320. m.lmut.RLock()
  321. lf, localOk := m.local[name]
  322. m.lmut.RUnlock()
  323. m.gmut.RLock()
  324. _, globalOk := m.global[name]
  325. m.gmut.RUnlock()
  326. if !localOk || !globalOk {
  327. warnf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  328. return nil, ErrNoSuchFile
  329. }
  330. if lf.Flags&protocol.FlagInvalid != 0 {
  331. return nil, ErrInvalid
  332. }
  333. if m.trace["net"] && nodeID != "<local>" {
  334. debugf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  335. }
  336. fn := path.Join(m.dir, name)
  337. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  338. if err != nil {
  339. return nil, err
  340. }
  341. defer fd.Close()
  342. buf := buffers.Get(int(size))
  343. _, err = fd.ReadAt(buf, offset)
  344. if err != nil {
  345. return nil, err
  346. }
  347. if m.limitRequestRate != nil {
  348. for s := 0; s < len(buf); s += 1024 {
  349. <-m.limitRequestRate
  350. }
  351. }
  352. return buf, nil
  353. }
  354. // ReplaceLocal replaces the local repository index with the given list of files.
  355. func (m *Model) ReplaceLocal(fs []File) {
  356. var updated bool
  357. var newLocal = make(map[string]File)
  358. m.lmut.RLock()
  359. for _, f := range fs {
  360. newLocal[f.Name] = f
  361. if ef := m.local[f.Name]; !ef.Equals(f) {
  362. updated = true
  363. }
  364. }
  365. m.lmut.RUnlock()
  366. if m.markDeletedLocals(newLocal) {
  367. updated = true
  368. }
  369. m.lmut.RLock()
  370. if len(newLocal) != len(m.local) {
  371. updated = true
  372. }
  373. m.lmut.RUnlock()
  374. if updated {
  375. m.lmut.Lock()
  376. m.local = newLocal
  377. m.lmut.Unlock()
  378. m.recomputeGlobal()
  379. m.recomputeNeedForGlobal()
  380. m.umut.Lock()
  381. m.updatedLocal = time.Now().Unix()
  382. m.lastIdxBcastRequest = time.Now()
  383. m.umut.Unlock()
  384. }
  385. }
  386. // SeedLocal replaces the local repository index with the given list of files,
  387. // in protocol data types. Does not track deletes, should only be used to seed
  388. // the local index from a cache file at startup.
  389. func (m *Model) SeedLocal(fs []protocol.FileInfo) {
  390. m.lmut.Lock()
  391. m.local = make(map[string]File)
  392. for _, f := range fs {
  393. m.local[f.Name] = fileFromFileInfo(f)
  394. }
  395. m.lmut.Unlock()
  396. m.recomputeGlobal()
  397. m.recomputeNeedForGlobal()
  398. }
  399. // ConnectedTo returns true if we are connected to the named node.
  400. func (m *Model) ConnectedTo(nodeID string) bool {
  401. m.pmut.RLock()
  402. _, ok := m.protoConn[nodeID]
  403. m.pmut.RUnlock()
  404. return ok
  405. }
  406. // RepoID returns a unique ID representing the current repository location.
  407. func (m *Model) RepoID() string {
  408. return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir)))
  409. }
  410. // AddConnection adds a new peer connection to the model. An initial index will
  411. // be sent to the connected peer, thereafter index updates whenever the local
  412. // repository changes.
  413. func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
  414. nodeID := protoConn.ID()
  415. m.pmut.Lock()
  416. m.protoConn[nodeID] = protoConn
  417. m.rawConn[nodeID] = rawConn
  418. m.pmut.Unlock()
  419. go func() {
  420. idx := m.ProtocolIndex()
  421. protoConn.Index("default", idx)
  422. }()
  423. m.initmut.Lock()
  424. rw := m.rwRunning
  425. m.initmut.Unlock()
  426. if !rw {
  427. return
  428. }
  429. for i := 0; i < m.parallelRequests; i++ {
  430. i := i
  431. go func() {
  432. if m.trace["pull"] {
  433. debugln("PULL: Starting", nodeID, i)
  434. }
  435. for {
  436. m.pmut.RLock()
  437. if _, ok := m.protoConn[nodeID]; !ok {
  438. if m.trace["pull"] {
  439. debugln("PULL: Exiting", nodeID, i)
  440. }
  441. m.pmut.RUnlock()
  442. return
  443. }
  444. m.pmut.RUnlock()
  445. qb, ok := m.fq.Get(nodeID)
  446. if ok {
  447. if m.trace["pull"] {
  448. debugln("PULL: Request", nodeID, i, qb.name, qb.block.Offset)
  449. }
  450. data, _ := protoConn.Request("default", qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash)
  451. m.fq.Done(qb.name, qb.block.Offset, data)
  452. } else {
  453. time.Sleep(1 * time.Second)
  454. }
  455. }
  456. }()
  457. }
  458. }
  459. // ProtocolIndex returns the current local index in protocol data types.
  460. // Must be called with the read lock held.
  461. func (m *Model) ProtocolIndex() []protocol.FileInfo {
  462. var index []protocol.FileInfo
  463. m.lmut.RLock()
  464. for _, f := range m.local {
  465. mf := fileInfoFromFile(f)
  466. if m.trace["idx"] {
  467. var flagComment string
  468. if mf.Flags&protocol.FlagDeleted != 0 {
  469. flagComment = " (deleted)"
  470. }
  471. debugf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
  472. }
  473. index = append(index, mf)
  474. }
  475. m.lmut.RUnlock()
  476. return index
  477. }
  478. func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) {
  479. m.pmut.RLock()
  480. nc, ok := m.protoConn[nodeID]
  481. m.pmut.RUnlock()
  482. if !ok {
  483. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  484. }
  485. if m.trace["net"] {
  486. debugf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  487. }
  488. return nc.Request("default", name, offset, size, hash)
  489. }
  490. func (m *Model) broadcastIndexLoop() {
  491. for {
  492. m.umut.RLock()
  493. bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
  494. holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
  495. m.umut.RUnlock()
  496. maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
  497. if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
  498. idx := m.ProtocolIndex()
  499. var indexWg sync.WaitGroup
  500. indexWg.Add(len(m.protoConn))
  501. m.umut.Lock()
  502. m.lastIdxBcast = time.Now()
  503. m.umut.Unlock()
  504. m.pmut.RLock()
  505. for _, node := range m.protoConn {
  506. node := node
  507. if m.trace["net"] {
  508. debugf("NET IDX(out/loop): %s: %d files", node.ID(), len(idx))
  509. }
  510. go func() {
  511. node.Index("default", idx)
  512. indexWg.Done()
  513. }()
  514. }
  515. m.pmut.RUnlock()
  516. indexWg.Wait()
  517. }
  518. time.Sleep(idxBcastHoldtime)
  519. }
  520. }
  521. // markDeletedLocals sets the deleted flag on files that have gone missing locally.
  522. func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
  523. // For every file in the existing local table, check if they are also
  524. // present in the new local table. If they are not, check that we already
  525. // had the newest version available according to the global table and if so
  526. // note the file as having been deleted.
  527. var updated bool
  528. m.gmut.RLock()
  529. m.lmut.RLock()
  530. for n, f := range m.local {
  531. if _, ok := newLocal[n]; !ok {
  532. if gf := m.global[n]; !gf.NewerThan(f) {
  533. if f.Flags&protocol.FlagDeleted == 0 {
  534. f.Flags = protocol.FlagDeleted
  535. f.Version++
  536. f.Blocks = nil
  537. updated = true
  538. }
  539. newLocal[n] = f
  540. }
  541. }
  542. }
  543. m.lmut.RUnlock()
  544. m.gmut.RUnlock()
  545. return updated
  546. }
  547. func (m *Model) updateLocal(f File) {
  548. var updated bool
  549. m.lmut.Lock()
  550. if ef, ok := m.local[f.Name]; !ok || !ef.Equals(f) {
  551. m.local[f.Name] = f
  552. updated = true
  553. }
  554. m.lmut.Unlock()
  555. if updated {
  556. m.recomputeGlobal()
  557. // We don't recomputeNeed here for two reasons:
  558. // - a need shouldn't have arisen due to having a newer local file
  559. // - recomputeNeed might call into fq.Add but we might have been called by
  560. // fq which would be a deadlock on fq
  561. m.umut.Lock()
  562. m.updatedLocal = time.Now().Unix()
  563. m.lastIdxBcastRequest = time.Now()
  564. m.umut.Unlock()
  565. }
  566. }
  567. /*
  568. XXX: Not done, needs elegant handling of availability
  569. func (m *Model) recomputeGlobalFor(files []File) bool {
  570. m.gmut.Lock()
  571. defer m.gmut.Unlock()
  572. var updated bool
  573. for _, f := range files {
  574. if gf, ok := m.global[f.Name]; !ok || f.NewerThan(gf) {
  575. m.global[f.Name] = f
  576. updated = true
  577. // Fix availability
  578. }
  579. }
  580. return updated
  581. }
  582. */
  583. func (m *Model) recomputeGlobal() {
  584. var newGlobal = make(map[string]File)
  585. m.lmut.RLock()
  586. for n, f := range m.local {
  587. newGlobal[n] = f
  588. }
  589. m.lmut.RUnlock()
  590. var available = make(map[string][]string)
  591. m.rmut.RLock()
  592. var highestMod int64
  593. for nodeID, fs := range m.remote {
  594. for n, nf := range fs {
  595. if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) {
  596. newGlobal[n] = nf
  597. available[n] = []string{nodeID}
  598. if nf.Modified > highestMod {
  599. highestMod = nf.Modified
  600. }
  601. } else if lf.Equals(nf) {
  602. available[n] = append(available[n], nodeID)
  603. }
  604. }
  605. }
  606. m.rmut.RUnlock()
  607. for f, ns := range available {
  608. m.fq.SetAvailable(f, ns)
  609. }
  610. // Figure out if anything actually changed
  611. m.gmut.RLock()
  612. var updated bool
  613. if highestMod > m.updateGlobal || len(newGlobal) != len(m.global) {
  614. updated = true
  615. } else {
  616. for n, f0 := range newGlobal {
  617. if f1, ok := m.global[n]; !ok || !f0.Equals(f1) {
  618. updated = true
  619. break
  620. }
  621. }
  622. }
  623. m.gmut.RUnlock()
  624. if updated {
  625. m.gmut.Lock()
  626. m.umut.Lock()
  627. m.global = newGlobal
  628. m.updateGlobal = time.Now().Unix()
  629. m.umut.Unlock()
  630. m.gmut.Unlock()
  631. }
  632. }
  633. type addOrder struct {
  634. n string
  635. remote []Block
  636. fm *fileMonitor
  637. }
  638. func (m *Model) recomputeNeedForGlobal() {
  639. var toDelete []File
  640. var toAdd []addOrder
  641. m.gmut.RLock()
  642. for _, gf := range m.global {
  643. toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete)
  644. }
  645. m.gmut.RUnlock()
  646. for _, ao := range toAdd {
  647. m.fq.Add(ao.n, ao.remote, ao.fm)
  648. }
  649. for _, gf := range toDelete {
  650. m.dq <- gf
  651. }
  652. }
  653. func (m *Model) recomputeNeedForFiles(files []File) {
  654. var toDelete []File
  655. var toAdd []addOrder
  656. m.gmut.RLock()
  657. for _, gf := range files {
  658. toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete)
  659. }
  660. m.gmut.RUnlock()
  661. for _, ao := range toAdd {
  662. m.fq.Add(ao.n, ao.remote, ao.fm)
  663. }
  664. for _, gf := range toDelete {
  665. m.dq <- gf
  666. }
  667. }
  668. func (m *Model) recomputeNeedForFile(gf File, toAdd []addOrder, toDelete []File) ([]addOrder, []File) {
  669. m.lmut.RLock()
  670. lf, ok := m.local[gf.Name]
  671. m.lmut.RUnlock()
  672. if !ok || gf.NewerThan(lf) {
  673. if gf.Flags&protocol.FlagInvalid != 0 {
  674. // Never attempt to sync invalid files
  675. return toAdd, toDelete
  676. }
  677. if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
  678. // Don't want to delete files, so forget this need
  679. return toAdd, toDelete
  680. }
  681. if gf.Flags&protocol.FlagDeleted != 0 && !ok {
  682. // Don't have the file, so don't need to delete it
  683. return toAdd, toDelete
  684. }
  685. if m.trace["need"] {
  686. debugf("NEED: lf:%v gf:%v", lf, gf)
  687. }
  688. if gf.Flags&protocol.FlagDeleted != 0 {
  689. toDelete = append(toDelete, gf)
  690. } else {
  691. local, remote := BlockDiff(lf.Blocks, gf.Blocks)
  692. fm := fileMonitor{
  693. name: gf.Name,
  694. path: path.Clean(path.Join(m.dir, gf.Name)),
  695. global: gf,
  696. model: m,
  697. localBlocks: local,
  698. }
  699. toAdd = append(toAdd, addOrder{gf.Name, remote, &fm})
  700. }
  701. }
  702. return toAdd, toDelete
  703. }
  704. func (m *Model) WhoHas(name string) []string {
  705. var remote []string
  706. m.gmut.RLock()
  707. m.rmut.RLock()
  708. gf := m.global[name]
  709. for node, files := range m.remote {
  710. if file, ok := files[name]; ok && file.Equals(gf) {
  711. remote = append(remote, node)
  712. }
  713. }
  714. m.rmut.RUnlock()
  715. m.gmut.RUnlock()
  716. return remote
  717. }
  718. func (m *Model) deleteLoop() {
  719. for file := range m.dq {
  720. if m.trace["file"] {
  721. debugln("FILE: Delete", file.Name)
  722. }
  723. path := path.Clean(path.Join(m.dir, file.Name))
  724. err := os.Remove(path)
  725. if err != nil {
  726. warnf("%s: %v", file.Name, err)
  727. }
  728. m.updateLocal(file)
  729. }
  730. }
  731. func fileFromFileInfo(f protocol.FileInfo) File {
  732. var blocks = make([]Block, len(f.Blocks))
  733. var offset int64
  734. for i, b := range f.Blocks {
  735. blocks[i] = Block{
  736. Offset: offset,
  737. Size: b.Size,
  738. Hash: b.Hash,
  739. }
  740. offset += int64(b.Size)
  741. }
  742. return File{
  743. Name: f.Name,
  744. Flags: f.Flags,
  745. Modified: f.Modified,
  746. Version: f.Version,
  747. Blocks: blocks,
  748. }
  749. }
  750. func fileInfoFromFile(f File) protocol.FileInfo {
  751. var blocks = make([]protocol.BlockInfo, len(f.Blocks))
  752. for i, b := range f.Blocks {
  753. blocks[i] = protocol.BlockInfo{
  754. Size: b.Size,
  755. Hash: b.Hash,
  756. }
  757. }
  758. return protocol.FileInfo{
  759. Name: f.Name,
  760. Flags: f.Flags,
  761. Modified: f.Modified,
  762. Version: f.Version,
  763. Blocks: blocks,
  764. }
  765. }