model.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797
  1. package model
  2. import (
  3. "compress/gzip"
  4. "crypto/sha1"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "os"
  10. "path/filepath"
  11. "sync"
  12. "time"
  13. "github.com/calmh/syncthing/buffers"
  14. "github.com/calmh/syncthing/cid"
  15. "github.com/calmh/syncthing/config"
  16. "github.com/calmh/syncthing/files"
  17. "github.com/calmh/syncthing/lamport"
  18. "github.com/calmh/syncthing/osutil"
  19. "github.com/calmh/syncthing/protocol"
  20. "github.com/calmh/syncthing/scanner"
  21. )
  22. type repoState int
  23. const (
  24. RepoIdle repoState = iota
  25. RepoScanning
  26. RepoSyncing
  27. RepoCleaning
  28. )
  29. // Somewhat arbitrary amount of bytes that we choose to let represent the size
  30. // of an unsynchronized directory entry or a deleted file. We need it to be
  31. // larger than zero so that it's visible that there is some amount of bytes to
  32. // transfer to bring the systems into synchronization.
  33. const zeroEntrySize = 128
  34. type Model struct {
  35. indexDir string
  36. cfg *config.Configuration
  37. clientName string
  38. clientVersion string
  39. repoCfgs map[string]config.RepositoryConfiguration // repo -> cfg
  40. repoFiles map[string]*files.Set // repo -> files
  41. repoNodes map[string][]string // repo -> nodeIDs
  42. nodeRepos map[string][]string // nodeID -> repos
  43. suppressor map[string]*suppressor // repo -> suppressor
  44. rmut sync.RWMutex // protects the above
  45. repoState map[string]repoState // repo -> state
  46. smut sync.RWMutex
  47. cm *cid.Map
  48. protoConn map[string]protocol.Connection
  49. rawConn map[string]io.Closer
  50. nodeVer map[string]string
  51. pmut sync.RWMutex // protects protoConn and rawConn
  52. sup suppressor
  53. addedRepo bool
  54. started bool
  55. }
  56. var (
  57. ErrNoSuchFile = errors.New("no such file")
  58. ErrInvalid = errors.New("file is invalid")
  59. )
  60. // NewModel creates and starts a new model. The model starts in read-only mode,
  61. // where it sends index information to connected peers and responds to requests
  62. // for file data without altering the local repository in any way.
  63. func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVersion string) *Model {
  64. m := &Model{
  65. indexDir: indexDir,
  66. cfg: cfg,
  67. clientName: clientName,
  68. clientVersion: clientVersion,
  69. repoCfgs: make(map[string]config.RepositoryConfiguration),
  70. repoFiles: make(map[string]*files.Set),
  71. repoNodes: make(map[string][]string),
  72. nodeRepos: make(map[string][]string),
  73. repoState: make(map[string]repoState),
  74. suppressor: make(map[string]*suppressor),
  75. cm: cid.NewMap(),
  76. protoConn: make(map[string]protocol.Connection),
  77. rawConn: make(map[string]io.Closer),
  78. nodeVer: make(map[string]string),
  79. sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
  80. }
  81. go m.broadcastIndexLoop()
  82. return m
  83. }
  84. // StartRW starts read/write processing on the current model. When in
  85. // read/write mode the model will attempt to keep in sync with the cluster by
  86. // pulling needed files from peer nodes.
  87. func (m *Model) StartRepoRW(repo string, threads int) {
  88. m.rmut.RLock()
  89. defer m.rmut.RUnlock()
  90. if cfg, ok := m.repoCfgs[repo]; !ok {
  91. panic("cannot start without repo")
  92. } else {
  93. newPuller(cfg, m, threads, m.cfg)
  94. }
  95. }
  96. // StartRO starts read only processing on the current model. When in
  97. // read only mode the model will announce files to the cluster but not
  98. // pull in any external changes.
  99. func (m *Model) StartRepoRO(repo string) {
  100. m.StartRepoRW(repo, 0) // zero threads => read only
  101. }
  102. type ConnectionInfo struct {
  103. protocol.Statistics
  104. Address string
  105. ClientVersion string
  106. Completion int
  107. }
  108. // ConnectionStats returns a map with connection statistics for each connected node.
  109. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  110. type remoteAddrer interface {
  111. RemoteAddr() net.Addr
  112. }
  113. m.pmut.RLock()
  114. m.rmut.RLock()
  115. var res = make(map[string]ConnectionInfo)
  116. for node, conn := range m.protoConn {
  117. ci := ConnectionInfo{
  118. Statistics: conn.Statistics(),
  119. ClientVersion: m.nodeVer[node],
  120. }
  121. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  122. ci.Address = nc.RemoteAddr().String()
  123. }
  124. var tot int64
  125. var have int64
  126. for _, repo := range m.nodeRepos[node] {
  127. for _, f := range m.repoFiles[repo].Global() {
  128. if !protocol.IsDeleted(f.Flags) {
  129. size := f.Size
  130. if protocol.IsDirectory(f.Flags) {
  131. size = zeroEntrySize
  132. }
  133. tot += size
  134. have += size
  135. }
  136. }
  137. for _, f := range m.repoFiles[repo].Need(m.cm.Get(node)) {
  138. if !protocol.IsDeleted(f.Flags) {
  139. size := f.Size
  140. if protocol.IsDirectory(f.Flags) {
  141. size = zeroEntrySize
  142. }
  143. have -= size
  144. }
  145. }
  146. }
  147. ci.Completion = 100
  148. if tot != 0 {
  149. ci.Completion = int(100 * have / tot)
  150. }
  151. res[node] = ci
  152. }
  153. m.rmut.RUnlock()
  154. m.pmut.RUnlock()
  155. in, out := protocol.TotalInOut()
  156. res["total"] = ConnectionInfo{
  157. Statistics: protocol.Statistics{
  158. At: time.Now(),
  159. InBytesTotal: int(in),
  160. OutBytesTotal: int(out),
  161. },
  162. }
  163. return res
  164. }
  165. func sizeOf(fs []scanner.File) (files, deleted int, bytes int64) {
  166. for _, f := range fs {
  167. if !protocol.IsDeleted(f.Flags) {
  168. files++
  169. if !protocol.IsDirectory(f.Flags) {
  170. bytes += f.Size
  171. } else {
  172. bytes += zeroEntrySize
  173. }
  174. } else {
  175. deleted++
  176. bytes += zeroEntrySize
  177. }
  178. }
  179. return
  180. }
  181. // GlobalSize returns the number of files, deleted files and total bytes for all
  182. // files in the global model.
  183. func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
  184. m.rmut.RLock()
  185. defer m.rmut.RUnlock()
  186. if rf, ok := m.repoFiles[repo]; ok {
  187. return sizeOf(rf.Global())
  188. }
  189. return 0, 0, 0
  190. }
  191. // LocalSize returns the number of files, deleted files and total bytes for all
  192. // files in the local repository.
  193. func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
  194. m.rmut.RLock()
  195. defer m.rmut.RUnlock()
  196. if rf, ok := m.repoFiles[repo]; ok {
  197. return sizeOf(rf.Have(cid.LocalID))
  198. }
  199. return 0, 0, 0
  200. }
  201. // NeedSize returns the number and total size of currently needed files.
  202. func (m *Model) NeedSize(repo string) (files int, bytes int64) {
  203. f, d, b := sizeOf(m.NeedFilesRepo(repo))
  204. return f + d, b
  205. }
  206. // NeedFiles returns the list of currently needed files and the total size.
  207. func (m *Model) NeedFilesRepo(repo string) []scanner.File {
  208. m.rmut.RLock()
  209. defer m.rmut.RUnlock()
  210. if rf, ok := m.repoFiles[repo]; ok {
  211. return rf.Need(cid.LocalID)
  212. }
  213. return nil
  214. }
  215. // Index is called when a new node is connected and we receive their full index.
  216. // Implements the protocol.Model interface.
  217. func (m *Model) Index(nodeID string, repo string, fs []protocol.FileInfo) {
  218. if debug {
  219. l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
  220. }
  221. var files = make([]scanner.File, len(fs))
  222. for i := range fs {
  223. f := fs[i]
  224. lamport.Default.Tick(f.Version)
  225. if debug {
  226. var flagComment string
  227. if protocol.IsDeleted(f.Flags) {
  228. flagComment = " (deleted)"
  229. }
  230. l.Debugf("IDX(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  231. }
  232. files[i] = fileFromFileInfo(f)
  233. }
  234. id := m.cm.Get(nodeID)
  235. m.rmut.RLock()
  236. if r, ok := m.repoFiles[repo]; ok {
  237. r.Replace(id, files)
  238. } else {
  239. l.Warnf("Index from %s for nonexistant repo %q; dropping", nodeID, repo)
  240. }
  241. m.rmut.RUnlock()
  242. }
  243. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  244. // Implements the protocol.Model interface.
  245. func (m *Model) IndexUpdate(nodeID string, repo string, fs []protocol.FileInfo) {
  246. if debug {
  247. l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
  248. }
  249. var files = make([]scanner.File, len(fs))
  250. for i := range fs {
  251. f := fs[i]
  252. lamport.Default.Tick(f.Version)
  253. if debug {
  254. var flagComment string
  255. if protocol.IsDeleted(f.Flags) {
  256. flagComment = " (deleted)"
  257. }
  258. l.Debugf("IDXUP(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  259. }
  260. files[i] = fileFromFileInfo(f)
  261. }
  262. id := m.cm.Get(nodeID)
  263. m.rmut.RLock()
  264. if r, ok := m.repoFiles[repo]; ok {
  265. r.Update(id, files)
  266. } else {
  267. l.Warnf("Index update from %s for nonexistant repo %q; dropping", nodeID, repo)
  268. }
  269. m.rmut.RUnlock()
  270. }
  271. func (m *Model) ClusterConfig(nodeID string, config protocol.ClusterConfigMessage) {
  272. compErr := compareClusterConfig(m.clusterConfig(nodeID), config)
  273. if debug {
  274. l.Debugf("ClusterConfig: %s: %#v", nodeID, config)
  275. l.Debugf(" ... compare: %s: %v", nodeID, compErr)
  276. }
  277. if compErr != nil {
  278. l.Warnf("%s: %v", nodeID, compErr)
  279. m.Close(nodeID, compErr)
  280. }
  281. m.pmut.Lock()
  282. if config.ClientName == "syncthing" {
  283. m.nodeVer[nodeID] = config.ClientVersion
  284. } else {
  285. m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
  286. }
  287. m.pmut.Unlock()
  288. }
  289. // Close removes the peer from the model and closes the underlying connection if possible.
  290. // Implements the protocol.Model interface.
  291. func (m *Model) Close(node string, err error) {
  292. if debug {
  293. l.Debugf("%s: %v", node, err)
  294. }
  295. if err != io.EOF {
  296. l.Warnf("Connection to %s closed: %v", node, err)
  297. } else if _, ok := err.(ClusterConfigMismatch); ok {
  298. l.Warnf("Connection to %s closed: %v", node, err)
  299. }
  300. cid := m.cm.Get(node)
  301. m.rmut.RLock()
  302. for _, repo := range m.nodeRepos[node] {
  303. m.repoFiles[repo].Replace(cid, nil)
  304. }
  305. m.rmut.RUnlock()
  306. m.cm.Clear(node)
  307. m.pmut.Lock()
  308. conn, ok := m.rawConn[node]
  309. if ok {
  310. conn.Close()
  311. }
  312. delete(m.protoConn, node)
  313. delete(m.rawConn, node)
  314. delete(m.nodeVer, node)
  315. m.pmut.Unlock()
  316. }
  317. // Request returns the specified data segment by reading it from local disk.
  318. // Implements the protocol.Model interface.
  319. func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]byte, error) {
  320. // Verify that the requested file exists in the local model.
  321. m.rmut.RLock()
  322. r, ok := m.repoFiles[repo]
  323. m.rmut.RUnlock()
  324. if !ok {
  325. l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
  326. return nil, ErrNoSuchFile
  327. }
  328. lf := r.Get(cid.LocalID, name)
  329. if lf.Suppressed || protocol.IsDeleted(lf.Flags) {
  330. if debug {
  331. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
  332. }
  333. return nil, ErrInvalid
  334. }
  335. if offset > lf.Size {
  336. if debug {
  337. l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
  338. }
  339. return nil, ErrNoSuchFile
  340. }
  341. if debug && nodeID != "<local>" {
  342. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
  343. }
  344. m.rmut.RLock()
  345. fn := filepath.Join(m.repoCfgs[repo].Directory, name)
  346. m.rmut.RUnlock()
  347. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  348. if err != nil {
  349. return nil, err
  350. }
  351. defer fd.Close()
  352. buf := buffers.Get(int(size))
  353. _, err = fd.ReadAt(buf, offset)
  354. if err != nil {
  355. return nil, err
  356. }
  357. return buf, nil
  358. }
  359. // ReplaceLocal replaces the local repository index with the given list of files.
  360. func (m *Model) ReplaceLocal(repo string, fs []scanner.File) {
  361. m.rmut.RLock()
  362. m.repoFiles[repo].ReplaceWithDelete(cid.LocalID, fs)
  363. m.rmut.RUnlock()
  364. }
  365. func (m *Model) SeedLocal(repo string, fs []protocol.FileInfo) {
  366. var sfs = make([]scanner.File, len(fs))
  367. for i := 0; i < len(fs); i++ {
  368. lamport.Default.Tick(fs[i].Version)
  369. sfs[i] = fileFromFileInfo(fs[i])
  370. sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; the should not be on startup
  371. }
  372. m.rmut.RLock()
  373. m.repoFiles[repo].Replace(cid.LocalID, sfs)
  374. m.rmut.RUnlock()
  375. }
  376. func (m *Model) CurrentRepoFile(repo string, file string) scanner.File {
  377. m.rmut.RLock()
  378. f := m.repoFiles[repo].Get(cid.LocalID, file)
  379. m.rmut.RUnlock()
  380. return f
  381. }
  382. func (m *Model) CurrentGlobalFile(repo string, file string) scanner.File {
  383. m.rmut.RLock()
  384. f := m.repoFiles[repo].GetGlobal(file)
  385. m.rmut.RUnlock()
  386. return f
  387. }
  388. type cFiler struct {
  389. m *Model
  390. r string
  391. }
  392. // Implements scanner.CurrentFiler
  393. func (cf cFiler) CurrentFile(file string) scanner.File {
  394. return cf.m.CurrentRepoFile(cf.r, file)
  395. }
  396. // ConnectedTo returns true if we are connected to the named node.
  397. func (m *Model) ConnectedTo(nodeID string) bool {
  398. m.pmut.RLock()
  399. _, ok := m.protoConn[nodeID]
  400. m.pmut.RUnlock()
  401. return ok
  402. }
  403. // AddConnection adds a new peer connection to the model. An initial index will
  404. // be sent to the connected peer, thereafter index updates whenever the local
  405. // repository changes.
  406. func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
  407. nodeID := protoConn.ID()
  408. m.pmut.Lock()
  409. if _, ok := m.protoConn[nodeID]; ok {
  410. panic("add existing node")
  411. }
  412. m.protoConn[nodeID] = protoConn
  413. if _, ok := m.rawConn[nodeID]; ok {
  414. panic("add existing node")
  415. }
  416. m.rawConn[nodeID] = rawConn
  417. m.pmut.Unlock()
  418. cm := m.clusterConfig(nodeID)
  419. protoConn.ClusterConfig(cm)
  420. var idxToSend = make(map[string][]protocol.FileInfo)
  421. m.rmut.RLock()
  422. for _, repo := range m.nodeRepos[nodeID] {
  423. idxToSend[repo] = m.protocolIndex(repo)
  424. }
  425. m.rmut.RUnlock()
  426. go func() {
  427. for repo, idx := range idxToSend {
  428. if debug {
  429. l.Debugf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
  430. }
  431. protoConn.Index(repo, idx)
  432. }
  433. }()
  434. }
  435. // protocolIndex returns the current local index in protocol data types.
  436. func (m *Model) protocolIndex(repo string) []protocol.FileInfo {
  437. var index []protocol.FileInfo
  438. fs := m.repoFiles[repo].Have(cid.LocalID)
  439. for _, f := range fs {
  440. mf := fileInfoFromFile(f)
  441. if debug {
  442. var flagComment string
  443. if protocol.IsDeleted(mf.Flags) {
  444. flagComment = " (deleted)"
  445. }
  446. l.Debugf("IDX(out): %q/%q m=%d f=%o%s v=%d (%d blocks)", repo, mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
  447. }
  448. index = append(index, mf)
  449. }
  450. return index
  451. }
  452. func (m *Model) updateLocal(repo string, f scanner.File) {
  453. m.rmut.RLock()
  454. m.repoFiles[repo].Update(cid.LocalID, []scanner.File{f})
  455. m.rmut.RUnlock()
  456. }
  457. func (m *Model) requestGlobal(nodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
  458. m.pmut.RLock()
  459. nc, ok := m.protoConn[nodeID]
  460. m.pmut.RUnlock()
  461. if !ok {
  462. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  463. }
  464. if debug {
  465. l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
  466. }
  467. return nc.Request(repo, name, offset, size)
  468. }
  469. func (m *Model) broadcastIndexLoop() {
  470. var lastChange = map[string]uint64{}
  471. for {
  472. time.Sleep(5 * time.Second)
  473. m.pmut.RLock()
  474. m.rmut.RLock()
  475. var indexWg sync.WaitGroup
  476. for repo, fs := range m.repoFiles {
  477. repo := repo
  478. c := fs.Changes(cid.LocalID)
  479. if c == lastChange[repo] {
  480. continue
  481. }
  482. lastChange[repo] = c
  483. idx := m.protocolIndex(repo)
  484. indexWg.Add(1)
  485. go func() {
  486. m.saveIndex(repo, m.indexDir, idx)
  487. indexWg.Done()
  488. }()
  489. for _, nodeID := range m.repoNodes[repo] {
  490. nodeID := nodeID
  491. if conn, ok := m.protoConn[nodeID]; ok {
  492. indexWg.Add(1)
  493. if debug {
  494. l.Debugf("IDX(out/loop): %s: %d files", nodeID, len(idx))
  495. }
  496. go func() {
  497. conn.Index(repo, idx)
  498. indexWg.Done()
  499. }()
  500. }
  501. }
  502. }
  503. m.rmut.RUnlock()
  504. m.pmut.RUnlock()
  505. indexWg.Wait()
  506. }
  507. }
  508. func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
  509. if m.started {
  510. panic("cannot add repo to started model")
  511. }
  512. if len(cfg.ID) == 0 {
  513. panic("cannot add empty repo id")
  514. }
  515. m.rmut.Lock()
  516. m.repoCfgs[cfg.ID] = cfg
  517. m.repoFiles[cfg.ID] = files.NewSet()
  518. m.suppressor[cfg.ID] = &suppressor{threshold: int64(m.cfg.Options.MaxChangeKbps)}
  519. m.repoNodes[cfg.ID] = make([]string, len(cfg.Nodes))
  520. for i, node := range cfg.Nodes {
  521. m.repoNodes[cfg.ID][i] = node.NodeID
  522. m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
  523. }
  524. m.addedRepo = true
  525. m.rmut.Unlock()
  526. }
  527. func (m *Model) ScanRepos() {
  528. m.rmut.RLock()
  529. var repos = make([]string, 0, len(m.repoCfgs))
  530. for repo := range m.repoCfgs {
  531. repos = append(repos, repo)
  532. }
  533. m.rmut.RUnlock()
  534. var wg sync.WaitGroup
  535. wg.Add(len(repos))
  536. for _, repo := range repos {
  537. repo := repo
  538. go func() {
  539. m.ScanRepo(repo)
  540. wg.Done()
  541. }()
  542. }
  543. wg.Wait()
  544. }
  545. func (m *Model) CleanRepos() {
  546. m.rmut.RLock()
  547. var dirs = make([]string, 0, len(m.repoCfgs))
  548. for _, cfg := range m.repoCfgs {
  549. dirs = append(dirs, cfg.Directory)
  550. }
  551. m.rmut.RUnlock()
  552. var wg sync.WaitGroup
  553. wg.Add(len(dirs))
  554. for _, dir := range dirs {
  555. w := &scanner.Walker{
  556. Dir: dir,
  557. TempNamer: defTempNamer,
  558. }
  559. go func() {
  560. w.CleanTempFiles()
  561. wg.Done()
  562. }()
  563. }
  564. wg.Wait()
  565. }
  566. func (m *Model) ScanRepo(repo string) error {
  567. m.rmut.RLock()
  568. w := &scanner.Walker{
  569. Dir: m.repoCfgs[repo].Directory,
  570. IgnoreFile: ".stignore",
  571. BlockSize: scanner.StandardBlockSize,
  572. TempNamer: defTempNamer,
  573. Suppressor: m.suppressor[repo],
  574. CurrentFiler: cFiler{m, repo},
  575. IgnorePerms: m.repoCfgs[repo].IgnorePerms,
  576. }
  577. m.rmut.RUnlock()
  578. m.setState(repo, RepoScanning)
  579. fs, _, err := w.Walk()
  580. if err != nil {
  581. return err
  582. }
  583. m.ReplaceLocal(repo, fs)
  584. m.setState(repo, RepoIdle)
  585. return nil
  586. }
  587. func (m *Model) SaveIndexes(dir string) {
  588. m.rmut.RLock()
  589. for repo := range m.repoCfgs {
  590. fs := m.protocolIndex(repo)
  591. m.saveIndex(repo, dir, fs)
  592. }
  593. m.rmut.RUnlock()
  594. }
  595. func (m *Model) LoadIndexes(dir string) {
  596. m.rmut.RLock()
  597. for repo := range m.repoCfgs {
  598. fs := m.loadIndex(repo, dir)
  599. m.SeedLocal(repo, fs)
  600. }
  601. m.rmut.RUnlock()
  602. }
  603. func (m *Model) saveIndex(repo string, dir string, fs []protocol.FileInfo) {
  604. id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory)))
  605. name := id + ".idx.gz"
  606. name = filepath.Join(dir, name)
  607. idxf, err := os.Create(name + ".tmp")
  608. if err != nil {
  609. return
  610. }
  611. gzw := gzip.NewWriter(idxf)
  612. protocol.IndexMessage{
  613. Repository: repo,
  614. Files: fs,
  615. }.EncodeXDR(gzw)
  616. gzw.Close()
  617. idxf.Close()
  618. osutil.Rename(name+".tmp", name)
  619. }
  620. func (m *Model) loadIndex(repo string, dir string) []protocol.FileInfo {
  621. id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory)))
  622. name := id + ".idx.gz"
  623. name = filepath.Join(dir, name)
  624. idxf, err := os.Open(name)
  625. if err != nil {
  626. return nil
  627. }
  628. defer idxf.Close()
  629. gzr, err := gzip.NewReader(idxf)
  630. if err != nil {
  631. return nil
  632. }
  633. defer gzr.Close()
  634. var im protocol.IndexMessage
  635. err = im.DecodeXDR(gzr)
  636. if err != nil || im.Repository != repo {
  637. return nil
  638. }
  639. return im.Files
  640. }
  641. // clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
  642. func (m *Model) clusterConfig(node string) protocol.ClusterConfigMessage {
  643. cm := protocol.ClusterConfigMessage{
  644. ClientName: m.clientName,
  645. ClientVersion: m.clientVersion,
  646. }
  647. m.rmut.RLock()
  648. for _, repo := range m.nodeRepos[node] {
  649. cr := protocol.Repository{
  650. ID: repo,
  651. }
  652. for _, node := range m.repoNodes[repo] {
  653. // TODO: Set read only bit when relevant
  654. cr.Nodes = append(cr.Nodes, protocol.Node{
  655. ID: node,
  656. Flags: protocol.FlagShareTrusted,
  657. })
  658. }
  659. cm.Repositories = append(cm.Repositories, cr)
  660. }
  661. m.rmut.RUnlock()
  662. return cm
  663. }
  664. func (m *Model) setState(repo string, state repoState) {
  665. m.smut.Lock()
  666. m.repoState[repo] = state
  667. m.smut.Unlock()
  668. }
  669. func (m *Model) State(repo string) string {
  670. m.smut.RLock()
  671. state := m.repoState[repo]
  672. m.smut.RUnlock()
  673. switch state {
  674. case RepoIdle:
  675. return "idle"
  676. case RepoScanning:
  677. return "scanning"
  678. case RepoCleaning:
  679. return "cleaning"
  680. case RepoSyncing:
  681. return "syncing"
  682. default:
  683. return "unknown"
  684. }
  685. }