model.go 20 KB


  1. // Copyright (C) 2014 Jakob Borg and other contributors. All rights reserved.
  2. // Use of this source code is governed by an MIT-style license that can be
  3. // found in the LICENSE file.
  4. package model
  5. import (
  6. "compress/gzip"
  7. "crypto/sha1"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net"
  12. "os"
  13. "path/filepath"
  14. "sync"
  15. "time"
  16. "github.com/calmh/syncthing/buffers"
  17. "github.com/calmh/syncthing/cid"
  18. "github.com/calmh/syncthing/config"
  19. "github.com/calmh/syncthing/files"
  20. "github.com/calmh/syncthing/lamport"
  21. "github.com/calmh/syncthing/osutil"
  22. "github.com/calmh/syncthing/protocol"
  23. "github.com/calmh/syncthing/scanner"
  24. )
  25. type repoState int
  26. const (
  27. RepoIdle repoState = iota
  28. RepoScanning
  29. RepoSyncing
  30. RepoCleaning
  31. )
  32. // Somewhat arbitrary amount of bytes that we choose to let represent the size
  33. // of an unsynchronized directory entry or a deleted file. We need it to be
  34. // larger than zero so that it's visible that there is some amount of bytes to
  35. // transfer to bring the systems into synchronization.
  36. const zeroEntrySize = 128
  37. type Model struct {
  38. indexDir string
  39. cfg *config.Configuration
  40. clientName string
  41. clientVersion string
  42. repoCfgs map[string]config.RepositoryConfiguration // repo -> cfg
  43. repoFiles map[string]*files.Set // repo -> files
  44. repoNodes map[string][]string // repo -> nodeIDs
  45. nodeRepos map[string][]string // nodeID -> repos
  46. suppressor map[string]*suppressor // repo -> suppressor
  47. rmut sync.RWMutex // protects the above
  48. repoState map[string]repoState // repo -> state
  49. smut sync.RWMutex
  50. cm *cid.Map
  51. protoConn map[string]protocol.Connection
  52. rawConn map[string]io.Closer
  53. nodeVer map[string]string
  54. pmut sync.RWMutex // protects protoConn and rawConn
  55. sup suppressor
  56. addedRepo bool
  57. started bool
  58. }
  59. var (
  60. ErrNoSuchFile = errors.New("no such file")
  61. ErrInvalid = errors.New("file is invalid")
  62. )
  63. // NewModel creates and starts a new model. The model starts in read-only mode,
  64. // where it sends index information to connected peers and responds to requests
  65. // for file data without altering the local repository in any way.
  66. func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVersion string) *Model {
  67. m := &Model{
  68. indexDir: indexDir,
  69. cfg: cfg,
  70. clientName: clientName,
  71. clientVersion: clientVersion,
  72. repoCfgs: make(map[string]config.RepositoryConfiguration),
  73. repoFiles: make(map[string]*files.Set),
  74. repoNodes: make(map[string][]string),
  75. nodeRepos: make(map[string][]string),
  76. repoState: make(map[string]repoState),
  77. suppressor: make(map[string]*suppressor),
  78. cm: cid.NewMap(),
  79. protoConn: make(map[string]protocol.Connection),
  80. rawConn: make(map[string]io.Closer),
  81. nodeVer: make(map[string]string),
  82. sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
  83. }
  84. go m.broadcastIndexLoop()
  85. return m
  86. }
  87. // StartRW starts read/write processing on the current model. When in
  88. // read/write mode the model will attempt to keep in sync with the cluster by
  89. // pulling needed files from peer nodes.
  90. func (m *Model) StartRepoRW(repo string, threads int) {
  91. m.rmut.RLock()
  92. defer m.rmut.RUnlock()
  93. if cfg, ok := m.repoCfgs[repo]; !ok {
  94. panic("cannot start without repo")
  95. } else {
  96. newPuller(cfg, m, threads, m.cfg)
  97. }
  98. }
  99. // StartRO starts read only processing on the current model. When in
  100. // read only mode the model will announce files to the cluster but not
  101. // pull in any external changes.
  102. func (m *Model) StartRepoRO(repo string) {
  103. m.StartRepoRW(repo, 0) // zero threads => read only
  104. }
  105. type ConnectionInfo struct {
  106. protocol.Statistics
  107. Address string
  108. ClientVersion string
  109. Completion int
  110. }
  111. // ConnectionStats returns a map with connection statistics for each connected node.
  112. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  113. type remoteAddrer interface {
  114. RemoteAddr() net.Addr
  115. }
  116. m.pmut.RLock()
  117. m.rmut.RLock()
  118. var res = make(map[string]ConnectionInfo)
  119. for node, conn := range m.protoConn {
  120. ci := ConnectionInfo{
  121. Statistics: conn.Statistics(),
  122. ClientVersion: m.nodeVer[node],
  123. }
  124. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  125. ci.Address = nc.RemoteAddr().String()
  126. }
  127. var tot int64
  128. var have int64
  129. for _, repo := range m.nodeRepos[node] {
  130. for _, f := range m.repoFiles[repo].Global() {
  131. if !protocol.IsDeleted(f.Flags) {
  132. size := f.Size
  133. if protocol.IsDirectory(f.Flags) {
  134. size = zeroEntrySize
  135. }
  136. tot += size
  137. have += size
  138. }
  139. }
  140. for _, f := range m.repoFiles[repo].Need(m.cm.Get(node)) {
  141. if !protocol.IsDeleted(f.Flags) {
  142. size := f.Size
  143. if protocol.IsDirectory(f.Flags) {
  144. size = zeroEntrySize
  145. }
  146. have -= size
  147. }
  148. }
  149. }
  150. ci.Completion = 100
  151. if tot != 0 {
  152. ci.Completion = int(100 * have / tot)
  153. }
  154. res[node] = ci
  155. }
  156. m.rmut.RUnlock()
  157. m.pmut.RUnlock()
  158. in, out := protocol.TotalInOut()
  159. res["total"] = ConnectionInfo{
  160. Statistics: protocol.Statistics{
  161. At: time.Now(),
  162. InBytesTotal: in,
  163. OutBytesTotal: out,
  164. },
  165. }
  166. return res
  167. }
  168. func sizeOf(fs []scanner.File) (files, deleted int, bytes int64) {
  169. for _, f := range fs {
  170. if !protocol.IsDeleted(f.Flags) {
  171. files++
  172. if !protocol.IsDirectory(f.Flags) {
  173. bytes += f.Size
  174. } else {
  175. bytes += zeroEntrySize
  176. }
  177. } else {
  178. deleted++
  179. bytes += zeroEntrySize
  180. }
  181. }
  182. return
  183. }
  184. // GlobalSize returns the number of files, deleted files and total bytes for all
  185. // files in the global model.
  186. func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
  187. m.rmut.RLock()
  188. defer m.rmut.RUnlock()
  189. if rf, ok := m.repoFiles[repo]; ok {
  190. return sizeOf(rf.Global())
  191. }
  192. return 0, 0, 0
  193. }
  194. // LocalSize returns the number of files, deleted files and total bytes for all
  195. // files in the local repository.
  196. func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
  197. m.rmut.RLock()
  198. defer m.rmut.RUnlock()
  199. if rf, ok := m.repoFiles[repo]; ok {
  200. return sizeOf(rf.Have(cid.LocalID))
  201. }
  202. return 0, 0, 0
  203. }
  204. // NeedSize returns the number and total size of currently needed files.
  205. func (m *Model) NeedSize(repo string) (files int, bytes int64) {
  206. f, d, b := sizeOf(m.NeedFilesRepo(repo))
  207. return f + d, b
  208. }
  209. // NeedFiles returns the list of currently needed files and the total size.
  210. func (m *Model) NeedFilesRepo(repo string) []scanner.File {
  211. m.rmut.RLock()
  212. defer m.rmut.RUnlock()
  213. if rf, ok := m.repoFiles[repo]; ok {
  214. f := rf.Need(cid.LocalID)
  215. if r := m.repoCfgs[repo].FileRanker(); r != nil {
  216. files.SortBy(r).Sort(f)
  217. }
  218. return f
  219. }
  220. return nil
  221. }
  222. // Index is called when a new node is connected and we receive their full index.
  223. // Implements the protocol.Model interface.
  224. func (m *Model) Index(nodeID string, repo string, fs []protocol.FileInfo) {
  225. if debug {
  226. l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
  227. }
  228. if !m.repoSharedWith(repo, nodeID) {
  229. l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  230. return
  231. }
  232. var files = make([]scanner.File, len(fs))
  233. for i := range fs {
  234. f := fs[i]
  235. lamport.Default.Tick(f.Version)
  236. if debug {
  237. var flagComment string
  238. if protocol.IsDeleted(f.Flags) {
  239. flagComment = " (deleted)"
  240. }
  241. l.Debugf("IDX(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  242. }
  243. files[i] = fileFromFileInfo(f)
  244. }
  245. id := m.cm.Get(nodeID)
  246. m.rmut.RLock()
  247. if r, ok := m.repoFiles[repo]; ok {
  248. r.Replace(id, files)
  249. } else {
  250. l.Fatalf("Index for nonexistant repo %q", repo)
  251. }
  252. m.rmut.RUnlock()
  253. }
  254. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  255. // Implements the protocol.Model interface.
  256. func (m *Model) IndexUpdate(nodeID string, repo string, fs []protocol.FileInfo) {
  257. if debug {
  258. l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
  259. }
  260. if !m.repoSharedWith(repo, nodeID) {
  261. l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  262. return
  263. }
  264. var files = make([]scanner.File, len(fs))
  265. for i := range fs {
  266. f := fs[i]
  267. lamport.Default.Tick(f.Version)
  268. if debug {
  269. var flagComment string
  270. if protocol.IsDeleted(f.Flags) {
  271. flagComment = " (deleted)"
  272. }
  273. l.Debugf("IDXUP(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  274. }
  275. files[i] = fileFromFileInfo(f)
  276. }
  277. id := m.cm.Get(nodeID)
  278. m.rmut.RLock()
  279. if r, ok := m.repoFiles[repo]; ok {
  280. r.Update(id, files)
  281. } else {
  282. l.Fatalf("IndexUpdate for nonexistant repo %q", repo)
  283. }
  284. m.rmut.RUnlock()
  285. }
  286. func (m *Model) repoSharedWith(repo, nodeID string) bool {
  287. m.rmut.RLock()
  288. defer m.rmut.RUnlock()
  289. for _, nrepo := range m.nodeRepos[nodeID] {
  290. if nrepo == repo {
  291. return true
  292. }
  293. }
  294. return false
  295. }
  296. func (m *Model) ClusterConfig(nodeID string, config protocol.ClusterConfigMessage) {
  297. compErr := compareClusterConfig(m.clusterConfig(nodeID), config)
  298. if debug {
  299. l.Debugf("ClusterConfig: %s: %#v", nodeID, config)
  300. l.Debugf(" ... compare: %s: %v", nodeID, compErr)
  301. }
  302. if compErr != nil {
  303. l.Warnf("%s: %v", nodeID, compErr)
  304. m.Close(nodeID, compErr)
  305. }
  306. m.pmut.Lock()
  307. if config.ClientName == "syncthing" {
  308. m.nodeVer[nodeID] = config.ClientVersion
  309. } else {
  310. m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
  311. }
  312. m.pmut.Unlock()
  313. }
  314. // Close removes the peer from the model and closes the underlying connection if possible.
  315. // Implements the protocol.Model interface.
  316. func (m *Model) Close(node string, err error) {
  317. if debug {
  318. l.Debugf("%s: %v", node, err)
  319. }
  320. if err != io.EOF {
  321. l.Warnf("Connection to %s closed: %v", node, err)
  322. } else if _, ok := err.(ClusterConfigMismatch); ok {
  323. l.Warnf("Connection to %s closed: %v", node, err)
  324. }
  325. cid := m.cm.Get(node)
  326. m.rmut.RLock()
  327. for _, repo := range m.nodeRepos[node] {
  328. m.repoFiles[repo].Replace(cid, nil)
  329. }
  330. m.rmut.RUnlock()
  331. m.cm.Clear(node)
  332. m.pmut.Lock()
  333. conn, ok := m.rawConn[node]
  334. if ok {
  335. conn.Close()
  336. }
  337. delete(m.protoConn, node)
  338. delete(m.rawConn, node)
  339. delete(m.nodeVer, node)
  340. m.pmut.Unlock()
  341. }
  342. // Request returns the specified data segment by reading it from local disk.
  343. // Implements the protocol.Model interface.
  344. func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]byte, error) {
  345. // Verify that the requested file exists in the local model.
  346. m.rmut.RLock()
  347. r, ok := m.repoFiles[repo]
  348. m.rmut.RUnlock()
  349. if !ok {
  350. l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
  351. return nil, ErrNoSuchFile
  352. }
  353. lf := r.Get(cid.LocalID, name)
  354. if lf.Suppressed || protocol.IsDeleted(lf.Flags) {
  355. if debug {
  356. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
  357. }
  358. return nil, ErrInvalid
  359. }
  360. if offset > lf.Size {
  361. if debug {
  362. l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
  363. }
  364. return nil, ErrNoSuchFile
  365. }
  366. if debug && nodeID != "<local>" {
  367. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
  368. }
  369. m.rmut.RLock()
  370. fn := filepath.Join(m.repoCfgs[repo].Directory, name)
  371. m.rmut.RUnlock()
  372. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  373. if err != nil {
  374. return nil, err
  375. }
  376. defer fd.Close()
  377. buf := buffers.Get(int(size))
  378. _, err = fd.ReadAt(buf, offset)
  379. if err != nil {
  380. return nil, err
  381. }
  382. return buf, nil
  383. }
  384. // ReplaceLocal replaces the local repository index with the given list of files.
  385. func (m *Model) ReplaceLocal(repo string, fs []scanner.File) {
  386. m.rmut.RLock()
  387. m.repoFiles[repo].ReplaceWithDelete(cid.LocalID, fs)
  388. m.rmut.RUnlock()
  389. }
  390. func (m *Model) SeedLocal(repo string, fs []protocol.FileInfo) {
  391. var sfs = make([]scanner.File, len(fs))
  392. for i := 0; i < len(fs); i++ {
  393. lamport.Default.Tick(fs[i].Version)
  394. sfs[i] = fileFromFileInfo(fs[i])
  395. sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; the should not be on startup
  396. }
  397. m.rmut.RLock()
  398. m.repoFiles[repo].Replace(cid.LocalID, sfs)
  399. m.rmut.RUnlock()
  400. }
  401. func (m *Model) CurrentRepoFile(repo string, file string) scanner.File {
  402. m.rmut.RLock()
  403. f := m.repoFiles[repo].Get(cid.LocalID, file)
  404. m.rmut.RUnlock()
  405. return f
  406. }
  407. func (m *Model) CurrentGlobalFile(repo string, file string) scanner.File {
  408. m.rmut.RLock()
  409. f := m.repoFiles[repo].GetGlobal(file)
  410. m.rmut.RUnlock()
  411. return f
  412. }
  413. type cFiler struct {
  414. m *Model
  415. r string
  416. }
  417. // Implements scanner.CurrentFiler
  418. func (cf cFiler) CurrentFile(file string) scanner.File {
  419. return cf.m.CurrentRepoFile(cf.r, file)
  420. }
  421. // ConnectedTo returns true if we are connected to the named node.
  422. func (m *Model) ConnectedTo(nodeID string) bool {
  423. m.pmut.RLock()
  424. _, ok := m.protoConn[nodeID]
  425. m.pmut.RUnlock()
  426. return ok
  427. }
  428. // AddConnection adds a new peer connection to the model. An initial index will
  429. // be sent to the connected peer, thereafter index updates whenever the local
  430. // repository changes.
  431. func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
  432. nodeID := protoConn.ID()
  433. m.pmut.Lock()
  434. if _, ok := m.protoConn[nodeID]; ok {
  435. panic("add existing node")
  436. }
  437. m.protoConn[nodeID] = protoConn
  438. if _, ok := m.rawConn[nodeID]; ok {
  439. panic("add existing node")
  440. }
  441. m.rawConn[nodeID] = rawConn
  442. m.pmut.Unlock()
  443. cm := m.clusterConfig(nodeID)
  444. protoConn.ClusterConfig(cm)
  445. var idxToSend = make(map[string][]protocol.FileInfo)
  446. m.rmut.RLock()
  447. for _, repo := range m.nodeRepos[nodeID] {
  448. idxToSend[repo] = m.protocolIndex(repo)
  449. }
  450. m.rmut.RUnlock()
  451. go func() {
  452. for repo, idx := range idxToSend {
  453. if debug {
  454. l.Debugf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
  455. }
  456. protoConn.Index(repo, idx)
  457. }
  458. }()
  459. }
  460. // protocolIndex returns the current local index in protocol data types.
  461. func (m *Model) protocolIndex(repo string) []protocol.FileInfo {
  462. var index []protocol.FileInfo
  463. fs := m.repoFiles[repo].Have(cid.LocalID)
  464. for _, f := range fs {
  465. mf := fileInfoFromFile(f)
  466. if debug {
  467. var flagComment string
  468. if protocol.IsDeleted(mf.Flags) {
  469. flagComment = " (deleted)"
  470. }
  471. l.Debugf("IDX(out): %q/%q m=%d f=%o%s v=%d (%d blocks)", repo, mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
  472. }
  473. index = append(index, mf)
  474. }
  475. return index
  476. }
  477. func (m *Model) updateLocal(repo string, f scanner.File) {
  478. m.rmut.RLock()
  479. m.repoFiles[repo].Update(cid.LocalID, []scanner.File{f})
  480. m.rmut.RUnlock()
  481. }
  482. func (m *Model) requestGlobal(nodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
  483. m.pmut.RLock()
  484. nc, ok := m.protoConn[nodeID]
  485. m.pmut.RUnlock()
  486. if !ok {
  487. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  488. }
  489. if debug {
  490. l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
  491. }
  492. return nc.Request(repo, name, offset, size)
  493. }
  494. func (m *Model) broadcastIndexLoop() {
  495. var lastChange = map[string]uint64{}
  496. for {
  497. time.Sleep(5 * time.Second)
  498. m.pmut.RLock()
  499. m.rmut.RLock()
  500. var indexWg sync.WaitGroup
  501. for repo, fs := range m.repoFiles {
  502. repo := repo
  503. c := fs.Changes(cid.LocalID)
  504. if c == lastChange[repo] {
  505. continue
  506. }
  507. lastChange[repo] = c
  508. idx := m.protocolIndex(repo)
  509. indexWg.Add(1)
  510. go func() {
  511. err := m.saveIndex(repo, m.indexDir, idx)
  512. if err != nil {
  513. l.Infof("Saving index for %q: %v", repo, err)
  514. }
  515. indexWg.Done()
  516. }()
  517. for _, nodeID := range m.repoNodes[repo] {
  518. nodeID := nodeID
  519. if conn, ok := m.protoConn[nodeID]; ok {
  520. indexWg.Add(1)
  521. if debug {
  522. l.Debugf("IDX(out/loop): %s: %d files", nodeID, len(idx))
  523. }
  524. go func() {
  525. conn.Index(repo, idx)
  526. indexWg.Done()
  527. }()
  528. }
  529. }
  530. }
  531. m.rmut.RUnlock()
  532. m.pmut.RUnlock()
  533. indexWg.Wait()
  534. }
  535. }
  536. func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
  537. if m.started {
  538. panic("cannot add repo to started model")
  539. }
  540. if len(cfg.ID) == 0 {
  541. panic("cannot add empty repo id")
  542. }
  543. m.rmut.Lock()
  544. m.repoCfgs[cfg.ID] = cfg
  545. m.repoFiles[cfg.ID] = files.NewSet()
  546. m.suppressor[cfg.ID] = &suppressor{threshold: int64(m.cfg.Options.MaxChangeKbps)}
  547. m.repoNodes[cfg.ID] = make([]string, len(cfg.Nodes))
  548. for i, node := range cfg.Nodes {
  549. m.repoNodes[cfg.ID][i] = node.NodeID
  550. m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
  551. }
  552. m.addedRepo = true
  553. m.rmut.Unlock()
  554. }
  555. func (m *Model) ScanRepos() {
  556. m.rmut.RLock()
  557. var repos = make([]string, 0, len(m.repoCfgs))
  558. for repo := range m.repoCfgs {
  559. repos = append(repos, repo)
  560. }
  561. m.rmut.RUnlock()
  562. var wg sync.WaitGroup
  563. wg.Add(len(repos))
  564. for _, repo := range repos {
  565. repo := repo
  566. go func() {
  567. err := m.ScanRepo(repo)
  568. if err != nil {
  569. invalidateRepo(m.cfg, repo, err)
  570. }
  571. wg.Done()
  572. }()
  573. }
  574. wg.Wait()
  575. }
  576. func (m *Model) CleanRepos() {
  577. m.rmut.RLock()
  578. var dirs = make([]string, 0, len(m.repoCfgs))
  579. for _, cfg := range m.repoCfgs {
  580. dirs = append(dirs, cfg.Directory)
  581. }
  582. m.rmut.RUnlock()
  583. var wg sync.WaitGroup
  584. wg.Add(len(dirs))
  585. for _, dir := range dirs {
  586. w := &scanner.Walker{
  587. Dir: dir,
  588. TempNamer: defTempNamer,
  589. }
  590. go func() {
  591. w.CleanTempFiles()
  592. wg.Done()
  593. }()
  594. }
  595. wg.Wait()
  596. }
  597. func (m *Model) ScanRepo(repo string) error {
  598. m.rmut.RLock()
  599. w := &scanner.Walker{
  600. Dir: m.repoCfgs[repo].Directory,
  601. IgnoreFile: ".stignore",
  602. BlockSize: scanner.StandardBlockSize,
  603. TempNamer: defTempNamer,
  604. Suppressor: m.suppressor[repo],
  605. CurrentFiler: cFiler{m, repo},
  606. IgnorePerms: m.repoCfgs[repo].IgnorePerms,
  607. }
  608. m.rmut.RUnlock()
  609. m.setState(repo, RepoScanning)
  610. fs, _, err := w.Walk()
  611. if err != nil {
  612. return err
  613. }
  614. m.ReplaceLocal(repo, fs)
  615. m.setState(repo, RepoIdle)
  616. return nil
  617. }
  618. func (m *Model) SaveIndexes(dir string) {
  619. m.rmut.RLock()
  620. for repo := range m.repoCfgs {
  621. fs := m.protocolIndex(repo)
  622. err := m.saveIndex(repo, dir, fs)
  623. if err != nil {
  624. l.Infof("Saving index for %q: %v", repo, err)
  625. }
  626. }
  627. m.rmut.RUnlock()
  628. }
  629. func (m *Model) LoadIndexes(dir string) {
  630. m.rmut.RLock()
  631. for repo := range m.repoCfgs {
  632. fs := m.loadIndex(repo, dir)
  633. m.SeedLocal(repo, fs)
  634. }
  635. m.rmut.RUnlock()
  636. }
  637. func (m *Model) saveIndex(repo string, dir string, fs []protocol.FileInfo) error {
  638. id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory)))
  639. name := id + ".idx.gz"
  640. name = filepath.Join(dir, name)
  641. tmp := fmt.Sprintf("%s.tmp.%d", name, time.Now().UnixNano())
  642. idxf, err := os.OpenFile(tmp, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0644)
  643. if err != nil {
  644. return err
  645. }
  646. defer os.Remove(tmp)
  647. gzw := gzip.NewWriter(idxf)
  648. n, err := protocol.IndexMessage{
  649. Repository: repo,
  650. Files: fs,
  651. }.EncodeXDR(gzw)
  652. if err != nil {
  653. gzw.Close()
  654. idxf.Close()
  655. return err
  656. }
  657. err = gzw.Close()
  658. if err != nil {
  659. return err
  660. }
  661. err = idxf.Close()
  662. if err != nil {
  663. return err
  664. }
  665. if debug {
  666. l.Debugln("wrote index,", n, "bytes uncompressed")
  667. }
  668. return osutil.Rename(tmp, name)
  669. }
  670. func (m *Model) loadIndex(repo string, dir string) []protocol.FileInfo {
  671. id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory)))
  672. name := id + ".idx.gz"
  673. name = filepath.Join(dir, name)
  674. idxf, err := os.Open(name)
  675. if err != nil {
  676. return nil
  677. }
  678. defer idxf.Close()
  679. gzr, err := gzip.NewReader(idxf)
  680. if err != nil {
  681. return nil
  682. }
  683. defer gzr.Close()
  684. var im protocol.IndexMessage
  685. err = im.DecodeXDR(gzr)
  686. if err != nil || im.Repository != repo {
  687. return nil
  688. }
  689. return im.Files
  690. }
  691. // clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
  692. func (m *Model) clusterConfig(node string) protocol.ClusterConfigMessage {
  693. cm := protocol.ClusterConfigMessage{
  694. ClientName: m.clientName,
  695. ClientVersion: m.clientVersion,
  696. }
  697. m.rmut.RLock()
  698. for _, repo := range m.nodeRepos[node] {
  699. cr := protocol.Repository{
  700. ID: repo,
  701. }
  702. for _, node := range m.repoNodes[repo] {
  703. // TODO: Set read only bit when relevant
  704. cr.Nodes = append(cr.Nodes, protocol.Node{
  705. ID: node,
  706. Flags: protocol.FlagShareTrusted,
  707. })
  708. }
  709. cm.Repositories = append(cm.Repositories, cr)
  710. }
  711. m.rmut.RUnlock()
  712. return cm
  713. }
  714. func (m *Model) setState(repo string, state repoState) {
  715. m.smut.Lock()
  716. m.repoState[repo] = state
  717. m.smut.Unlock()
  718. }
  719. func (m *Model) State(repo string) string {
  720. m.smut.RLock()
  721. state := m.repoState[repo]
  722. m.smut.RUnlock()
  723. switch state {
  724. case RepoIdle:
  725. return "idle"
  726. case RepoScanning:
  727. return "scanning"
  728. case RepoCleaning:
  729. return "cleaning"
  730. case RepoSyncing:
  731. return "syncing"
  732. default:
  733. return "unknown"
  734. }
  735. }