model.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786
  1. package model
  2. import (
  3. "compress/gzip"
  4. "crypto/sha1"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "os"
  10. "path/filepath"
  11. "sync"
  12. "time"
  13. "github.com/calmh/syncthing/buffers"
  14. "github.com/calmh/syncthing/cid"
  15. "github.com/calmh/syncthing/config"
  16. "github.com/calmh/syncthing/files"
  17. "github.com/calmh/syncthing/lamport"
  18. "github.com/calmh/syncthing/protocol"
  19. "github.com/calmh/syncthing/scanner"
  20. )
  21. type repoState int
  22. const (
  23. RepoIdle repoState = iota
  24. RepoScanning
  25. RepoSyncing
  26. RepoCleaning
  27. )
  28. // Somewhat arbitrary amount of bytes that we choose to let represent the size
  29. // of an unsynchronized directory entry or a deleted file. We need it to be
  30. // larger than zero so that it's visible that there is some amount of bytes to
  31. // transfer to bring the systems into synchronization.
  32. const zeroEntrySize = 128
  33. type Model struct {
  34. indexDir string
  35. cfg *config.Configuration
  36. clientName string
  37. clientVersion string
  38. repoDirs map[string]string // repo -> dir
  39. repoFiles map[string]*files.Set // repo -> files
  40. repoNodes map[string][]string // repo -> nodeIDs
  41. nodeRepos map[string][]string // nodeID -> repos
  42. suppressor map[string]*suppressor // repo -> suppressor
  43. rmut sync.RWMutex // protects the above
  44. repoState map[string]repoState // repo -> state
  45. smut sync.RWMutex
  46. cm *cid.Map
  47. protoConn map[string]protocol.Connection
  48. rawConn map[string]io.Closer
  49. nodeVer map[string]string
  50. pmut sync.RWMutex // protects protoConn and rawConn
  51. sup suppressor
  52. addedRepo bool
  53. started bool
  54. }
  55. var (
  56. ErrNoSuchFile = errors.New("no such file")
  57. ErrInvalid = errors.New("file is invalid")
  58. )
  59. // NewModel creates and starts a new model. The model starts in read-only mode,
  60. // where it sends index information to connected peers and responds to requests
  61. // for file data without altering the local repository in any way.
  62. func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVersion string) *Model {
  63. m := &Model{
  64. indexDir: indexDir,
  65. cfg: cfg,
  66. clientName: clientName,
  67. clientVersion: clientVersion,
  68. repoDirs: make(map[string]string),
  69. repoFiles: make(map[string]*files.Set),
  70. repoNodes: make(map[string][]string),
  71. nodeRepos: make(map[string][]string),
  72. repoState: make(map[string]repoState),
  73. suppressor: make(map[string]*suppressor),
  74. cm: cid.NewMap(),
  75. protoConn: make(map[string]protocol.Connection),
  76. rawConn: make(map[string]io.Closer),
  77. nodeVer: make(map[string]string),
  78. sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
  79. }
  80. go m.broadcastIndexLoop()
  81. return m
  82. }
  83. // StartRW starts read/write processing on the current model. When in
  84. // read/write mode the model will attempt to keep in sync with the cluster by
  85. // pulling needed files from peer nodes.
  86. func (m *Model) StartRepoRW(repo string, threads int) {
  87. m.rmut.RLock()
  88. defer m.rmut.RUnlock()
  89. if dir, ok := m.repoDirs[repo]; !ok {
  90. panic("cannot start without repo")
  91. } else {
  92. newPuller(repo, dir, m, threads, m.cfg)
  93. }
  94. }
  95. // StartRO starts read only processing on the current model. When in
  96. // read only mode the model will announce files to the cluster but not
  97. // pull in any external changes.
  98. func (m *Model) StartRepoRO(repo string) {
  99. m.StartRepoRW(repo, 0) // zero threads => read only
  100. }
  101. type ConnectionInfo struct {
  102. protocol.Statistics
  103. Address string
  104. ClientVersion string
  105. Completion int
  106. }
  107. // ConnectionStats returns a map with connection statistics for each connected node.
  108. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  109. type remoteAddrer interface {
  110. RemoteAddr() net.Addr
  111. }
  112. m.pmut.RLock()
  113. m.rmut.RLock()
  114. var res = make(map[string]ConnectionInfo)
  115. for node, conn := range m.protoConn {
  116. ci := ConnectionInfo{
  117. Statistics: conn.Statistics(),
  118. ClientVersion: m.nodeVer[node],
  119. }
  120. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  121. ci.Address = nc.RemoteAddr().String()
  122. }
  123. var tot int64
  124. var have int64
  125. for _, repo := range m.nodeRepos[node] {
  126. for _, f := range m.repoFiles[repo].Global() {
  127. if f.Flags&protocol.FlagDeleted == 0 {
  128. size := f.Size
  129. if f.Flags&protocol.FlagDirectory != 0 {
  130. size = zeroEntrySize
  131. }
  132. tot += size
  133. have += size
  134. }
  135. }
  136. for _, f := range m.repoFiles[repo].Need(m.cm.Get(node)) {
  137. if f.Flags&protocol.FlagDeleted == 0 {
  138. size := f.Size
  139. if f.Flags&protocol.FlagDirectory != 0 {
  140. size = zeroEntrySize
  141. }
  142. have -= size
  143. }
  144. }
  145. }
  146. ci.Completion = 100
  147. if tot != 0 {
  148. ci.Completion = int(100 * have / tot)
  149. }
  150. res[node] = ci
  151. }
  152. m.rmut.RUnlock()
  153. m.pmut.RUnlock()
  154. return res
  155. }
  156. func sizeOf(fs []scanner.File) (files, deleted int, bytes int64) {
  157. for _, f := range fs {
  158. if f.Flags&protocol.FlagDeleted == 0 {
  159. files++
  160. if f.Flags&protocol.FlagDirectory == 0 {
  161. bytes += f.Size
  162. } else {
  163. bytes += zeroEntrySize
  164. }
  165. } else {
  166. deleted++
  167. bytes += zeroEntrySize
  168. }
  169. }
  170. return
  171. }
  172. // GlobalSize returns the number of files, deleted files and total bytes for all
  173. // files in the global model.
  174. func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
  175. m.rmut.RLock()
  176. defer m.rmut.RUnlock()
  177. if rf, ok := m.repoFiles[repo]; ok {
  178. return sizeOf(rf.Global())
  179. }
  180. return 0, 0, 0
  181. }
  182. // LocalSize returns the number of files, deleted files and total bytes for all
  183. // files in the local repository.
  184. func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
  185. m.rmut.RLock()
  186. defer m.rmut.RUnlock()
  187. if rf, ok := m.repoFiles[repo]; ok {
  188. return sizeOf(rf.Have(cid.LocalID))
  189. }
  190. return 0, 0, 0
  191. }
  192. // NeedSize returns the number and total size of currently needed files.
  193. func (m *Model) NeedSize(repo string) (files int, bytes int64) {
  194. f, d, b := sizeOf(m.NeedFilesRepo(repo))
  195. return f + d, b
  196. }
  197. // NeedFiles returns the list of currently needed files and the total size.
  198. func (m *Model) NeedFilesRepo(repo string) []scanner.File {
  199. m.rmut.RLock()
  200. defer m.rmut.RUnlock()
  201. if rf, ok := m.repoFiles[repo]; ok {
  202. return rf.Need(cid.LocalID)
  203. }
  204. return nil
  205. }
  206. // Index is called when a new node is connected and we receive their full index.
  207. // Implements the protocol.Model interface.
  208. func (m *Model) Index(nodeID string, repo string, fs []protocol.FileInfo) {
  209. if debug {
  210. l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
  211. }
  212. var files = make([]scanner.File, len(fs))
  213. for i := range fs {
  214. f := fs[i]
  215. lamport.Default.Tick(f.Version)
  216. if debug {
  217. var flagComment string
  218. if f.Flags&protocol.FlagDeleted != 0 {
  219. flagComment = " (deleted)"
  220. }
  221. l.Debugf("IDX(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  222. }
  223. files[i] = fileFromFileInfo(f)
  224. }
  225. id := m.cm.Get(nodeID)
  226. m.rmut.RLock()
  227. if r, ok := m.repoFiles[repo]; ok {
  228. r.Replace(id, files)
  229. } else {
  230. l.Warnf("Index from %s for nonexistant repo %q; dropping", nodeID, repo)
  231. }
  232. m.rmut.RUnlock()
  233. }
  234. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  235. // Implements the protocol.Model interface.
  236. func (m *Model) IndexUpdate(nodeID string, repo string, fs []protocol.FileInfo) {
  237. if debug {
  238. l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
  239. }
  240. var files = make([]scanner.File, len(fs))
  241. for i := range fs {
  242. f := fs[i]
  243. lamport.Default.Tick(f.Version)
  244. if debug {
  245. var flagComment string
  246. if f.Flags&protocol.FlagDeleted != 0 {
  247. flagComment = " (deleted)"
  248. }
  249. l.Debugf("IDXUP(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  250. }
  251. files[i] = fileFromFileInfo(f)
  252. }
  253. id := m.cm.Get(nodeID)
  254. m.rmut.RLock()
  255. if r, ok := m.repoFiles[repo]; ok {
  256. r.Update(id, files)
  257. } else {
  258. l.Warnf("Index update from %s for nonexistant repo %q; dropping", nodeID, repo)
  259. }
  260. m.rmut.RUnlock()
  261. }
  262. func (m *Model) ClusterConfig(nodeID string, config protocol.ClusterConfigMessage) {
  263. compErr := compareClusterConfig(m.clusterConfig(nodeID), config)
  264. if debug {
  265. l.Debugf("ClusterConfig: %s: %#v", nodeID, config)
  266. l.Debugf(" ... compare: %s: %v", nodeID, compErr)
  267. }
  268. if compErr != nil {
  269. l.Warnf("%s: %v", nodeID, compErr)
  270. m.Close(nodeID, compErr)
  271. }
  272. m.pmut.Lock()
  273. if config.ClientName == "syncthing" {
  274. m.nodeVer[nodeID] = config.ClientVersion
  275. } else {
  276. m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
  277. }
  278. m.pmut.Unlock()
  279. }
  280. // Close removes the peer from the model and closes the underlying connection if possible.
  281. // Implements the protocol.Model interface.
  282. func (m *Model) Close(node string, err error) {
  283. if debug {
  284. l.Debugf("%s: %v", node, err)
  285. }
  286. if err != io.EOF {
  287. l.Warnf("Connection to %s closed: %v", node, err)
  288. } else if _, ok := err.(ClusterConfigMismatch); ok {
  289. l.Warnf("Connection to %s closed: %v", node, err)
  290. }
  291. cid := m.cm.Get(node)
  292. m.rmut.RLock()
  293. for _, repo := range m.nodeRepos[node] {
  294. m.repoFiles[repo].Replace(cid, nil)
  295. }
  296. m.rmut.RUnlock()
  297. m.cm.Clear(node)
  298. m.pmut.Lock()
  299. conn, ok := m.rawConn[node]
  300. if ok {
  301. conn.Close()
  302. }
  303. delete(m.protoConn, node)
  304. delete(m.rawConn, node)
  305. delete(m.nodeVer, node)
  306. m.pmut.Unlock()
  307. }
  308. // Request returns the specified data segment by reading it from local disk.
  309. // Implements the protocol.Model interface.
  310. func (m *Model) Request(nodeID, repo, name string, offset int64, size int) ([]byte, error) {
  311. // Verify that the requested file exists in the local model.
  312. m.rmut.RLock()
  313. r, ok := m.repoFiles[repo]
  314. m.rmut.RUnlock()
  315. if !ok {
  316. l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
  317. return nil, ErrNoSuchFile
  318. }
  319. lf := r.Get(cid.LocalID, name)
  320. if lf.Suppressed || lf.Flags&protocol.FlagDeleted != 0 {
  321. if debug {
  322. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
  323. }
  324. return nil, ErrInvalid
  325. }
  326. if offset > lf.Size {
  327. if debug {
  328. l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
  329. }
  330. return nil, ErrNoSuchFile
  331. }
  332. if debug && nodeID != "<local>" {
  333. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
  334. }
  335. m.rmut.RLock()
  336. fn := filepath.Join(m.repoDirs[repo], name)
  337. m.rmut.RUnlock()
  338. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  339. if err != nil {
  340. return nil, err
  341. }
  342. defer fd.Close()
  343. buf := buffers.Get(int(size))
  344. _, err = fd.ReadAt(buf, offset)
  345. if err != nil {
  346. return nil, err
  347. }
  348. return buf, nil
  349. }
  350. // ReplaceLocal replaces the local repository index with the given list of files.
  351. func (m *Model) ReplaceLocal(repo string, fs []scanner.File) {
  352. m.rmut.RLock()
  353. m.repoFiles[repo].ReplaceWithDelete(cid.LocalID, fs)
  354. m.rmut.RUnlock()
  355. }
  356. func (m *Model) SeedLocal(repo string, fs []protocol.FileInfo) {
  357. var sfs = make([]scanner.File, len(fs))
  358. for i := 0; i < len(fs); i++ {
  359. lamport.Default.Tick(fs[i].Version)
  360. sfs[i] = fileFromFileInfo(fs[i])
  361. sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; the should not be on startup
  362. }
  363. m.rmut.RLock()
  364. m.repoFiles[repo].Replace(cid.LocalID, sfs)
  365. m.rmut.RUnlock()
  366. }
  367. func (m *Model) CurrentRepoFile(repo string, file string) scanner.File {
  368. m.rmut.RLock()
  369. f := m.repoFiles[repo].Get(cid.LocalID, file)
  370. m.rmut.RUnlock()
  371. return f
  372. }
  373. func (m *Model) CurrentGlobalFile(repo string, file string) scanner.File {
  374. m.rmut.RLock()
  375. f := m.repoFiles[repo].GetGlobal(file)
  376. m.rmut.RUnlock()
  377. return f
  378. }
  379. type cFiler struct {
  380. m *Model
  381. r string
  382. }
  383. // Implements scanner.CurrentFiler
  384. func (cf cFiler) CurrentFile(file string) scanner.File {
  385. return cf.m.CurrentRepoFile(cf.r, file)
  386. }
  387. // ConnectedTo returns true if we are connected to the named node.
  388. func (m *Model) ConnectedTo(nodeID string) bool {
  389. m.pmut.RLock()
  390. _, ok := m.protoConn[nodeID]
  391. m.pmut.RUnlock()
  392. return ok
  393. }
  394. // AddConnection adds a new peer connection to the model. An initial index will
  395. // be sent to the connected peer, thereafter index updates whenever the local
  396. // repository changes.
  397. func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
  398. nodeID := protoConn.ID()
  399. m.pmut.Lock()
  400. if _, ok := m.protoConn[nodeID]; ok {
  401. panic("add existing node")
  402. }
  403. m.protoConn[nodeID] = protoConn
  404. if _, ok := m.rawConn[nodeID]; ok {
  405. panic("add existing node")
  406. }
  407. m.rawConn[nodeID] = rawConn
  408. m.pmut.Unlock()
  409. cm := m.clusterConfig(nodeID)
  410. protoConn.ClusterConfig(cm)
  411. var idxToSend = make(map[string][]protocol.FileInfo)
  412. m.rmut.RLock()
  413. for _, repo := range m.nodeRepos[nodeID] {
  414. idxToSend[repo] = m.protocolIndex(repo)
  415. }
  416. m.rmut.RUnlock()
  417. go func() {
  418. for repo, idx := range idxToSend {
  419. if debug {
  420. l.Debugf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
  421. }
  422. protoConn.Index(repo, idx)
  423. }
  424. }()
  425. }
  426. // protocolIndex returns the current local index in protocol data types.
  427. func (m *Model) protocolIndex(repo string) []protocol.FileInfo {
  428. var index []protocol.FileInfo
  429. fs := m.repoFiles[repo].Have(cid.LocalID)
  430. for _, f := range fs {
  431. mf := fileInfoFromFile(f)
  432. if debug {
  433. var flagComment string
  434. if mf.Flags&protocol.FlagDeleted != 0 {
  435. flagComment = " (deleted)"
  436. }
  437. l.Debugf("IDX(out): %q/%q m=%d f=%o%s v=%d (%d blocks)", repo, mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
  438. }
  439. index = append(index, mf)
  440. }
  441. return index
  442. }
  443. func (m *Model) updateLocal(repo string, f scanner.File) {
  444. m.rmut.RLock()
  445. m.repoFiles[repo].Update(cid.LocalID, []scanner.File{f})
  446. m.rmut.RUnlock()
  447. }
  448. func (m *Model) requestGlobal(nodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
  449. m.pmut.RLock()
  450. nc, ok := m.protoConn[nodeID]
  451. m.pmut.RUnlock()
  452. if !ok {
  453. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  454. }
  455. if debug {
  456. l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
  457. }
  458. return nc.Request(repo, name, offset, size)
  459. }
  460. func (m *Model) broadcastIndexLoop() {
  461. var lastChange = map[string]uint64{}
  462. for {
  463. time.Sleep(5 * time.Second)
  464. m.pmut.RLock()
  465. m.rmut.RLock()
  466. var indexWg sync.WaitGroup
  467. for repo, fs := range m.repoFiles {
  468. repo := repo
  469. c := fs.Changes(cid.LocalID)
  470. if c == lastChange[repo] {
  471. continue
  472. }
  473. lastChange[repo] = c
  474. idx := m.protocolIndex(repo)
  475. indexWg.Add(1)
  476. go func() {
  477. m.saveIndex(repo, m.indexDir, idx)
  478. indexWg.Done()
  479. }()
  480. for _, nodeID := range m.repoNodes[repo] {
  481. nodeID := nodeID
  482. if conn, ok := m.protoConn[nodeID]; ok {
  483. indexWg.Add(1)
  484. if debug {
  485. l.Debugf("IDX(out/loop): %s: %d files", nodeID, len(idx))
  486. }
  487. go func() {
  488. conn.Index(repo, idx)
  489. indexWg.Done()
  490. }()
  491. }
  492. }
  493. }
  494. m.rmut.RUnlock()
  495. m.pmut.RUnlock()
  496. indexWg.Wait()
  497. }
  498. }
  499. func (m *Model) AddRepo(id, dir string, nodes []config.NodeConfiguration) {
  500. if m.started {
  501. panic("cannot add repo to started model")
  502. }
  503. if len(id) == 0 {
  504. panic("cannot add empty repo id")
  505. }
  506. m.rmut.Lock()
  507. m.repoDirs[id] = dir
  508. m.repoFiles[id] = files.NewSet()
  509. m.suppressor[id] = &suppressor{threshold: int64(m.cfg.Options.MaxChangeKbps)}
  510. m.repoNodes[id] = make([]string, len(nodes))
  511. for i, node := range nodes {
  512. m.repoNodes[id][i] = node.NodeID
  513. m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], id)
  514. }
  515. m.addedRepo = true
  516. m.rmut.Unlock()
  517. }
  518. func (m *Model) ScanRepos() {
  519. m.rmut.RLock()
  520. var repos = make([]string, 0, len(m.repoDirs))
  521. for repo := range m.repoDirs {
  522. repos = append(repos, repo)
  523. }
  524. m.rmut.RUnlock()
  525. var wg sync.WaitGroup
  526. wg.Add(len(repos))
  527. for _, repo := range repos {
  528. repo := repo
  529. go func() {
  530. m.ScanRepo(repo)
  531. wg.Done()
  532. }()
  533. }
  534. wg.Wait()
  535. }
  536. func (m *Model) CleanRepos() {
  537. m.rmut.RLock()
  538. var dirs = make([]string, 0, len(m.repoDirs))
  539. for _, dir := range m.repoDirs {
  540. dirs = append(dirs, dir)
  541. }
  542. m.rmut.RUnlock()
  543. var wg sync.WaitGroup
  544. wg.Add(len(dirs))
  545. for _, dir := range dirs {
  546. w := &scanner.Walker{
  547. Dir: dir,
  548. TempNamer: defTempNamer,
  549. }
  550. go func() {
  551. w.CleanTempFiles()
  552. wg.Done()
  553. }()
  554. }
  555. wg.Wait()
  556. }
  557. func (m *Model) ScanRepo(repo string) error {
  558. m.rmut.RLock()
  559. w := &scanner.Walker{
  560. Dir: m.repoDirs[repo],
  561. IgnoreFile: ".stignore",
  562. BlockSize: scanner.StandardBlockSize,
  563. TempNamer: defTempNamer,
  564. Suppressor: m.suppressor[repo],
  565. CurrentFiler: cFiler{m, repo},
  566. }
  567. m.rmut.RUnlock()
  568. m.setState(repo, RepoScanning)
  569. fs, _, err := w.Walk()
  570. if err != nil {
  571. return err
  572. }
  573. m.ReplaceLocal(repo, fs)
  574. m.setState(repo, RepoIdle)
  575. return nil
  576. }
  577. func (m *Model) SaveIndexes(dir string) {
  578. m.rmut.RLock()
  579. for repo := range m.repoDirs {
  580. fs := m.protocolIndex(repo)
  581. m.saveIndex(repo, dir, fs)
  582. }
  583. m.rmut.RUnlock()
  584. }
  585. func (m *Model) LoadIndexes(dir string) {
  586. m.rmut.RLock()
  587. for repo := range m.repoDirs {
  588. fs := m.loadIndex(repo, dir)
  589. m.SeedLocal(repo, fs)
  590. }
  591. m.rmut.RUnlock()
  592. }
  593. func (m *Model) saveIndex(repo string, dir string, fs []protocol.FileInfo) {
  594. id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoDirs[repo])))
  595. name := id + ".idx.gz"
  596. name = filepath.Join(dir, name)
  597. idxf, err := os.Create(name + ".tmp")
  598. if err != nil {
  599. return
  600. }
  601. gzw := gzip.NewWriter(idxf)
  602. protocol.IndexMessage{
  603. Repository: repo,
  604. Files: fs,
  605. }.EncodeXDR(gzw)
  606. gzw.Close()
  607. idxf.Close()
  608. Rename(name+".tmp", name)
  609. }
  610. func (m *Model) loadIndex(repo string, dir string) []protocol.FileInfo {
  611. id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoDirs[repo])))
  612. name := id + ".idx.gz"
  613. name = filepath.Join(dir, name)
  614. idxf, err := os.Open(name)
  615. if err != nil {
  616. return nil
  617. }
  618. defer idxf.Close()
  619. gzr, err := gzip.NewReader(idxf)
  620. if err != nil {
  621. return nil
  622. }
  623. defer gzr.Close()
  624. var im protocol.IndexMessage
  625. err = im.DecodeXDR(gzr)
  626. if err != nil || im.Repository != repo {
  627. return nil
  628. }
  629. return im.Files
  630. }
  631. // clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
  632. func (m *Model) clusterConfig(node string) protocol.ClusterConfigMessage {
  633. cm := protocol.ClusterConfigMessage{
  634. ClientName: m.clientName,
  635. ClientVersion: m.clientVersion,
  636. }
  637. m.rmut.RLock()
  638. for _, repo := range m.nodeRepos[node] {
  639. cr := protocol.Repository{
  640. ID: repo,
  641. }
  642. for _, node := range m.repoNodes[repo] {
  643. // TODO: Set read only bit when relevant
  644. cr.Nodes = append(cr.Nodes, protocol.Node{
  645. ID: node,
  646. Flags: protocol.FlagShareTrusted,
  647. })
  648. }
  649. cm.Repositories = append(cm.Repositories, cr)
  650. }
  651. m.rmut.RUnlock()
  652. return cm
  653. }
  654. func (m *Model) setState(repo string, state repoState) {
  655. m.smut.Lock()
  656. m.repoState[repo] = state
  657. m.smut.Unlock()
  658. }
  659. func (m *Model) State(repo string) string {
  660. m.smut.RLock()
  661. state := m.repoState[repo]
  662. m.smut.RUnlock()
  663. switch state {
  664. case RepoIdle:
  665. return "idle"
  666. case RepoScanning:
  667. return "scanning"
  668. case RepoCleaning:
  669. return "cleaning"
  670. case RepoSyncing:
  671. return "syncing"
  672. default:
  673. return "unknown"
  674. }
  675. }