model.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858
  1. package model
  2. import (
  3. "crypto/sha1"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "log"
  8. "net"
  9. "os"
  10. "path"
  11. "sync"
  12. "time"
  13. "github.com/calmh/syncthing/buffers"
  14. "github.com/calmh/syncthing/protocol"
  15. )
  16. type Model struct {
  17. dir string
  18. global map[string]File // the latest version of each file as it exists in the cluster
  19. gmut sync.RWMutex // protects global
  20. local map[string]File // the files we currently have locally on disk
  21. lmut sync.RWMutex // protects local
  22. remote map[string]map[string]File
  23. rmut sync.RWMutex // protects remote
  24. protoConn map[string]Connection
  25. rawConn map[string]io.Closer
  26. pmut sync.RWMutex // protects protoConn and rawConn
  27. // Queue for files to fetch. fq can call back into the model, so we must ensure
  28. // to hold no locks when calling methods on fq.
  29. fq *FileQueue
  30. dq chan File // queue for files to delete
  31. updatedLocal int64 // timestamp of last update to local
  32. updateGlobal int64 // timestamp of last update to remote
  33. lastIdxBcast time.Time
  34. lastIdxBcastRequest time.Time
  35. umut sync.RWMutex // provides updated* and lastIdx*
  36. rwRunning bool
  37. delete bool
  38. initmut sync.Mutex // protects rwRunning and delete
  39. trace map[string]bool
  40. fileLastChanged map[string]time.Time
  41. fileWasSuppressed map[string]int
  42. fmut sync.Mutex // protects fileLastChanged and fileWasSuppressed
  43. parallellRequests int
  44. limitRequestRate chan struct{}
  45. imut sync.Mutex // protects Index
  46. }
  47. type Connection interface {
  48. ID() string
  49. Index([]protocol.FileInfo)
  50. Request(name string, offset int64, size uint32, hash []byte) ([]byte, error)
  51. Statistics() protocol.Statistics
  52. }
  53. const (
  54. idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
  55. idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
  56. minFileHoldTimeS = 60 // Never allow file changes more often than this
  57. maxFileHoldTimeS = 600 // Always allow file changes at least this often
  58. )
  59. var (
  60. ErrNoSuchFile = errors.New("no such file")
  61. ErrInvalid = errors.New("file is invalid")
  62. )
  63. // NewModel creates and starts a new model. The model starts in read-only mode,
  64. // where it sends index information to connected peers and responds to requests
  65. // for file data without altering the local repository in any way.
  66. func NewModel(dir string) *Model {
  67. m := &Model{
  68. dir: dir,
  69. global: make(map[string]File),
  70. local: make(map[string]File),
  71. remote: make(map[string]map[string]File),
  72. protoConn: make(map[string]Connection),
  73. rawConn: make(map[string]io.Closer),
  74. lastIdxBcast: time.Now(),
  75. trace: make(map[string]bool),
  76. fileLastChanged: make(map[string]time.Time),
  77. fileWasSuppressed: make(map[string]int),
  78. fq: NewFileQueue(),
  79. dq: make(chan File),
  80. }
  81. go m.broadcastIndexLoop()
  82. return m
  83. }
  84. func (m *Model) LimitRate(kbps int) {
  85. m.limitRequestRate = make(chan struct{}, kbps)
  86. n := kbps/10 + 1
  87. go func() {
  88. for {
  89. time.Sleep(100 * time.Millisecond)
  90. for i := 0; i < n; i++ {
  91. select {
  92. case m.limitRequestRate <- struct{}{}:
  93. }
  94. }
  95. }
  96. }()
  97. }
  98. // Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace.
  99. func (m *Model) Trace(t string) {
  100. m.trace[t] = true
  101. }
  102. // StartRW starts read/write processing on the current model. When in
  103. // read/write mode the model will attempt to keep in sync with the cluster by
  104. // pulling needed files from peer nodes.
  105. func (m *Model) StartRW(del bool, threads int) {
  106. m.initmut.Lock()
  107. defer m.initmut.Unlock()
  108. if m.rwRunning {
  109. panic("starting started model")
  110. }
  111. m.rwRunning = true
  112. m.delete = del
  113. m.parallellRequests = threads
  114. go m.cleanTempFiles()
  115. if del {
  116. go m.deleteLoop()
  117. }
  118. }
  119. // Generation returns an opaque integer that is guaranteed to increment on
  120. // every change to the local repository or global model.
  121. func (m *Model) Generation() int64 {
  122. m.umut.RLock()
  123. defer m.umut.RUnlock()
  124. return m.updatedLocal + m.updateGlobal
  125. }
  126. func (m *Model) LocalAge() float64 {
  127. m.umut.RLock()
  128. defer m.umut.RUnlock()
  129. return time.Since(time.Unix(m.updatedLocal, 0)).Seconds()
  130. }
  131. type ConnectionInfo struct {
  132. protocol.Statistics
  133. Address string
  134. }
  135. // ConnectionStats returns a map with connection statistics for each connected node.
  136. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  137. type remoteAddrer interface {
  138. RemoteAddr() net.Addr
  139. }
  140. m.pmut.RLock()
  141. var res = make(map[string]ConnectionInfo)
  142. for node, conn := range m.protoConn {
  143. ci := ConnectionInfo{
  144. Statistics: conn.Statistics(),
  145. }
  146. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  147. ci.Address = nc.RemoteAddr().String()
  148. }
  149. res[node] = ci
  150. }
  151. m.pmut.RUnlock()
  152. return res
  153. }
  154. // LocalSize returns the number of files, deleted files and total bytes for all
  155. // files in the global model.
  156. func (m *Model) GlobalSize() (files, deleted, bytes int) {
  157. m.gmut.RLock()
  158. for _, f := range m.global {
  159. if f.Flags&protocol.FlagDeleted == 0 {
  160. files++
  161. bytes += f.Size()
  162. } else {
  163. deleted++
  164. }
  165. }
  166. m.gmut.RUnlock()
  167. return
  168. }
  169. // LocalSize returns the number of files, deleted files and total bytes for all
  170. // files in the local repository.
  171. func (m *Model) LocalSize() (files, deleted, bytes int) {
  172. m.lmut.RLock()
  173. for _, f := range m.local {
  174. if f.Flags&protocol.FlagDeleted == 0 {
  175. files++
  176. bytes += f.Size()
  177. } else {
  178. deleted++
  179. }
  180. }
  181. m.lmut.RUnlock()
  182. return
  183. }
  184. // InSyncSize returns the number and total byte size of the local files that
  185. // are in sync with the global model.
  186. func (m *Model) InSyncSize() (files, bytes int) {
  187. m.gmut.RLock()
  188. m.lmut.RLock()
  189. for n, f := range m.local {
  190. if gf, ok := m.global[n]; ok && f.Equals(gf) {
  191. files++
  192. bytes += f.Size()
  193. }
  194. }
  195. m.lmut.RUnlock()
  196. m.gmut.RUnlock()
  197. return
  198. }
  199. // NeedFiles returns the list of currently needed files and the total size.
  200. func (m *Model) NeedFiles() (files []File, bytes int) {
  201. qf := m.fq.QueuedFiles()
  202. m.gmut.RLock()
  203. for _, n := range qf {
  204. f := m.global[n]
  205. files = append(files, f)
  206. bytes += f.Size()
  207. }
  208. m.gmut.RUnlock()
  209. return
  210. }
  211. // Index is called when a new node is connected and we receive their full index.
  212. // Implements the protocol.Model interface.
  213. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
  214. m.imut.Lock()
  215. defer m.imut.Unlock()
  216. if m.trace["net"] {
  217. log.Printf("NET IDX(in): %s: %d files", nodeID, len(fs))
  218. }
  219. repo := make(map[string]File)
  220. for _, f := range fs {
  221. m.indexUpdate(repo, f)
  222. }
  223. m.rmut.Lock()
  224. m.remote[nodeID] = repo
  225. m.rmut.Unlock()
  226. m.recomputeGlobal()
  227. m.recomputeNeed()
  228. }
  229. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  230. // Implements the protocol.Model interface.
  231. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
  232. m.imut.Lock()
  233. defer m.imut.Unlock()
  234. if m.trace["net"] {
  235. log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
  236. }
  237. m.rmut.Lock()
  238. repo, ok := m.remote[nodeID]
  239. if !ok {
  240. log.Printf("WARNING: Index update from node %s that does not have an index", nodeID)
  241. m.rmut.Unlock()
  242. return
  243. }
  244. for _, f := range fs {
  245. m.indexUpdate(repo, f)
  246. }
  247. m.rmut.Unlock()
  248. m.recomputeGlobal()
  249. m.recomputeNeed()
  250. }
  251. func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) {
  252. if m.trace["idx"] {
  253. var flagComment string
  254. if f.Flags&protocol.FlagDeleted != 0 {
  255. flagComment = " (deleted)"
  256. }
  257. log.Printf("IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  258. }
  259. if extraFlags := f.Flags &^ (protocol.FlagInvalid | protocol.FlagDeleted | 0xfff); extraFlags != 0 {
  260. log.Printf("WARNING: IDX(in): Unknown flags 0x%x in index record %+v", extraFlags, f)
  261. return
  262. }
  263. repo[f.Name] = fileFromFileInfo(f)
  264. }
  265. // Close removes the peer from the model and closes the underlying connection if possible.
  266. // Implements the protocol.Model interface.
  267. func (m *Model) Close(node string, err error) {
  268. m.fq.RemoveAvailable(node)
  269. m.pmut.Lock()
  270. m.rmut.Lock()
  271. conn, ok := m.rawConn[node]
  272. if ok {
  273. conn.Close()
  274. }
  275. delete(m.remote, node)
  276. delete(m.protoConn, node)
  277. delete(m.rawConn, node)
  278. m.rmut.Unlock()
  279. m.pmut.Unlock()
  280. m.recomputeGlobal()
  281. m.recomputeNeed()
  282. }
  283. // Request returns the specified data segment by reading it from local disk.
  284. // Implements the protocol.Model interface.
  285. func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) {
  286. // Verify that the requested file exists in the local and global model.
  287. m.lmut.RLock()
  288. lf, localOk := m.local[name]
  289. m.lmut.RUnlock()
  290. m.gmut.RLock()
  291. _, globalOk := m.global[name]
  292. m.gmut.RUnlock()
  293. if !localOk || !globalOk {
  294. log.Printf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  295. return nil, ErrNoSuchFile
  296. }
  297. if lf.Flags&protocol.FlagInvalid != 0 {
  298. return nil, ErrInvalid
  299. }
  300. if m.trace["net"] && nodeID != "<local>" {
  301. log.Printf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  302. }
  303. fn := path.Join(m.dir, name)
  304. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  305. if err != nil {
  306. return nil, err
  307. }
  308. defer fd.Close()
  309. buf := buffers.Get(int(size))
  310. _, err = fd.ReadAt(buf, offset)
  311. if err != nil {
  312. return nil, err
  313. }
  314. if m.limitRequestRate != nil {
  315. for s := 0; s < len(buf); s += 1024 {
  316. <-m.limitRequestRate
  317. }
  318. }
  319. return buf, nil
  320. }
  321. // ReplaceLocal replaces the local repository index with the given list of files.
  322. // Change suppression is applied to files changing too often.
  323. func (m *Model) ReplaceLocal(fs []File) {
  324. var updated bool
  325. var newLocal = make(map[string]File)
  326. m.lmut.RLock()
  327. for _, f := range fs {
  328. newLocal[f.Name] = f
  329. if ef := m.local[f.Name]; !ef.Equals(f) {
  330. updated = true
  331. }
  332. }
  333. m.lmut.RUnlock()
  334. if m.markDeletedLocals(newLocal) {
  335. updated = true
  336. }
  337. m.lmut.RLock()
  338. if len(newLocal) != len(m.local) {
  339. updated = true
  340. }
  341. m.lmut.RUnlock()
  342. if updated {
  343. m.lmut.Lock()
  344. m.local = newLocal
  345. m.lmut.Unlock()
  346. m.recomputeGlobal()
  347. m.recomputeNeed()
  348. m.umut.Lock()
  349. m.updatedLocal = time.Now().Unix()
  350. m.lastIdxBcastRequest = time.Now()
  351. m.umut.Unlock()
  352. }
  353. }
  354. // SeedLocal replaces the local repository index with the given list of files,
  355. // in protocol data types. Does not track deletes, should only be used to seed
  356. // the local index from a cache file at startup.
  357. func (m *Model) SeedLocal(fs []protocol.FileInfo) {
  358. m.lmut.Lock()
  359. m.local = make(map[string]File)
  360. for _, f := range fs {
  361. m.local[f.Name] = fileFromFileInfo(f)
  362. }
  363. m.lmut.Unlock()
  364. m.recomputeGlobal()
  365. m.recomputeNeed()
  366. }
  367. // ConnectedTo returns true if we are connected to the named node.
  368. func (m *Model) ConnectedTo(nodeID string) bool {
  369. m.pmut.RLock()
  370. _, ok := m.protoConn[nodeID]
  371. m.pmut.RUnlock()
  372. return ok
  373. }
  374. // RepoID returns a unique ID representing the current repository location.
  375. func (m *Model) RepoID() string {
  376. return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir)))
  377. }
  378. // AddConnection adds a new peer connection to the model. An initial index will
  379. // be sent to the connected peer, thereafter index updates whenever the local
  380. // repository changes.
  381. func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
  382. nodeID := protoConn.ID()
  383. m.pmut.Lock()
  384. m.protoConn[nodeID] = protoConn
  385. m.rawConn[nodeID] = rawConn
  386. m.pmut.Unlock()
  387. go func() {
  388. idx := m.ProtocolIndex()
  389. protoConn.Index(idx)
  390. }()
  391. m.initmut.Lock()
  392. rw := m.rwRunning
  393. m.initmut.Unlock()
  394. if !rw {
  395. return
  396. }
  397. for i := 0; i < m.parallellRequests; i++ {
  398. i := i
  399. go func() {
  400. if m.trace["pull"] {
  401. log.Println("PULL: Starting", nodeID, i)
  402. }
  403. for {
  404. m.pmut.RLock()
  405. if _, ok := m.protoConn[nodeID]; !ok {
  406. if m.trace["pull"] {
  407. log.Println("PULL: Exiting", nodeID, i)
  408. }
  409. m.pmut.RUnlock()
  410. return
  411. }
  412. m.pmut.RUnlock()
  413. qb, ok := m.fq.Get(nodeID)
  414. if ok {
  415. if m.trace["pull"] {
  416. log.Println("PULL: Request", nodeID, i, qb.name, qb.block.Offset)
  417. }
  418. data, _ := protoConn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash)
  419. m.fq.Done(qb.name, qb.block.Offset, data)
  420. } else {
  421. time.Sleep(1 * time.Second)
  422. }
  423. }
  424. }()
  425. }
  426. }
  427. func (m *Model) shouldSuppressChange(name string) bool {
  428. m.fmut.Lock()
  429. sup := shouldSuppressChange(m.fileLastChanged[name], m.fileWasSuppressed[name])
  430. if sup {
  431. m.fileWasSuppressed[name]++
  432. } else {
  433. m.fileWasSuppressed[name] = 0
  434. m.fileLastChanged[name] = time.Now()
  435. }
  436. m.fmut.Unlock()
  437. return sup
  438. }
  439. func shouldSuppressChange(lastChange time.Time, numChanges int) bool {
  440. sinceLast := time.Since(lastChange)
  441. if sinceLast > maxFileHoldTimeS*time.Second {
  442. return false
  443. }
  444. if sinceLast < time.Duration((numChanges+2)*minFileHoldTimeS)*time.Second {
  445. return true
  446. }
  447. return false
  448. }
  449. // ProtocolIndex returns the current local index in protocol data types.
  450. // Must be called with the read lock held.
  451. func (m *Model) ProtocolIndex() []protocol.FileInfo {
  452. var index []protocol.FileInfo
  453. m.lmut.RLock()
  454. for _, f := range m.local {
  455. mf := fileInfoFromFile(f)
  456. if m.trace["idx"] {
  457. var flagComment string
  458. if mf.Flags&protocol.FlagDeleted != 0 {
  459. flagComment = " (deleted)"
  460. }
  461. log.Printf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
  462. }
  463. index = append(index, mf)
  464. }
  465. m.lmut.RUnlock()
  466. return index
  467. }
  468. func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) {
  469. m.pmut.RLock()
  470. nc, ok := m.protoConn[nodeID]
  471. m.pmut.RUnlock()
  472. if !ok {
  473. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  474. }
  475. if m.trace["net"] {
  476. log.Printf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  477. }
  478. return nc.Request(name, offset, size, hash)
  479. }
  480. func (m *Model) broadcastIndexLoop() {
  481. for {
  482. m.umut.RLock()
  483. bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
  484. holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
  485. m.umut.RUnlock()
  486. maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
  487. if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
  488. idx := m.ProtocolIndex()
  489. var indexWg sync.WaitGroup
  490. indexWg.Add(len(m.protoConn))
  491. m.umut.Lock()
  492. m.lastIdxBcast = time.Now()
  493. m.umut.Unlock()
  494. m.pmut.RLock()
  495. for _, node := range m.protoConn {
  496. node := node
  497. if m.trace["net"] {
  498. log.Printf("NET IDX(out/loop): %s: %d files", node.ID(), len(idx))
  499. }
  500. go func() {
  501. node.Index(idx)
  502. indexWg.Done()
  503. }()
  504. }
  505. m.pmut.RUnlock()
  506. indexWg.Wait()
  507. }
  508. time.Sleep(idxBcastHoldtime)
  509. }
  510. }
  511. // markDeletedLocals sets the deleted flag on files that have gone missing locally.
  512. func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
  513. // For every file in the existing local table, check if they are also
  514. // present in the new local table. If they are not, check that we already
  515. // had the newest version available according to the global table and if so
  516. // note the file as having been deleted.
  517. var updated bool
  518. m.gmut.RLock()
  519. m.lmut.RLock()
  520. for n, f := range m.local {
  521. if _, ok := newLocal[n]; !ok {
  522. if gf := m.global[n]; !gf.NewerThan(f) {
  523. if f.Flags&protocol.FlagDeleted == 0 {
  524. f.Flags = protocol.FlagDeleted
  525. f.Version++
  526. f.Blocks = nil
  527. updated = true
  528. }
  529. newLocal[n] = f
  530. }
  531. }
  532. }
  533. m.lmut.RUnlock()
  534. m.gmut.RUnlock()
  535. return updated
  536. }
  537. func (m *Model) updateLocal(f File) {
  538. var updated bool
  539. m.lmut.Lock()
  540. if ef, ok := m.local[f.Name]; !ok || !ef.Equals(f) {
  541. m.local[f.Name] = f
  542. updated = true
  543. }
  544. m.lmut.Unlock()
  545. if updated {
  546. m.recomputeGlobal()
  547. // We don't recomputeNeed here for two reasons:
  548. // - a need shouldn't have arisen due to having a newer local file
  549. // - recomputeNeed might call into fq.Add but we might have been called by
  550. // fq which would be a deadlock on fq
  551. m.umut.Lock()
  552. m.updatedLocal = time.Now().Unix()
  553. m.lastIdxBcastRequest = time.Now()
  554. m.umut.Unlock()
  555. }
  556. }
  557. func (m *Model) recomputeGlobal() {
  558. var newGlobal = make(map[string]File)
  559. m.lmut.RLock()
  560. for n, f := range m.local {
  561. newGlobal[n] = f
  562. }
  563. m.lmut.RUnlock()
  564. var available = make(map[string][]string)
  565. m.rmut.RLock()
  566. var highestMod int64
  567. for nodeID, fs := range m.remote {
  568. for n, nf := range fs {
  569. if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) {
  570. newGlobal[n] = nf
  571. available[n] = []string{nodeID}
  572. if nf.Modified > highestMod {
  573. highestMod = nf.Modified
  574. }
  575. } else if lf.Equals(nf) {
  576. available[n] = append(available[n], nodeID)
  577. }
  578. }
  579. }
  580. m.rmut.RUnlock()
  581. for f, ns := range available {
  582. m.fq.SetAvailable(f, ns)
  583. }
  584. // Figure out if anything actually changed
  585. m.gmut.RLock()
  586. var updated bool
  587. if highestMod > m.updateGlobal || len(newGlobal) != len(m.global) {
  588. updated = true
  589. } else {
  590. for n, f0 := range newGlobal {
  591. if f1, ok := m.global[n]; !ok || !f0.Equals(f1) {
  592. updated = true
  593. break
  594. }
  595. }
  596. }
  597. m.gmut.RUnlock()
  598. if updated {
  599. m.gmut.Lock()
  600. m.umut.Lock()
  601. m.global = newGlobal
  602. m.updateGlobal = time.Now().Unix()
  603. m.umut.Unlock()
  604. m.gmut.Unlock()
  605. }
  606. }
  607. func (m *Model) recomputeNeed() {
  608. type addOrder struct {
  609. n string
  610. remote []Block
  611. fm *fileMonitor
  612. }
  613. var toDelete []File
  614. var toAdd []addOrder
  615. m.gmut.RLock()
  616. for n, gf := range m.global {
  617. m.lmut.RLock()
  618. lf, ok := m.local[n]
  619. m.lmut.RUnlock()
  620. if !ok || gf.NewerThan(lf) {
  621. if gf.Flags&protocol.FlagInvalid != 0 {
  622. // Never attempt to sync invalid files
  623. continue
  624. }
  625. if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
  626. // Don't want to delete files, so forget this need
  627. continue
  628. }
  629. if gf.Flags&protocol.FlagDeleted != 0 && !ok {
  630. // Don't have the file, so don't need to delete it
  631. continue
  632. }
  633. if m.trace["need"] {
  634. log.Printf("NEED: lf:%v gf:%v", lf, gf)
  635. }
  636. if gf.Flags&protocol.FlagDeleted != 0 {
  637. toDelete = append(toDelete, gf)
  638. } else {
  639. local, remote := BlockDiff(lf.Blocks, gf.Blocks)
  640. fm := fileMonitor{
  641. name: n,
  642. path: path.Clean(path.Join(m.dir, n)),
  643. global: gf,
  644. model: m,
  645. localBlocks: local,
  646. }
  647. toAdd = append(toAdd, addOrder{n, remote, &fm})
  648. }
  649. }
  650. }
  651. m.gmut.RUnlock()
  652. for _, ao := range toAdd {
  653. if !m.fq.Queued(ao.n) {
  654. m.fq.Add(ao.n, ao.remote, ao.fm)
  655. }
  656. }
  657. for _, gf := range toDelete {
  658. m.dq <- gf
  659. }
  660. }
  661. func (m *Model) WhoHas(name string) []string {
  662. var remote []string
  663. m.gmut.RLock()
  664. m.rmut.RLock()
  665. gf := m.global[name]
  666. for node, files := range m.remote {
  667. if file, ok := files[name]; ok && file.Equals(gf) {
  668. remote = append(remote, node)
  669. }
  670. }
  671. m.rmut.RUnlock()
  672. m.gmut.RUnlock()
  673. return remote
  674. }
  675. func (m *Model) deleteLoop() {
  676. for file := range m.dq {
  677. if m.trace["file"] {
  678. log.Println("FILE: Delete", file.Name)
  679. }
  680. path := path.Clean(path.Join(m.dir, file.Name))
  681. err := os.Remove(path)
  682. if err != nil {
  683. log.Printf("WARNING: %s: %v", file.Name, err)
  684. }
  685. m.updateLocal(file)
  686. }
  687. }
  688. func fileFromFileInfo(f protocol.FileInfo) File {
  689. var blocks = make([]Block, len(f.Blocks))
  690. var offset int64
  691. for i, b := range f.Blocks {
  692. blocks[i] = Block{
  693. Offset: offset,
  694. Size: b.Size,
  695. Hash: b.Hash,
  696. }
  697. offset += int64(b.Size)
  698. }
  699. return File{
  700. Name: f.Name,
  701. Flags: f.Flags,
  702. Modified: f.Modified,
  703. Version: f.Version,
  704. Blocks: blocks,
  705. }
  706. }
  707. func fileInfoFromFile(f File) protocol.FileInfo {
  708. var blocks = make([]protocol.BlockInfo, len(f.Blocks))
  709. for i, b := range f.Blocks {
  710. blocks[i] = protocol.BlockInfo{
  711. Size: b.Size,
  712. Hash: b.Hash,
  713. }
  714. }
  715. return protocol.FileInfo{
  716. Name: f.Name,
  717. Flags: f.Flags,
  718. Modified: f.Modified,
  719. Version: f.Version,
  720. Blocks: blocks,
  721. }
  722. }