model.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895
  1. package model
  2. import (
  3. "crypto/sha1"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "log"
  8. "net"
  9. "os"
  10. "path"
  11. "sync"
  12. "time"
  13. "github.com/calmh/syncthing/buffers"
  14. "github.com/calmh/syncthing/protocol"
  15. )
  16. type Model struct {
  17. dir string
  18. global map[string]File // the latest version of each file as it exists in the cluster
  19. gmut sync.RWMutex // protects global
  20. local map[string]File // the files we currently have locally on disk
  21. lmut sync.RWMutex // protects local
  22. remote map[string]map[string]File
  23. rmut sync.RWMutex // protects remote
  24. protoConn map[string]Connection
  25. rawConn map[string]io.Closer
  26. pmut sync.RWMutex // protects protoConn and rawConn
  27. // Queue for files to fetch. fq can call back into the model, so we must ensure
  28. // to hold no locks when calling methods on fq.
  29. fq *FileQueue
  30. dq chan File // queue for files to delete
  31. updatedLocal int64 // timestamp of last update to local
  32. updateGlobal int64 // timestamp of last update to remote
  33. lastIdxBcast time.Time
  34. lastIdxBcastRequest time.Time
  35. umut sync.RWMutex // provides updated* and lastIdx*
  36. rwRunning bool
  37. delete bool
  38. initmut sync.Mutex // protects rwRunning and delete
  39. trace map[string]bool
  40. sup suppressor
  41. parallelRequests int
  42. limitRequestRate chan struct{}
  43. imut sync.Mutex // protects Index
  44. }
  45. type Connection interface {
  46. ID() string
  47. Index([]protocol.FileInfo)
  48. Request(name string, offset int64, size uint32, hash []byte) ([]byte, error)
  49. Statistics() protocol.Statistics
  50. Option(key string) string
  51. }
  const (
  	// idxBcastHoldtime is how long the index must have been unchanged
  	// before it is broadcast.
  	idxBcastHoldtime = 15 * time.Second
  	// idxBcastMaxDelay is the longest a pending broadcast is held back.
  	idxBcastMaxDelay = 120 * time.Second

  	// minFileHoldTimeS: never allow file changes more often than this (seconds).
  	minFileHoldTimeS = 60
  	// maxFileHoldTimeS: always allow file changes at least this often (seconds).
  	maxFileHoldTimeS = 600
  )
  // Errors returned by Request.
  var (
  	// ErrNoSuchFile means the requested file is not in both the local and
  	// the global model.
  	ErrNoSuchFile = errors.New("no such file")
  	// ErrInvalid means the local copy of the requested file is flagged invalid.
  	ErrInvalid = errors.New("file is invalid")
  )
  62. // NewModel creates and starts a new model. The model starts in read-only mode,
  63. // where it sends index information to connected peers and responds to requests
  64. // for file data without altering the local repository in any way.
  65. func NewModel(dir string, maxChangeBw int) *Model {
  66. m := &Model{
  67. dir: dir,
  68. global: make(map[string]File),
  69. local: make(map[string]File),
  70. remote: make(map[string]map[string]File),
  71. protoConn: make(map[string]Connection),
  72. rawConn: make(map[string]io.Closer),
  73. lastIdxBcast: time.Now(),
  74. trace: make(map[string]bool),
  75. sup: suppressor{threshold: int64(maxChangeBw)},
  76. fq: NewFileQueue(),
  77. dq: make(chan File),
  78. }
  79. go m.broadcastIndexLoop()
  80. return m
  81. }
  82. func (m *Model) LimitRate(kbps int) {
  83. m.limitRequestRate = make(chan struct{}, kbps)
  84. n := kbps/10 + 1
  85. go func() {
  86. for {
  87. time.Sleep(100 * time.Millisecond)
  88. for i := 0; i < n; i++ {
  89. select {
  90. case m.limitRequestRate <- struct{}{}:
  91. }
  92. }
  93. }
  94. }()
  95. }
  96. // Trace enables trace logging of the given facility. This is a debugging function; grep for m.trace.
  97. func (m *Model) Trace(t string) {
  98. m.trace[t] = true
  99. }
  100. // StartRW starts read/write processing on the current model. When in
  101. // read/write mode the model will attempt to keep in sync with the cluster by
  102. // pulling needed files from peer nodes.
  103. func (m *Model) StartRW(del bool, threads int) {
  104. m.initmut.Lock()
  105. defer m.initmut.Unlock()
  106. if m.rwRunning {
  107. panic("starting started model")
  108. }
  109. m.rwRunning = true
  110. m.delete = del
  111. m.parallelRequests = threads
  112. go m.cleanTempFiles()
  113. if del {
  114. go m.deleteLoop()
  115. }
  116. }
  117. // Generation returns an opaque integer that is guaranteed to increment on
  118. // every change to the local repository or global model.
  119. func (m *Model) Generation() int64 {
  120. m.umut.RLock()
  121. defer m.umut.RUnlock()
  122. return m.updatedLocal + m.updateGlobal
  123. }
  124. func (m *Model) LocalAge() float64 {
  125. m.umut.RLock()
  126. defer m.umut.RUnlock()
  127. return time.Since(time.Unix(m.updatedLocal, 0)).Seconds()
  128. }
  129. type ConnectionInfo struct {
  130. protocol.Statistics
  131. Address string
  132. ClientID string
  133. ClientVersion string
  134. }
  135. // ConnectionStats returns a map with connection statistics for each connected node.
  136. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  137. type remoteAddrer interface {
  138. RemoteAddr() net.Addr
  139. }
  140. m.pmut.RLock()
  141. var res = make(map[string]ConnectionInfo)
  142. for node, conn := range m.protoConn {
  143. ci := ConnectionInfo{
  144. Statistics: conn.Statistics(),
  145. ClientID: conn.Option("clientId"),
  146. ClientVersion: conn.Option("clientVersion"),
  147. }
  148. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  149. ci.Address = nc.RemoteAddr().String()
  150. }
  151. res[node] = ci
  152. }
  153. m.pmut.RUnlock()
  154. return res
  155. }
  156. // LocalSize returns the number of files, deleted files and total bytes for all
  157. // files in the global model.
  158. func (m *Model) GlobalSize() (files, deleted, bytes int) {
  159. m.gmut.RLock()
  160. for _, f := range m.global {
  161. if f.Flags&protocol.FlagDeleted == 0 {
  162. files++
  163. bytes += f.Size()
  164. } else {
  165. deleted++
  166. }
  167. }
  168. m.gmut.RUnlock()
  169. return
  170. }
  171. // LocalSize returns the number of files, deleted files and total bytes for all
  172. // files in the local repository.
  173. func (m *Model) LocalSize() (files, deleted, bytes int) {
  174. m.lmut.RLock()
  175. for _, f := range m.local {
  176. if f.Flags&protocol.FlagDeleted == 0 {
  177. files++
  178. bytes += f.Size()
  179. } else {
  180. deleted++
  181. }
  182. }
  183. m.lmut.RUnlock()
  184. return
  185. }
  186. // InSyncSize returns the number and total byte size of the local files that
  187. // are in sync with the global model.
  188. func (m *Model) InSyncSize() (files, bytes int) {
  189. m.gmut.RLock()
  190. m.lmut.RLock()
  191. for n, f := range m.local {
  192. if gf, ok := m.global[n]; ok && f.Equals(gf) {
  193. files++
  194. bytes += f.Size()
  195. }
  196. }
  197. m.lmut.RUnlock()
  198. m.gmut.RUnlock()
  199. return
  200. }
  201. // NeedFiles returns the list of currently needed files and the total size.
  202. func (m *Model) NeedFiles() (files []File, bytes int) {
  203. qf := m.fq.QueuedFiles()
  204. m.gmut.RLock()
  205. for _, n := range qf {
  206. f := m.global[n]
  207. files = append(files, f)
  208. bytes += f.Size()
  209. }
  210. m.gmut.RUnlock()
  211. return
  212. }
  213. // Index is called when a new node is connected and we receive their full index.
  214. // Implements the protocol.Model interface.
  215. func (m *Model) Index(nodeID string, fs []protocol.FileInfo) {
  216. var files = make([]File, len(fs))
  217. for i := range fs {
  218. files[i] = fileFromFileInfo(fs[i])
  219. }
  220. m.imut.Lock()
  221. defer m.imut.Unlock()
  222. if m.trace["net"] {
  223. log.Printf("DEBUG: NET IDX(in): %s: %d files", nodeID, len(fs))
  224. }
  225. repo := make(map[string]File)
  226. for _, f := range files {
  227. m.indexUpdate(repo, f)
  228. }
  229. m.rmut.Lock()
  230. m.remote[nodeID] = repo
  231. m.rmut.Unlock()
  232. m.recomputeGlobal()
  233. m.recomputeNeedForFiles(files)
  234. }
  235. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  236. // Implements the protocol.Model interface.
  237. func (m *Model) IndexUpdate(nodeID string, fs []protocol.FileInfo) {
  238. var files = make([]File, len(fs))
  239. for i := range fs {
  240. files[i] = fileFromFileInfo(fs[i])
  241. }
  242. m.imut.Lock()
  243. defer m.imut.Unlock()
  244. if m.trace["net"] {
  245. log.Printf("DEBUG: NET IDXUP(in): %s: %d files", nodeID, len(files))
  246. }
  247. m.rmut.Lock()
  248. repo, ok := m.remote[nodeID]
  249. if !ok {
  250. log.Printf("WARNING: Index update from node %s that does not have an index", nodeID)
  251. m.rmut.Unlock()
  252. return
  253. }
  254. for _, f := range files {
  255. m.indexUpdate(repo, f)
  256. }
  257. m.rmut.Unlock()
  258. m.recomputeGlobal()
  259. m.recomputeNeedForFiles(files)
  260. }
  261. func (m *Model) indexUpdate(repo map[string]File, f File) {
  262. if m.trace["idx"] {
  263. var flagComment string
  264. if f.Flags&protocol.FlagDeleted != 0 {
  265. flagComment = " (deleted)"
  266. }
  267. log.Printf("DEBUG: IDX(in): %q m=%d f=%o%s v=%d (%d blocks)", f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  268. }
  269. if extraFlags := f.Flags &^ (protocol.FlagInvalid | protocol.FlagDeleted | 0xfff); extraFlags != 0 {
  270. log.Printf("WARNING: IDX(in): Unknown flags 0x%x in index record %+v", extraFlags, f)
  271. return
  272. }
  273. repo[f.Name] = f
  274. }
  275. // Close removes the peer from the model and closes the underlying connection if possible.
  276. // Implements the protocol.Model interface.
  277. func (m *Model) Close(node string, err error) {
  278. if m.trace["net"] {
  279. log.Printf("DEBUG: NET: %s: %v", node, err)
  280. }
  281. if err == protocol.ErrClusterHash {
  282. log.Printf("WARNING: Connection to %s closed due to mismatched cluster hash. Ensure that the configured cluster members are identical on both nodes.", node)
  283. }
  284. m.fq.RemoveAvailable(node)
  285. m.pmut.Lock()
  286. m.rmut.Lock()
  287. conn, ok := m.rawConn[node]
  288. if ok {
  289. conn.Close()
  290. }
  291. delete(m.remote, node)
  292. delete(m.protoConn, node)
  293. delete(m.rawConn, node)
  294. m.rmut.Unlock()
  295. m.pmut.Unlock()
  296. m.recomputeGlobal()
  297. m.recomputeNeedForGlobal()
  298. }
  299. // Request returns the specified data segment by reading it from local disk.
  300. // Implements the protocol.Model interface.
  301. func (m *Model) Request(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) {
  302. // Verify that the requested file exists in the local and global model.
  303. m.lmut.RLock()
  304. lf, localOk := m.local[name]
  305. m.lmut.RUnlock()
  306. m.gmut.RLock()
  307. _, globalOk := m.global[name]
  308. m.gmut.RUnlock()
  309. if !localOk || !globalOk {
  310. log.Printf("SECURITY (nonexistent file) REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  311. return nil, ErrNoSuchFile
  312. }
  313. if lf.Flags&protocol.FlagInvalid != 0 {
  314. return nil, ErrInvalid
  315. }
  316. if m.trace["net"] && nodeID != "<local>" {
  317. log.Printf("DEBUG: NET REQ(in): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  318. }
  319. fn := path.Join(m.dir, name)
  320. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  321. if err != nil {
  322. return nil, err
  323. }
  324. defer fd.Close()
  325. buf := buffers.Get(int(size))
  326. _, err = fd.ReadAt(buf, offset)
  327. if err != nil {
  328. return nil, err
  329. }
  330. if m.limitRequestRate != nil {
  331. for s := 0; s < len(buf); s += 1024 {
  332. <-m.limitRequestRate
  333. }
  334. }
  335. return buf, nil
  336. }
  337. // ReplaceLocal replaces the local repository index with the given list of files.
  338. func (m *Model) ReplaceLocal(fs []File) {
  339. var updated bool
  340. var newLocal = make(map[string]File)
  341. m.lmut.RLock()
  342. for _, f := range fs {
  343. newLocal[f.Name] = f
  344. if ef := m.local[f.Name]; !ef.Equals(f) {
  345. updated = true
  346. }
  347. }
  348. m.lmut.RUnlock()
  349. if m.markDeletedLocals(newLocal) {
  350. updated = true
  351. }
  352. m.lmut.RLock()
  353. if len(newLocal) != len(m.local) {
  354. updated = true
  355. }
  356. m.lmut.RUnlock()
  357. if updated {
  358. m.lmut.Lock()
  359. m.local = newLocal
  360. m.lmut.Unlock()
  361. m.recomputeGlobal()
  362. m.recomputeNeedForGlobal()
  363. m.umut.Lock()
  364. m.updatedLocal = time.Now().Unix()
  365. m.lastIdxBcastRequest = time.Now()
  366. m.umut.Unlock()
  367. }
  368. }
  369. // SeedLocal replaces the local repository index with the given list of files,
  370. // in protocol data types. Does not track deletes, should only be used to seed
  371. // the local index from a cache file at startup.
  372. func (m *Model) SeedLocal(fs []protocol.FileInfo) {
  373. m.lmut.Lock()
  374. m.local = make(map[string]File)
  375. for _, f := range fs {
  376. m.local[f.Name] = fileFromFileInfo(f)
  377. }
  378. m.lmut.Unlock()
  379. m.recomputeGlobal()
  380. m.recomputeNeedForGlobal()
  381. }
  382. // ConnectedTo returns true if we are connected to the named node.
  383. func (m *Model) ConnectedTo(nodeID string) bool {
  384. m.pmut.RLock()
  385. _, ok := m.protoConn[nodeID]
  386. m.pmut.RUnlock()
  387. return ok
  388. }
  389. // RepoID returns a unique ID representing the current repository location.
  390. func (m *Model) RepoID() string {
  391. return fmt.Sprintf("%x", sha1.Sum([]byte(m.dir)))
  392. }
  393. // AddConnection adds a new peer connection to the model. An initial index will
  394. // be sent to the connected peer, thereafter index updates whenever the local
  395. // repository changes.
  396. func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
  397. nodeID := protoConn.ID()
  398. m.pmut.Lock()
  399. m.protoConn[nodeID] = protoConn
  400. m.rawConn[nodeID] = rawConn
  401. m.pmut.Unlock()
  402. go func() {
  403. idx := m.ProtocolIndex()
  404. protoConn.Index(idx)
  405. }()
  406. m.initmut.Lock()
  407. rw := m.rwRunning
  408. m.initmut.Unlock()
  409. if !rw {
  410. return
  411. }
  412. for i := 0; i < m.parallelRequests; i++ {
  413. i := i
  414. go func() {
  415. if m.trace["pull"] {
  416. log.Println("DEBUG: PULL: Starting", nodeID, i)
  417. }
  418. for {
  419. m.pmut.RLock()
  420. if _, ok := m.protoConn[nodeID]; !ok {
  421. if m.trace["pull"] {
  422. log.Println("DEBUG: PULL: Exiting", nodeID, i)
  423. }
  424. m.pmut.RUnlock()
  425. return
  426. }
  427. m.pmut.RUnlock()
  428. qb, ok := m.fq.Get(nodeID)
  429. if ok {
  430. if m.trace["pull"] {
  431. log.Println("DEBUG: PULL: Request", nodeID, i, qb.name, qb.block.Offset)
  432. }
  433. data, _ := protoConn.Request(qb.name, qb.block.Offset, qb.block.Size, qb.block.Hash)
  434. m.fq.Done(qb.name, qb.block.Offset, data)
  435. } else {
  436. time.Sleep(1 * time.Second)
  437. }
  438. }
  439. }()
  440. }
  441. }
  442. // ProtocolIndex returns the current local index in protocol data types.
  443. // Must be called with the read lock held.
  444. func (m *Model) ProtocolIndex() []protocol.FileInfo {
  445. var index []protocol.FileInfo
  446. m.lmut.RLock()
  447. for _, f := range m.local {
  448. mf := fileInfoFromFile(f)
  449. if m.trace["idx"] {
  450. var flagComment string
  451. if mf.Flags&protocol.FlagDeleted != 0 {
  452. flagComment = " (deleted)"
  453. }
  454. log.Printf("DEBUG: IDX(out): %q m=%d f=%o%s v=%d (%d blocks)", mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
  455. }
  456. index = append(index, mf)
  457. }
  458. m.lmut.RUnlock()
  459. return index
  460. }
  461. func (m *Model) requestGlobal(nodeID, name string, offset int64, size uint32, hash []byte) ([]byte, error) {
  462. m.pmut.RLock()
  463. nc, ok := m.protoConn[nodeID]
  464. m.pmut.RUnlock()
  465. if !ok {
  466. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  467. }
  468. if m.trace["net"] {
  469. log.Printf("DEBUG: NET REQ(out): %s: %q o=%d s=%d h=%x", nodeID, name, offset, size, hash)
  470. }
  471. return nc.Request(name, offset, size, hash)
  472. }
  473. func (m *Model) broadcastIndexLoop() {
  474. for {
  475. m.umut.RLock()
  476. bcastRequested := m.lastIdxBcastRequest.After(m.lastIdxBcast)
  477. holdtimeExceeded := time.Since(m.lastIdxBcastRequest) > idxBcastHoldtime
  478. m.umut.RUnlock()
  479. maxDelayExceeded := time.Since(m.lastIdxBcast) > idxBcastMaxDelay
  480. if bcastRequested && (holdtimeExceeded || maxDelayExceeded) {
  481. idx := m.ProtocolIndex()
  482. var indexWg sync.WaitGroup
  483. indexWg.Add(len(m.protoConn))
  484. m.umut.Lock()
  485. m.lastIdxBcast = time.Now()
  486. m.umut.Unlock()
  487. m.pmut.RLock()
  488. for _, node := range m.protoConn {
  489. node := node
  490. if m.trace["net"] {
  491. log.Printf("DEBUG: NET IDX(out/loop): %s: %d files", node.ID(), len(idx))
  492. }
  493. go func() {
  494. node.Index(idx)
  495. indexWg.Done()
  496. }()
  497. }
  498. m.pmut.RUnlock()
  499. indexWg.Wait()
  500. }
  501. time.Sleep(idxBcastHoldtime)
  502. }
  503. }
  504. // markDeletedLocals sets the deleted flag on files that have gone missing locally.
  505. func (m *Model) markDeletedLocals(newLocal map[string]File) bool {
  506. // For every file in the existing local table, check if they are also
  507. // present in the new local table. If they are not, check that we already
  508. // had the newest version available according to the global table and if so
  509. // note the file as having been deleted.
  510. var updated bool
  511. m.gmut.RLock()
  512. m.lmut.RLock()
  513. for n, f := range m.local {
  514. if _, ok := newLocal[n]; !ok {
  515. if gf := m.global[n]; !gf.NewerThan(f) {
  516. if f.Flags&protocol.FlagDeleted == 0 {
  517. f.Flags = protocol.FlagDeleted
  518. f.Version++
  519. f.Blocks = nil
  520. updated = true
  521. }
  522. newLocal[n] = f
  523. }
  524. }
  525. }
  526. m.lmut.RUnlock()
  527. m.gmut.RUnlock()
  528. return updated
  529. }
  530. func (m *Model) updateLocal(f File) {
  531. var updated bool
  532. m.lmut.Lock()
  533. if ef, ok := m.local[f.Name]; !ok || !ef.Equals(f) {
  534. m.local[f.Name] = f
  535. updated = true
  536. }
  537. m.lmut.Unlock()
  538. if updated {
  539. m.recomputeGlobal()
  540. // We don't recomputeNeed here for two reasons:
  541. // - a need shouldn't have arisen due to having a newer local file
  542. // - recomputeNeed might call into fq.Add but we might have been called by
  543. // fq which would be a deadlock on fq
  544. m.umut.Lock()
  545. m.updatedLocal = time.Now().Unix()
  546. m.lastIdxBcastRequest = time.Now()
  547. m.umut.Unlock()
  548. }
  549. }
  550. /*
  551. XXX: Not done, needs elegant handling of availability
  552. func (m *Model) recomputeGlobalFor(files []File) bool {
  553. m.gmut.Lock()
  554. defer m.gmut.Unlock()
  555. var updated bool
  556. for _, f := range files {
  557. if gf, ok := m.global[f.Name]; !ok || f.NewerThan(gf) {
  558. m.global[f.Name] = f
  559. updated = true
  560. // Fix availability
  561. }
  562. }
  563. return updated
  564. }
  565. */
  566. func (m *Model) recomputeGlobal() {
  567. var newGlobal = make(map[string]File)
  568. m.lmut.RLock()
  569. for n, f := range m.local {
  570. newGlobal[n] = f
  571. }
  572. m.lmut.RUnlock()
  573. var available = make(map[string][]string)
  574. m.rmut.RLock()
  575. var highestMod int64
  576. for nodeID, fs := range m.remote {
  577. for n, nf := range fs {
  578. if lf, ok := newGlobal[n]; !ok || nf.NewerThan(lf) {
  579. newGlobal[n] = nf
  580. available[n] = []string{nodeID}
  581. if nf.Modified > highestMod {
  582. highestMod = nf.Modified
  583. }
  584. } else if lf.Equals(nf) {
  585. available[n] = append(available[n], nodeID)
  586. }
  587. }
  588. }
  589. m.rmut.RUnlock()
  590. for f, ns := range available {
  591. m.fq.SetAvailable(f, ns)
  592. }
  593. // Figure out if anything actually changed
  594. m.gmut.RLock()
  595. var updated bool
  596. if highestMod > m.updateGlobal || len(newGlobal) != len(m.global) {
  597. updated = true
  598. } else {
  599. for n, f0 := range newGlobal {
  600. if f1, ok := m.global[n]; !ok || !f0.Equals(f1) {
  601. updated = true
  602. break
  603. }
  604. }
  605. }
  606. m.gmut.RUnlock()
  607. if updated {
  608. m.gmut.Lock()
  609. m.umut.Lock()
  610. m.global = newGlobal
  611. m.updateGlobal = time.Now().Unix()
  612. m.umut.Unlock()
  613. m.gmut.Unlock()
  614. }
  615. }
  616. type addOrder struct {
  617. n string
  618. remote []Block
  619. fm *fileMonitor
  620. }
  621. func (m *Model) recomputeNeedForGlobal() {
  622. var toDelete []File
  623. var toAdd []addOrder
  624. m.gmut.RLock()
  625. for _, gf := range m.global {
  626. toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete)
  627. }
  628. m.gmut.RUnlock()
  629. for _, ao := range toAdd {
  630. m.fq.Add(ao.n, ao.remote, ao.fm)
  631. }
  632. for _, gf := range toDelete {
  633. m.dq <- gf
  634. }
  635. }
  636. func (m *Model) recomputeNeedForFiles(files []File) {
  637. var toDelete []File
  638. var toAdd []addOrder
  639. m.gmut.RLock()
  640. for _, gf := range files {
  641. toAdd, toDelete = m.recomputeNeedForFile(gf, toAdd, toDelete)
  642. }
  643. m.gmut.RUnlock()
  644. for _, ao := range toAdd {
  645. m.fq.Add(ao.n, ao.remote, ao.fm)
  646. }
  647. for _, gf := range toDelete {
  648. m.dq <- gf
  649. }
  650. }
  651. func (m *Model) recomputeNeedForFile(gf File, toAdd []addOrder, toDelete []File) ([]addOrder, []File) {
  652. m.lmut.RLock()
  653. lf, ok := m.local[gf.Name]
  654. m.lmut.RUnlock()
  655. if !ok || gf.NewerThan(lf) {
  656. if gf.Flags&protocol.FlagInvalid != 0 {
  657. // Never attempt to sync invalid files
  658. return toAdd, toDelete
  659. }
  660. if gf.Flags&protocol.FlagDeleted != 0 && !m.delete {
  661. // Don't want to delete files, so forget this need
  662. return toAdd, toDelete
  663. }
  664. if gf.Flags&protocol.FlagDeleted != 0 && !ok {
  665. // Don't have the file, so don't need to delete it
  666. return toAdd, toDelete
  667. }
  668. if m.trace["need"] {
  669. log.Printf("DEBUG: NEED: lf:%v gf:%v", lf, gf)
  670. }
  671. if gf.Flags&protocol.FlagDeleted != 0 {
  672. toDelete = append(toDelete, gf)
  673. } else {
  674. local, remote := BlockDiff(lf.Blocks, gf.Blocks)
  675. fm := fileMonitor{
  676. name: gf.Name,
  677. path: path.Clean(path.Join(m.dir, gf.Name)),
  678. global: gf,
  679. model: m,
  680. localBlocks: local,
  681. }
  682. toAdd = append(toAdd, addOrder{gf.Name, remote, &fm})
  683. }
  684. }
  685. return toAdd, toDelete
  686. }
  687. func (m *Model) WhoHas(name string) []string {
  688. var remote []string
  689. m.gmut.RLock()
  690. m.rmut.RLock()
  691. gf := m.global[name]
  692. for node, files := range m.remote {
  693. if file, ok := files[name]; ok && file.Equals(gf) {
  694. remote = append(remote, node)
  695. }
  696. }
  697. m.rmut.RUnlock()
  698. m.gmut.RUnlock()
  699. return remote
  700. }
  701. func (m *Model) deleteLoop() {
  702. for file := range m.dq {
  703. if m.trace["file"] {
  704. log.Println("DEBUG: FILE: Delete", file.Name)
  705. }
  706. path := path.Clean(path.Join(m.dir, file.Name))
  707. err := os.Remove(path)
  708. if err != nil {
  709. log.Printf("WARNING: %s: %v", file.Name, err)
  710. }
  711. m.updateLocal(file)
  712. }
  713. }
  714. func fileFromFileInfo(f protocol.FileInfo) File {
  715. var blocks = make([]Block, len(f.Blocks))
  716. var offset int64
  717. for i, b := range f.Blocks {
  718. blocks[i] = Block{
  719. Offset: offset,
  720. Size: b.Size,
  721. Hash: b.Hash,
  722. }
  723. offset += int64(b.Size)
  724. }
  725. return File{
  726. Name: f.Name,
  727. Flags: f.Flags,
  728. Modified: f.Modified,
  729. Version: f.Version,
  730. Blocks: blocks,
  731. }
  732. }
  733. func fileInfoFromFile(f File) protocol.FileInfo {
  734. var blocks = make([]protocol.BlockInfo, len(f.Blocks))
  735. for i, b := range f.Blocks {
  736. blocks[i] = protocol.BlockInfo{
  737. Size: b.Size,
  738. Hash: b.Hash,
  739. }
  740. }
  741. return protocol.FileInfo{
  742. Name: f.Name,
  743. Flags: f.Flags,
  744. Modified: f.Modified,
  745. Version: f.Version,
  746. Blocks: blocks,
  747. }
  748. }