model.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830
  1. package model
  2. import (
  3. "crypto/sha1"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "log"
  8. "net"
  9. "os"
  10. "path"
  11. "sync"
  12. "time"
  13. "github.com/calmh/syncthing/buffers"
  14. "github.com/calmh/syncthing/protocol"
  15. )
  16. type Model struct {
  17. dir string
  18. global map[string]File // the latest version of each file as it exists in the cluster
  19. gmut sync.RWMutex // protects global
  20. local map[string]File // the files we currently have locally on disk
  21. lmut sync.RWMutex // protects local
  22. remote map[string]map[string]File
  23. rmut sync.RWMutex // protects remote
  24. protoConn map[string]Connection
  25. rawConn map[string]io.Closer
  26. pmut sync.RWMutex // protects protoConn and rawConn
  27. // Queue for files to fetch. fq can call back into the model, so we must ensure
  28. // to hold no locks when calling methods on fq.
  29. fq *FileQueue
  30. dq chan File // queue for files to delete
  31. updatedLocal int64 // timestamp of last update to local
  32. updateGlobal int64 // timestamp of last update to remote
  33. lastIdxBcast time.Time
  34. lastIdxBcastRequest time.Time
  35. umut sync.RWMutex // provides updated* and lastIdx*
  36. rwRunning bool
  37. delete bool
  38. initmut sync.Mutex // protects rwRunning and delete
  39. trace map[string]bool
  40. sup suppressor
  41. parallellRequests int
  42. limitRequestRate chan struct{}
  43. imut sync.Mutex // protects Index
  44. }
  45. type Connection interface {
  46. ID() string
  47. Index([]protocol.FileInfo)
  48. Request(name string, offset int64, size uint32, hash []byte) ([]byte, error)
  49. Statistics() protocol.Statistics
  50. }
  51. const (
  52. idxBcastHoldtime = 15 * time.Second // Wait at least this long after the last index modification
  53. idxBcastMaxDelay = 120 * time.Second // Unless we've already waited this long
  54. minFileHoldTimeS = 60 // Never allow file changes more often than this
  55. maxFileHoldTimeS = 600 // Always allow file changes at least this often
  56. )
  57. var (
  58. ErrNoSuchFile = errors.New("no such file")
  59. ErrInvalid = errors.New("file is invalid")
  60. )
  61. // NewModel creates and starts a new model. The model starts in read-only mode,
  62. // where it sends index information to connected peers and responds to requests
  63. // for file data without altering the local repository in any way.
  64. func NewModel(dir string, maxChangeBw int) *Model {
  65. m := &Model{
  66. dir: dir,
  67. global: make(map[string]File),
  68. local: make(map[string]File),
  69. remote: make(map[string]map[string]File),
  70. protoConn: make(map[string]Connection),
  71. rawConn: make(map[string]io.Closer),
  72. lastIdxBcast: time.Now(),
  73. trace: make(map[string]bool),
  74. sup: suppressor{threshold: int64(maxChangeBw)},
  75. fq: NewFileQueue(),
  76. dq: make(chan File),
  77. }
  78. go m.broadcastIndexLoop()
  79. return m
  80. }
  81. func (m *Model) LimitRate(kbps int) {
  82. m.limitRequestRate = make(chan struct{}, kbps)
  83. n := kbps/10 + 1
  84. go func() {
  85. for {
  86. time.Sleep(100 * time.Millisecond)
  87. for i := 0; i < n; i++ {
  88. select {
  89. case m.limitRequestRate <- struct{}{}:
  90. }
  91. }
  92. }
  93. }()
  94. }
  95. // Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace.
  96. func (m *Model) Trace(t string) {
  97. m.trace[t] = true
  98. }
  99. // StartRW starts read/write processing on the current model. When in
  100. // read/write mode the model will attempt to keep in sync with the cluster by
  101. // pulling needed files from peer nodes.
  102. func (m *Model) StartRW(del bool, threads int) {
  103. m.initmut.Lock()
  104. defer m.initmut.Unlock()
  105. if m.rwRunning {
  106. panic("starting started model")
  107. }
  108. m.rwRunning = true
  109. m.delete = del
  110. m.parallellRequests = threads
  111. go m.cleanTempFiles()
  112. if del {
  113. go m.deleteLoop()
  114. }
  115. }
  116. // Generation returns an opaque integer that is guaranteed to increment on
  117. // every change to the local repository or global model.
  118. func (m *Model) Generation() int64 {
  119. m.umut.RLock()
  120. defer m.umut.RUnlock()
  121. return m.updatedLocal + m.updateGlobal
  122. }
  123. func (m *Model) LocalAge() float64 {
  124. m.umut.RLock()
  125. defer m.umut.RUnlock()
  126. return time.Since(time.Unix(m.updatedLocal, 0)).Seconds()
  127. }
  128. type ConnectionInfo struct {
  129. protocol.Statistics
  130. Address string
  131. }
  132. // ConnectionStats returns a map with connection statistics for each connected node.
  133. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  134. type remoteAddrer interface {
  135. RemoteAddr() net.Addr
  136. }
  137. m.pmut.RLock()
  138. var res = make(map[string]ConnectionInfo)
  139. for node, conn := range m.protoConn {
  140. ci := ConnectionInfo{
  141. Statistics: conn.Statistics(),
  142. }
  143. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  144. ci.Address = nc.RemoteAddr().String()
  145. }
  146. res[node] = ci
  147. }
  148. m.pmut.RUnlock()
  149. return res
  150. }
  151. // LocalSize returns the number of files, deleted files and total bytes for all
  152. // files in the global model.
  153. func (m *Model) GlobalSize() (files, deleted, bytes int) {
  154. m.gmut.RLock()
  155. for _, f := range m.global {
  156. if f.Flags&protocol.FlagDeleted == 0 {
  157. files++
  158. bytes += f.Size()
  159. } else {
  160. deleted++
  161. }
  162. }
  163. m.gmut.RUnlock()
  164. return
  165. }
  166. // LocalSize returns the number of files, deleted files and total bytes for all
  167. // files in the local repository.
  168. func (m *Model) LocalSize() (files, deleted, bytes int) {
  169. m.lmut.RLock()
  170. for _, f := range m.local {
  171. if f.Flags&protocol.FlagDeleted == 0 {
  172. files++
  173. bytes += f.Size()
  174. } else {
  175. deleted++
  176. }
  177. }
  178. m.lmut.RUnlock()
  179. return
  180. }
  181. // InSyncSize returns the number and total byte size of the local files that
  182. // are in sync with the global model.
  183. func (m *Model) InSyncSize() (files, bytes int) {
  184. m.gmut.RLock()
  185. m.lmut.RLock()
  186. for n, f := range m.local {
  187. if gf, ok := m.global[n]; ok && f.Equals(gf) {
  188. files++
  189. bytes += f.Size()
  190. }
  191. }
  192. m.lmut.RUnlock()
  193. m.gmut.RUnlock()
  194. return
  195. }
  196. // NeedFiles returns the list of currently needed files and the total size.
  197. func (m *Model) NeedFiles() (files []File, bytes int) {
  198. qf := m.fq.QueuedFiles()
  199. m.gmut.RLock()
  200. for _, n := range qf {
  201. f := m.global[n]
  202. files = append(files, f)
  203. bytes += f.Size()
  204. }
  205. m.gmut.RUnlock()
  206. return
  207. }
  208. // Index is called when a new node is connected and we receive their full index.
  209. // Implements the protocol.Model interface.
  210. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
  211. m.imut.Lock()
  212. defer m.imut.Unlock()
  213. if m.trace["net"] {
  214. log.Printf("NET IDX(in): %s: %d files", nodeID, len(fs))
  215. }
  216. repo := make(map[string]File)
  217. for _, f := range fs {
  218. m.indexUpdate(repo, f)
  219. }
  220. m.rmut.Lock()
  221. m.remote[nodeID] = repo
  222. m.rmut.Unlock()
  223. m.recomputeGlobal()
  224. m.recomputeNeed()
  225. }
  226. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  227. // Implements the protocol.Model interface.
  228. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
  229. m.imut.Lock()
  230. defer m.imut.Unlock()
  231. if m.trace["net"] {
  232. log.Printf("NET IDXUP(in): %s: %d files", nodeID, len(fs))
  233. }
  234. m.rmut.Lock()
  235. repo, ok := m.remote[nodeID]
  236. if !ok {
  237. log.Printf("WARNING: Index update from node %s that does not have an index", nodeID)
  238. m.rmut.Unlock()
  239. return
  240. }
  241. for _, f := range fs {
  242. m.indexUpdate(repo, f)
  243. }
  244. m.rmut.Unlock()
  245. m.recomputeGlobal()
  246. m.recomputeNeed()
  247. }
  248. func (m *Model) indexUpdate(repo map[string]File, f protocol.FileInfo) {
  249. if m.trace["idx"] {
  250. var flagComment string
  251. if f.Flags&protocol.FlagDeleted != 0 {
  252. flagComment = " (deleted)"
  253. }
  254. log.Printf("IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  255. }
  256. if extraFlags := f.Flags &^ (protocol.FlagInvalid | protocol.FlagDeleted | 0xfff); extraFlags != 0 {
  257. log.Printf("WARNING: IDX(in): Unknown flags 0x%x in index record %+v", extraFlags, f)
  258. return
  259. }
  260. repo[f.Name] = fileFromFileInfo(f)
  261. }
  262. // Close removes the peer from the model and closes the underlying connection if possible.
  263. // Implements the protocol.Model interface.
  264. func (m *Model) Close(node string, err error) {
  265. m.fq.RemoveAvailable(node)
  266. m.pmut.Lock()
  267. m.rmut.Lock()
  268. conn, ok := m.rawConn[node]
  269. if ok {
  270. conn.Close()
  271. }
  272. delete(m.remote, node)
  273. delete(m.protoConn, node)
  274. delete(m.rawConn, node)
  275. m.rmut.Unlock()
  276. m.pmut.Unlock()
  277. m.recomputeGlobal()
  278. m.recomputeNeed()
  279. }
  280. // Request returns the specified data segment by reading it from local disk.
  281. // Implements the protocol.Model interface.
  282. func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) {
  283. // Verify that the requested file exists in the local and global model.
  284. m.lmut.RLock()
  285. lf, localOk := m.local[name]
  286. m.lmut.RUnlock()
  287. m.gmut.RLock()
  288. _, globalOk := m.global[name]
  289. m.gmut.RUnlock()
  290. if !localOk || !globalOk {
  291. log.Printf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  292. return nil, ErrNoSuchFile
  293. }
  294. if lf.Flags&protocol.FlagInvalid != 0 {
  295. return nil, ErrInvalid
  296. }
  297. if m.trace["net"] && nodeID != "<local>" {
  298. log.Printf("NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  299. }
  300. fn := path.Join(m.dir, name)
  301. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  302. if err != nil {
  303. return nil, err
  304. }
  305. defer fd.Close()
  306. buf := buffers.Get(int(size))
  307. _, err = fd.ReadAt(buf, offset)
  308. if err != nil {
  309. return nil, err
  310. }
  311. if m.limitRequestRate != nil {
  312. for s := 0; s < len(buf); s += 1024 {
  313. <-m.limitRequestRate
  314. }
  315. }
  316. return buf, nil
  317. }
  318. // ReplaceLocal replaces the local repository index with the given list of files.
  319. func (m *Model) ReplaceLocal(fs []File) {
  320. var updated bool
  321. var newLocal = make(map[string]File)
  322. m.lmut.RLock()
  323. for _, f := range fs {
  324. newLocal[f.Name] = f
  325. if ef := m.local[f.Name]; !ef.Equals(f) {
  326. updated = true
  327. }
  328. }
  329. m.lmut.RUnlock()
  330. if m.markDeletedLocals(newLocal) {
  331. updated = true
  332. }
  333. m.lmut.RLock()
  334. if len(newLocal) != len(m.local) {
  335. updated = true
  336. }
  337. m.lmut.RUnlock()
  338. if updated {
  339. m.lmut.Lock()
  340. m.local = newLocal
  341. m.lmut.Unlock()
  342. m.recomputeGlobal()
  343. m.recomputeNeed()
  344. m.umut.Lock()
  345. m.updatedLocal = time.Now().Unix()
  346. m.lastIdxBcastRequest = time.Now()
  347. m.umut.Unlock()
  348. }
  349. }
  350. // SeedLocal replaces the local repository index with the given list of files,
  351. // in protocol data types. Does not track deletes, should only be used to seed
  352. // the local index from a cache file at startup.
  353. func (m *Model) SeedLocal(fs []protocol.FileInfo) {
  354. m.lmut.Lock()
  355. m.local = make(map[string]File)
  356. for _, f := range fs {
  357. m.local[f.Name] = fileFromFileInfo(f)
  358. }
  359. m.lmut.Unlock()
  360. m.recomputeGlobal()
  361. m.recomputeNeed()
  362. }
  363. // ConnectedTo returns true if we are connected to the named node.
  364. func (m *Model) ConnectedTo(nodeID string) bool {
  365. m.pmut.RLock()
  366. _, ok := m.protoConn[nodeID]
  367. m.pmut.RUnlock()
  368. return ok
  369. }
  370. // RepoID returns a unique ID representing the current repository location.
  371. func (m *Model) RepoID() string {
  372. return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir)))
  373. }
  374. // AddConnection adds a new peer connection to the model. An initial index will
  375. // be sent to the connected peer, thereafter index updates whenever the local
  376. // repository changes.
  377. func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
  378. nodeID := protoConn.ID()
  379. m.pmut.Lock()
  380. m.protoConn[nodeID] = protoConn
  381. m.rawConn[nodeID] = rawConn
  382. m.pmut.Unlock()
  383. go func() {
  384. idx := m.ProtocolIndex()
  385. protoConn.Index(idx)
  386. }()
  387. m.initmut.Lock()
  388. rw := m.rwRunning
  389. m.initmut.Unlock()
  390. if !rw {
  391. return
  392. }
  393. for i := 0; i < m.parallellRequests; i++ {
  394. i := i
  395. go func() {
  396. if m.trace["pull"] {
  397. log.Println("PULL: Starting", nodeID, i)
  398. }
  399. for {
  400. m.pmut.RLock()
  401. if _, ok := m.protoConn[nodeID]; !ok {
  402. if m.trace["pull"] {
  403. log.Println("PULL: Exiting", nodeID, i)
  404. }
  405. m.pmut.RUnlock()
  406. return
  407. }
  408. m.pmut.RUnlock()
  409. qb, ok := m.fq.Get(nodeID)
  410. if ok {
  411. if m.trace["pull"] {
  412. log.Println("PULL: Request", nodeID, i, qb.name, qb.block.Offset)
  413. }
  414. data, _ := protoConn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash)
  415. m.fq.Done(qb.name, qb.block.Offset, data)
  416. } else {
  417. time.Sleep(1 * time.Second)
  418. }
  419. }
  420. }()
  421. }
  422. }
  423. // ProtocolIndex returns the current local index in protocol data types.
  424. // Must be called with the read lock held.
  425. func (m *Model) ProtocolIndex() []protocol.FileInfo {
  426. var index []protocol.FileInfo
  427. m.lmut.RLock()
  428. for _, f := range m.local {
  429. mf := fileInfoFromFile(f)
  430. if m.trace["idx"] {
  431. var flagComment string
  432. if mf.Flags&protocol.FlagDeleted != 0 {
  433. flagComment = " (deleted)"
  434. }
  435. log.Printf("IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
  436. }
  437. index = append(index, mf)
  438. }
  439. m.lmut.RUnlock()
  440. return index
  441. }
  442. func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) {
  443. m.pmut.RLock()
  444. nc, ok := m.protoConn[nodeID]
  445. m.pmut.RUnlock()
  446. if !ok {
  447. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  448. }
  449. if m.trace["net"] {
  450. log.Printf("NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  451. }
  452. return nc.Request(name, offset, size, hash)
  453. }
  454. func (m *Model) broadcastIndexLoop() {
  455. for {
  456. m.umut.RLock()
  457. bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
  458. holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
  459. m.umut.RUnlock()
  460. maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
  461. if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
  462. idx := m.ProtocolIndex()
  463. var indexWg sync.WaitGroup
  464. indexWg.Add(len(m.protoConn))
  465. m.umut.Lock()
  466. m.lastIdxBcast = time.Now()
  467. m.umut.Unlock()
  468. m.pmut.RLock()
  469. for _, node := range m.protoConn {
  470. node := node
  471. if m.trace["net"] {
  472. log.Printf("NET IDX(out/loop): %s: %d files", node.ID(), len(idx))
  473. }
  474. go func() {
  475. node.Index(idx)
  476. indexWg.Done()
  477. }()
  478. }
  479. m.pmut.RUnlock()
  480. indexWg.Wait()
  481. }
  482. time.Sleep(idxBcastHoldtime)
  483. }
  484. }
  485. // markDeletedLocals sets the deleted flag on files that have gone missing locally.
  486. func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
  487. // For every file in the existing local table, check if they are also
  488. // present in the new local table. If they are not, check that we already
  489. // had the newest version available according to the global table and if so
  490. // note the file as having been deleted.
  491. var updated bool
  492. m.gmut.RLock()
  493. m.lmut.RLock()
  494. for n, f := range m.local {
  495. if _, ok := newLocal[n]; !ok {
  496. if gf := m.global[n]; !gf.NewerThan(f) {
  497. if f.Flags&protocol.FlagDeleted == 0 {
  498. f.Flags = protocol.FlagDeleted
  499. f.Version++
  500. f.Blocks = nil
  501. updated = true
  502. }
  503. newLocal[n] = f
  504. }
  505. }
  506. }
  507. m.lmut.RUnlock()
  508. m.gmut.RUnlock()
  509. return updated
  510. }
  511. func (m *Model) updateLocal(f File) {
  512. var updated bool
  513. m.lmut.Lock()
  514. if ef, ok := m.local[f.Name]; !ok || !ef.Equals(f) {
  515. m.local[f.Name] = f
  516. updated = true
  517. }
  518. m.lmut.Unlock()
  519. if updated {
  520. m.recomputeGlobal()
  521. // We don't recomputeNeed here for two reasons:
  522. // - a need shouldn't have arisen due to having a newer local file
  523. // - recomputeNeed might call into fq.Add but we might have been called by
  524. // fq which would be a deadlock on fq
  525. m.umut.Lock()
  526. m.updatedLocal = time.Now().Unix()
  527. m.lastIdxBcastRequest = time.Now()
  528. m.umut.Unlock()
  529. }
  530. }
  531. func (m *Model) recomputeGlobal() {
  532. var newGlobal = make(map[string]File)
  533. m.lmut.RLock()
  534. for n, f := range m.local {
  535. newGlobal[n] = f
  536. }
  537. m.lmut.RUnlock()
  538. var available = make(map[string][]string)
  539. m.rmut.RLock()
  540. var highestMod int64
  541. for nodeID, fs := range m.remote {
  542. for n, nf := range fs {
  543. if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) {
  544. newGlobal[n] = nf
  545. available[n] = []string{nodeID}
  546. if nf.Modified > highestMod {
  547. highestMod = nf.Modified
  548. }
  549. } else if lf.Equals(nf) {
  550. available[n] = append(available[n], nodeID)
  551. }
  552. }
  553. }
  554. m.rmut.RUnlock()
  555. for f, ns := range available {
  556. m.fq.SetAvailable(f, ns)
  557. }
  558. // Figure out if anything actually changed
  559. m.gmut.RLock()
  560. var updated bool
  561. if highestMod > m.updateGlobal || len(newGlobal) != len(m.global) {
  562. updated = true
  563. } else {
  564. for n, f0 := range newGlobal {
  565. if f1, ok := m.global[n]; !ok || !f0.Equals(f1) {
  566. updated = true
  567. break
  568. }
  569. }
  570. }
  571. m.gmut.RUnlock()
  572. if updated {
  573. m.gmut.Lock()
  574. m.umut.Lock()
  575. m.global = newGlobal
  576. m.updateGlobal = time.Now().Unix()
  577. m.umut.Unlock()
  578. m.gmut.Unlock()
  579. }
  580. }
  581. func (m *Model) recomputeNeed() {
  582. type addOrder struct {
  583. n string
  584. remote []Block
  585. fm *fileMonitor
  586. }
  587. var toDelete []File
  588. var toAdd []addOrder
  589. m.gmut.RLock()
  590. for n, gf := range m.global {
  591. m.lmut.RLock()
  592. lf, ok := m.local[n]
  593. m.lmut.RUnlock()
  594. if !ok || gf.NewerThan(lf) {
  595. if gf.Flags&protocol.FlagInvalid != 0 {
  596. // Never attempt to sync invalid files
  597. continue
  598. }
  599. if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
  600. // Don't want to delete files, so forget this need
  601. continue
  602. }
  603. if gf.Flags&protocol.FlagDeleted != 0 && !ok {
  604. // Don't have the file, so don't need to delete it
  605. continue
  606. }
  607. if m.trace["need"] {
  608. log.Printf("NEED: lf:%v gf:%v", lf, gf)
  609. }
  610. if gf.Flags&protocol.FlagDeleted != 0 {
  611. toDelete = append(toDelete, gf)
  612. } else {
  613. local, remote := BlockDiff(lf.Blocks, gf.Blocks)
  614. fm := fileMonitor{
  615. name: n,
  616. path: path.Clean(path.Join(m.dir, n)),
  617. global: gf,
  618. model: m,
  619. localBlocks: local,
  620. }
  621. toAdd = append(toAdd, addOrder{n, remote, &fm})
  622. }
  623. }
  624. }
  625. m.gmut.RUnlock()
  626. for _, ao := range toAdd {
  627. if !m.fq.Queued(ao.n) {
  628. m.fq.Add(ao.n, ao.remote, ao.fm)
  629. }
  630. }
  631. for _, gf := range toDelete {
  632. m.dq <- gf
  633. }
  634. }
  635. func (m *Model) WhoHas(name string) []string {
  636. var remote []string
  637. m.gmut.RLock()
  638. m.rmut.RLock()
  639. gf := m.global[name]
  640. for node, files := range m.remote {
  641. if file, ok := files[name]; ok && file.Equals(gf) {
  642. remote = append(remote, node)
  643. }
  644. }
  645. m.rmut.RUnlock()
  646. m.gmut.RUnlock()
  647. return remote
  648. }
  649. func (m *Model) deleteLoop() {
  650. for file := range m.dq {
  651. if m.trace["file"] {
  652. log.Println("FILE: Delete", file.Name)
  653. }
  654. path := path.Clean(path.Join(m.dir, file.Name))
  655. err := os.Remove(path)
  656. if err != nil {
  657. log.Printf("WARNING: %s: %v", file.Name, err)
  658. }
  659. m.updateLocal(file)
  660. }
  661. }
  662. func fileFromFileInfo(f protocol.FileInfo) File {
  663. var blocks = make([]Block, len(f.Blocks))
  664. var offset int64
  665. for i, b := range f.Blocks {
  666. blocks[i] = Block{
  667. Offset: offset,
  668. Size: b.Size,
  669. Hash: b.Hash,
  670. }
  671. offset += int64(b.Size)
  672. }
  673. return File{
  674. Name: f.Name,
  675. Flags: f.Flags,
  676. Modified: f.Modified,
  677. Version: f.Version,
  678. Blocks: blocks,
  679. }
  680. }
  681. func fileInfoFromFile(f File) protocol.FileInfo {
  682. var blocks = make([]protocol.BlockInfo, len(f.Blocks))
  683. for i, b := range f.Blocks {
  684. blocks[i] = protocol.BlockInfo{
  685. Size: b.Size,
  686. Hash: b.Hash,
  687. }
  688. }
  689. return protocol.FileInfo{
  690. Name: f.Name,
  691. Flags: f.Flags,
  692. Modified: f.Modified,
  693. Version: f.Version,
  694. Blocks: blocks,
  695. }
  696. }