model.go 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881
  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package model
  5. import (
  6. "compress/gzip"
  7. "crypto/sha1"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net"
  12. "os"
  13. "path/filepath"
  14. "strconv"
  15. "sync"
  16. "time"
  17. "github.com/calmh/syncthing/config"
  18. "github.com/calmh/syncthing/events"
  19. "github.com/calmh/syncthing/files"
  20. "github.com/calmh/syncthing/lamport"
  21. "github.com/calmh/syncthing/osutil"
  22. "github.com/calmh/syncthing/protocol"
  23. "github.com/calmh/syncthing/scanner"
  24. "github.com/syndtr/goleveldb/leveldb"
  25. )
  26. type repoState int
  27. const (
  28. RepoIdle repoState = iota
  29. RepoScanning
  30. RepoSyncing
  31. RepoCleaning
  32. )
  33. // Somewhat arbitrary amount of bytes that we choose to let represent the size
  34. // of an unsynchronized directory entry or a deleted file. We need it to be
  35. // larger than zero so that it's visible that there is some amount of bytes to
  36. // transfer to bring the systems into synchronization.
  37. const zeroEntrySize = 128
  38. type Model struct {
  39. indexDir string
  40. cfg *config.Configuration
  41. db *leveldb.DB
  42. clientName string
  43. clientVersion string
  44. repoCfgs map[string]config.RepositoryConfiguration // repo -> cfg
  45. repoFiles map[string]*files.Set // repo -> files
  46. repoNodes map[string][]protocol.NodeID // repo -> nodeIDs
  47. nodeRepos map[protocol.NodeID][]string // nodeID -> repos
  48. suppressor map[string]*suppressor // repo -> suppressor
  49. rmut sync.RWMutex // protects the above
  50. repoState map[string]repoState // repo -> state
  51. smut sync.RWMutex
  52. protoConn map[protocol.NodeID]protocol.Connection
  53. rawConn map[protocol.NodeID]io.Closer
  54. nodeVer map[protocol.NodeID]string
  55. pmut sync.RWMutex // protects protoConn and rawConn
  56. sup suppressor
  57. addedRepo bool
  58. started bool
  59. }
  60. var (
  61. ErrNoSuchFile = errors.New("no such file")
  62. ErrInvalid = errors.New("file is invalid")
  63. )
  64. // NewModel creates and starts a new model. The model starts in read-only mode,
  65. // where it sends index information to connected peers and responds to requests
  66. // for file data without altering the local repository in any way.
  67. func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVersion string, db *leveldb.DB) *Model {
  68. m := &Model{
  69. indexDir: indexDir,
  70. cfg: cfg,
  71. db: db,
  72. clientName: clientName,
  73. clientVersion: clientVersion,
  74. repoCfgs: make(map[string]config.RepositoryConfiguration),
  75. repoFiles: make(map[string]*files.Set),
  76. repoNodes: make(map[string][]protocol.NodeID),
  77. nodeRepos: make(map[protocol.NodeID][]string),
  78. repoState: make(map[string]repoState),
  79. suppressor: make(map[string]*suppressor),
  80. protoConn: make(map[protocol.NodeID]protocol.Connection),
  81. rawConn: make(map[protocol.NodeID]io.Closer),
  82. nodeVer: make(map[protocol.NodeID]string),
  83. sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
  84. }
  85. var timeout = 20 * 60 // seconds
  86. if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
  87. it, err := strconv.Atoi(t)
  88. if err == nil {
  89. timeout = it
  90. }
  91. }
  92. deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
  93. deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
  94. deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)
  95. go m.broadcastIndexLoop()
  96. return m
  97. }
  98. // StartRW starts read/write processing on the current model. When in
  99. // read/write mode the model will attempt to keep in sync with the cluster by
  100. // pulling needed files from peer nodes.
  101. func (m *Model) StartRepoRW(repo string, threads int) {
  102. m.rmut.RLock()
  103. defer m.rmut.RUnlock()
  104. if cfg, ok := m.repoCfgs[repo]; !ok {
  105. panic("cannot start without repo")
  106. } else {
  107. newPuller(cfg, m, threads, m.cfg)
  108. }
  109. }
  110. // StartRO starts read only processing on the current model. When in
  111. // read only mode the model will announce files to the cluster but not
  112. // pull in any external changes.
  113. func (m *Model) StartRepoRO(repo string) {
  114. m.StartRepoRW(repo, 0) // zero threads => read only
  115. }
  116. type ConnectionInfo struct {
  117. protocol.Statistics
  118. Address string
  119. ClientVersion string
  120. Completion int
  121. }
  122. // ConnectionStats returns a map with connection statistics for each connected node.
  123. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  124. type remoteAddrer interface {
  125. RemoteAddr() net.Addr
  126. }
  127. m.pmut.RLock()
  128. m.rmut.RLock()
  129. var res = make(map[string]ConnectionInfo)
  130. for node, conn := range m.protoConn {
  131. ci := ConnectionInfo{
  132. Statistics: conn.Statistics(),
  133. ClientVersion: m.nodeVer[node],
  134. }
  135. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  136. ci.Address = nc.RemoteAddr().String()
  137. }
  138. var tot int64
  139. var have int64
  140. for _, repo := range m.nodeRepos[node] {
  141. m.repoFiles[repo].WithGlobal(func(f protocol.FileInfo) bool {
  142. if !protocol.IsDeleted(f.Flags) {
  143. var size int64
  144. if protocol.IsDirectory(f.Flags) {
  145. size = zeroEntrySize
  146. } else {
  147. size = f.Size()
  148. }
  149. tot += size
  150. have += size
  151. }
  152. return true
  153. })
  154. m.repoFiles[repo].WithNeed(node, func(f protocol.FileInfo) bool {
  155. if !protocol.IsDeleted(f.Flags) {
  156. var size int64
  157. if protocol.IsDirectory(f.Flags) {
  158. size = zeroEntrySize
  159. } else {
  160. size = f.Size()
  161. }
  162. have -= size
  163. }
  164. return true
  165. })
  166. }
  167. ci.Completion = 100
  168. if tot != 0 {
  169. ci.Completion = int(100 * have / tot)
  170. }
  171. res[node.String()] = ci
  172. }
  173. m.rmut.RUnlock()
  174. m.pmut.RUnlock()
  175. in, out := protocol.TotalInOut()
  176. res["total"] = ConnectionInfo{
  177. Statistics: protocol.Statistics{
  178. At: time.Now(),
  179. InBytesTotal: in,
  180. OutBytesTotal: out,
  181. },
  182. }
  183. return res
  184. }
  185. func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) {
  186. for _, f := range fs {
  187. fs, de, by := sizeOfFile(f)
  188. files += fs
  189. deleted += de
  190. bytes += by
  191. }
  192. return
  193. }
  194. func sizeOfFile(f protocol.FileInfo) (files, deleted int, bytes int64) {
  195. if !protocol.IsDeleted(f.Flags) {
  196. files++
  197. if !protocol.IsDirectory(f.Flags) {
  198. bytes += f.Size()
  199. } else {
  200. bytes += zeroEntrySize
  201. }
  202. } else {
  203. deleted++
  204. bytes += zeroEntrySize
  205. }
  206. return
  207. }
  208. // GlobalSize returns the number of files, deleted files and total bytes for all
  209. // files in the global model.
  210. func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
  211. m.rmut.RLock()
  212. defer m.rmut.RUnlock()
  213. if rf, ok := m.repoFiles[repo]; ok {
  214. rf.WithGlobal(func(f protocol.FileInfo) bool {
  215. fs, de, by := sizeOfFile(f)
  216. files += fs
  217. deleted += de
  218. bytes += by
  219. return true
  220. })
  221. }
  222. return
  223. }
  224. // LocalSize returns the number of files, deleted files and total bytes for all
  225. // files in the local repository.
  226. func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
  227. m.rmut.RLock()
  228. defer m.rmut.RUnlock()
  229. if rf, ok := m.repoFiles[repo]; ok {
  230. rf.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
  231. fs, de, by := sizeOfFile(f)
  232. files += fs
  233. deleted += de
  234. bytes += by
  235. return true
  236. })
  237. }
  238. return
  239. }
  240. // NeedSize returns the number and total size of currently needed files.
  241. func (m *Model) NeedSize(repo string) (files int, bytes int64) {
  242. f, d, b := sizeOf(m.NeedFilesRepo(repo))
  243. return f + d, b
  244. }
  245. // NeedFiles returns the list of currently needed files
  246. func (m *Model) NeedFilesRepo(repo string) []protocol.FileInfo {
  247. m.rmut.RLock()
  248. defer m.rmut.RUnlock()
  249. if rf, ok := m.repoFiles[repo]; ok {
  250. var fs []protocol.FileInfo
  251. rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
  252. fs = append(fs, f)
  253. return true
  254. })
  255. if r := m.repoCfgs[repo].FileRanker(); r != nil {
  256. files.SortBy(r).Sort(fs)
  257. }
  258. return fs
  259. }
  260. return nil
  261. }
  262. // Index is called when a new node is connected and we receive their full index.
  263. // Implements the protocol.Model interface.
  264. func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  265. if debug {
  266. l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
  267. }
  268. if !m.repoSharedWith(repo, nodeID) {
  269. l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  270. return
  271. }
  272. m.rmut.RLock()
  273. if r, ok := m.repoFiles[repo]; ok {
  274. r.Replace(nodeID, fs)
  275. } else {
  276. l.Fatalf("Index for nonexistant repo %q", repo)
  277. }
  278. m.rmut.RUnlock()
  279. events.Default.Log(events.RemoteIndexUpdated, map[string]string{
  280. "node": nodeID.String(),
  281. "repo": repo,
  282. })
  283. }
  284. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  285. // Implements the protocol.Model interface.
  286. func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  287. if debug {
  288. l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
  289. }
  290. if !m.repoSharedWith(repo, nodeID) {
  291. l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  292. return
  293. }
  294. m.rmut.RLock()
  295. if r, ok := m.repoFiles[repo]; ok {
  296. r.Update(nodeID, fs)
  297. } else {
  298. l.Fatalf("IndexUpdate for nonexistant repo %q", repo)
  299. }
  300. m.rmut.RUnlock()
  301. events.Default.Log(events.RemoteIndexUpdated, map[string]string{
  302. "node": nodeID.String(),
  303. "repo": repo,
  304. })
  305. }
  306. func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool {
  307. m.rmut.RLock()
  308. defer m.rmut.RUnlock()
  309. for _, nrepo := range m.nodeRepos[nodeID] {
  310. if nrepo == repo {
  311. return true
  312. }
  313. }
  314. return false
  315. }
  316. func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterConfigMessage) {
  317. compErr := compareClusterConfig(m.clusterConfig(nodeID), config)
  318. if debug {
  319. l.Debugf("ClusterConfig: %s: %#v", nodeID, config)
  320. l.Debugf(" ... compare: %s: %v", nodeID, compErr)
  321. }
  322. if compErr != nil {
  323. l.Warnf("%s: %v", nodeID, compErr)
  324. m.Close(nodeID, compErr)
  325. }
  326. m.pmut.Lock()
  327. if config.ClientName == "syncthing" {
  328. m.nodeVer[nodeID] = config.ClientVersion
  329. } else {
  330. m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
  331. }
  332. m.pmut.Unlock()
  333. l.Infof(`Node %s client is "%s %s"`, nodeID, config.ClientName, config.ClientVersion)
  334. }
  335. // Close removes the peer from the model and closes the underlying connection if possible.
  336. // Implements the protocol.Model interface.
  337. func (m *Model) Close(node protocol.NodeID, err error) {
  338. l.Infof("Connection to %s closed: %v", node, err)
  339. events.Default.Log(events.NodeDisconnected, map[string]string{
  340. "id": node.String(),
  341. "error": err.Error(),
  342. })
  343. m.rmut.RLock()
  344. for _, repo := range m.nodeRepos[node] {
  345. m.repoFiles[repo].Replace(node, nil)
  346. }
  347. m.rmut.RUnlock()
  348. m.pmut.Lock()
  349. conn, ok := m.rawConn[node]
  350. if ok {
  351. conn.Close()
  352. }
  353. delete(m.protoConn, node)
  354. delete(m.rawConn, node)
  355. delete(m.nodeVer, node)
  356. m.pmut.Unlock()
  357. }
  358. // Request returns the specified data segment by reading it from local disk.
  359. // Implements the protocol.Model interface.
  360. func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64, size int) ([]byte, error) {
  361. // Verify that the requested file exists in the local model.
  362. m.rmut.RLock()
  363. r, ok := m.repoFiles[repo]
  364. m.rmut.RUnlock()
  365. if !ok {
  366. l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
  367. return nil, ErrNoSuchFile
  368. }
  369. lf := r.Get(protocol.LocalNodeID, name)
  370. if protocol.IsInvalid(lf.Flags) || protocol.IsDeleted(lf.Flags) {
  371. if debug {
  372. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
  373. }
  374. return nil, ErrInvalid
  375. }
  376. if offset > lf.Size() {
  377. if debug {
  378. l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
  379. }
  380. return nil, ErrNoSuchFile
  381. }
  382. if debug && nodeID != protocol.LocalNodeID {
  383. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
  384. }
  385. m.rmut.RLock()
  386. fn := filepath.Join(m.repoCfgs[repo].Directory, name)
  387. m.rmut.RUnlock()
  388. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  389. if err != nil {
  390. return nil, err
  391. }
  392. defer fd.Close()
  393. buf := make([]byte, size)
  394. _, err = fd.ReadAt(buf, offset)
  395. if err != nil {
  396. return nil, err
  397. }
  398. return buf, nil
  399. }
  400. // ReplaceLocal replaces the local repository index with the given list of files.
  401. func (m *Model) ReplaceLocal(repo string, fs []protocol.FileInfo) {
  402. m.rmut.RLock()
  403. m.repoFiles[repo].ReplaceWithDelete(protocol.LocalNodeID, fs)
  404. m.rmut.RUnlock()
  405. }
  406. func (m *Model) CurrentRepoFile(repo string, file string) protocol.FileInfo {
  407. m.rmut.RLock()
  408. f := m.repoFiles[repo].Get(protocol.LocalNodeID, file)
  409. m.rmut.RUnlock()
  410. return f
  411. }
  412. func (m *Model) CurrentGlobalFile(repo string, file string) protocol.FileInfo {
  413. m.rmut.RLock()
  414. f := m.repoFiles[repo].GetGlobal(file)
  415. m.rmut.RUnlock()
  416. return f
  417. }
  418. type cFiler struct {
  419. m *Model
  420. r string
  421. }
  422. // Implements scanner.CurrentFiler
  423. func (cf cFiler) CurrentFile(file string) protocol.FileInfo {
  424. return cf.m.CurrentRepoFile(cf.r, file)
  425. }
  426. // ConnectedTo returns true if we are connected to the named node.
  427. func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool {
  428. m.pmut.RLock()
  429. _, ok := m.protoConn[nodeID]
  430. m.pmut.RUnlock()
  431. return ok
  432. }
  433. // AddConnection adds a new peer connection to the model. An initial index will
  434. // be sent to the connected peer, thereafter index updates whenever the local
  435. // repository changes.
  436. func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
  437. nodeID := protoConn.ID()
  438. m.pmut.Lock()
  439. if _, ok := m.protoConn[nodeID]; ok {
  440. panic("add existing node")
  441. }
  442. m.protoConn[nodeID] = protoConn
  443. if _, ok := m.rawConn[nodeID]; ok {
  444. panic("add existing node")
  445. }
  446. m.rawConn[nodeID] = rawConn
  447. m.pmut.Unlock()
  448. cm := m.clusterConfig(nodeID)
  449. protoConn.ClusterConfig(cm)
  450. var idxToSend = make(map[string][]protocol.FileInfo)
  451. m.rmut.RLock()
  452. for _, repo := range m.nodeRepos[nodeID] {
  453. idxToSend[repo] = m.protocolIndex(repo)
  454. }
  455. m.rmut.RUnlock()
  456. go func() {
  457. for repo, idx := range idxToSend {
  458. if debug {
  459. l.Debugf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
  460. }
  461. const batchSize = 1000
  462. for i := 0; i < len(idx); i += batchSize {
  463. if len(idx[i:]) < batchSize {
  464. protoConn.Index(repo, idx[i:])
  465. } else {
  466. protoConn.Index(repo, idx[i:i+batchSize])
  467. }
  468. }
  469. }
  470. }()
  471. }
  472. // protocolIndex returns the current local index in protocol data types.
  473. func (m *Model) protocolIndex(repo string) []protocol.FileInfo {
  474. var fs []protocol.FileInfo
  475. m.repoFiles[repo].WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
  476. fs = append(fs, f)
  477. return true
  478. })
  479. return fs
  480. }
  481. func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
  482. m.rmut.RLock()
  483. m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f})
  484. m.rmut.RUnlock()
  485. events.Default.Log(events.LocalIndexUpdated, map[string]string{"repo": repo})
  486. }
  487. func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
  488. m.pmut.RLock()
  489. nc, ok := m.protoConn[nodeID]
  490. m.pmut.RUnlock()
  491. if !ok {
  492. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  493. }
  494. if debug {
  495. l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
  496. }
  497. return nc.Request(repo, name, offset, size)
  498. }
  499. func (m *Model) broadcastIndexLoop() {
  500. // TODO: Rewrite to send index in segments
  501. var lastChange = map[string]uint64{}
  502. for {
  503. time.Sleep(5 * time.Second)
  504. m.pmut.RLock()
  505. m.rmut.RLock()
  506. var indexWg sync.WaitGroup
  507. for repo, fs := range m.repoFiles {
  508. repo := repo
  509. c := fs.Changes(protocol.LocalNodeID)
  510. if c == lastChange[repo] {
  511. continue
  512. }
  513. lastChange[repo] = c
  514. idx := m.protocolIndex(repo)
  515. for _, nodeID := range m.repoNodes[repo] {
  516. nodeID := nodeID
  517. if conn, ok := m.protoConn[nodeID]; ok {
  518. indexWg.Add(1)
  519. if debug {
  520. l.Debugf("IDX(out/loop): %s: %d files", nodeID, len(idx))
  521. }
  522. go func() {
  523. conn.Index(repo, idx)
  524. indexWg.Done()
  525. }()
  526. }
  527. }
  528. }
  529. m.rmut.RUnlock()
  530. m.pmut.RUnlock()
  531. indexWg.Wait()
  532. }
  533. }
  534. func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
  535. if m.started {
  536. panic("cannot add repo to started model")
  537. }
  538. if len(cfg.ID) == 0 {
  539. panic("cannot add empty repo id")
  540. }
  541. m.rmut.Lock()
  542. m.repoCfgs[cfg.ID] = cfg
  543. m.repoFiles[cfg.ID] = files.NewSet(cfg.ID, m.db)
  544. m.suppressor[cfg.ID] = &suppressor{threshold: int64(m.cfg.Options.MaxChangeKbps)}
  545. m.repoNodes[cfg.ID] = make([]protocol.NodeID, len(cfg.Nodes))
  546. for i, node := range cfg.Nodes {
  547. m.repoNodes[cfg.ID][i] = node.NodeID
  548. m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
  549. }
  550. m.addedRepo = true
  551. m.rmut.Unlock()
  552. }
  553. func (m *Model) ScanRepos() {
  554. m.rmut.RLock()
  555. var repos = make([]string, 0, len(m.repoCfgs))
  556. for repo := range m.repoCfgs {
  557. repos = append(repos, repo)
  558. }
  559. m.rmut.RUnlock()
  560. var wg sync.WaitGroup
  561. wg.Add(len(repos))
  562. for _, repo := range repos {
  563. repo := repo
  564. go func() {
  565. err := m.ScanRepo(repo)
  566. if err != nil {
  567. invalidateRepo(m.cfg, repo, err)
  568. }
  569. wg.Done()
  570. }()
  571. }
  572. wg.Wait()
  573. }
  574. func (m *Model) CleanRepos() {
  575. m.rmut.RLock()
  576. var dirs = make([]string, 0, len(m.repoCfgs))
  577. for _, cfg := range m.repoCfgs {
  578. dirs = append(dirs, cfg.Directory)
  579. }
  580. m.rmut.RUnlock()
  581. var wg sync.WaitGroup
  582. wg.Add(len(dirs))
  583. for _, dir := range dirs {
  584. w := &scanner.Walker{
  585. Dir: dir,
  586. TempNamer: defTempNamer,
  587. }
  588. go func() {
  589. w.CleanTempFiles()
  590. wg.Done()
  591. }()
  592. }
  593. wg.Wait()
  594. }
  595. func (m *Model) ScanRepo(repo string) error {
  596. m.rmut.RLock()
  597. w := &scanner.Walker{
  598. Dir: m.repoCfgs[repo].Directory,
  599. IgnoreFile: ".stignore",
  600. BlockSize: scanner.StandardBlockSize,
  601. TempNamer: defTempNamer,
  602. Suppressor: m.suppressor[repo],
  603. CurrentFiler: cFiler{m, repo},
  604. IgnorePerms: m.repoCfgs[repo].IgnorePerms,
  605. }
  606. m.rmut.RUnlock()
  607. m.setState(repo, RepoScanning)
  608. fs, _, err := w.Walk()
  609. if err != nil {
  610. return err
  611. }
  612. m.ReplaceLocal(repo, fs)
  613. m.setState(repo, RepoIdle)
  614. return nil
  615. }
  616. func (m *Model) LoadIndexes(dir string) {
  617. m.rmut.RLock()
  618. for repo := range m.repoCfgs {
  619. fs := m.loadIndex(repo, dir)
  620. var sfs = make([]protocol.FileInfo, len(fs))
  621. for i := 0; i < len(fs); i++ {
  622. lamport.Default.Tick(fs[i].Version)
  623. fs[i].Flags &= ^uint32(protocol.FlagInvalid) // we might have saved an index with files that were suppressed; the should not be on startup
  624. }
  625. m.repoFiles[repo].Replace(protocol.LocalNodeID, sfs)
  626. }
  627. m.rmut.RUnlock()
  628. }
  629. func (m *Model) saveIndex(repo string, dir string, fs []protocol.FileInfo) error {
  630. id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory)))
  631. name := id + ".idx.gz"
  632. name = filepath.Join(dir, name)
  633. tmp := fmt.Sprintf("%s.tmp.%d", name, time.Now().UnixNano())
  634. idxf, err := os.OpenFile(tmp, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0644)
  635. if err != nil {
  636. return err
  637. }
  638. defer os.Remove(tmp)
  639. gzw := gzip.NewWriter(idxf)
  640. n, err := protocol.IndexMessage{
  641. Repository: repo,
  642. Files: fs,
  643. }.EncodeXDR(gzw)
  644. if err != nil {
  645. gzw.Close()
  646. idxf.Close()
  647. return err
  648. }
  649. err = gzw.Close()
  650. if err != nil {
  651. return err
  652. }
  653. err = idxf.Close()
  654. if err != nil {
  655. return err
  656. }
  657. if debug {
  658. l.Debugln("wrote index,", n, "bytes uncompressed")
  659. }
  660. return osutil.Rename(tmp, name)
  661. }
  662. func (m *Model) loadIndex(repo string, dir string) []protocol.FileInfo {
  663. id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory)))
  664. name := id + ".idx.gz"
  665. name = filepath.Join(dir, name)
  666. idxf, err := os.Open(name)
  667. if err != nil {
  668. return nil
  669. }
  670. defer idxf.Close()
  671. gzr, err := gzip.NewReader(idxf)
  672. if err != nil {
  673. return nil
  674. }
  675. defer gzr.Close()
  676. var im protocol.IndexMessage
  677. err = im.DecodeXDR(gzr)
  678. if err != nil || im.Repository != repo {
  679. return nil
  680. }
  681. return im.Files
  682. }
  683. // clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
  684. func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage {
  685. cm := protocol.ClusterConfigMessage{
  686. ClientName: m.clientName,
  687. ClientVersion: m.clientVersion,
  688. }
  689. m.rmut.RLock()
  690. for _, repo := range m.nodeRepos[node] {
  691. cr := protocol.Repository{
  692. ID: repo,
  693. }
  694. for _, node := range m.repoNodes[repo] {
  695. // TODO: Set read only bit when relevant
  696. cr.Nodes = append(cr.Nodes, protocol.Node{
  697. ID: node[:],
  698. Flags: protocol.FlagShareTrusted,
  699. })
  700. }
  701. cm.Repositories = append(cm.Repositories, cr)
  702. }
  703. m.rmut.RUnlock()
  704. return cm
  705. }
  706. func (m *Model) setState(repo string, state repoState) {
  707. m.smut.Lock()
  708. m.repoState[repo] = state
  709. m.smut.Unlock()
  710. }
  711. func (m *Model) State(repo string) string {
  712. m.smut.RLock()
  713. state := m.repoState[repo]
  714. m.smut.RUnlock()
  715. switch state {
  716. case RepoIdle:
  717. return "idle"
  718. case RepoScanning:
  719. return "scanning"
  720. case RepoCleaning:
  721. return "cleaning"
  722. case RepoSyncing:
  723. return "syncing"
  724. default:
  725. return "unknown"
  726. }
  727. }
  728. func (m *Model) Override(repo string) {
  729. fs := m.NeedFilesRepo(repo)
  730. m.rmut.RLock()
  731. r := m.repoFiles[repo]
  732. m.rmut.RUnlock()
  733. for i := range fs {
  734. f := &fs[i]
  735. h := r.Get(protocol.LocalNodeID, f.Name)
  736. if h.Name != f.Name {
  737. // We are missing the file
  738. f.Flags |= protocol.FlagDeleted
  739. f.Blocks = nil
  740. } else {
  741. // We have the file, replace with our version
  742. *f = h
  743. }
  744. f.Version = lamport.Default.Tick(f.Version)
  745. }
  746. r.Update(protocol.LocalNodeID, fs)
  747. }
  748. // Version returns the change version for the given repository. This is
  749. // guaranteed to increment if the contents of the local or global repository
  750. // has changed.
  751. func (m *Model) Version(repo string) uint64 {
  752. var ver uint64
  753. m.rmut.Lock()
  754. for _, n := range m.repoNodes[repo] {
  755. ver += m.repoFiles[repo].Changes(n)
  756. }
  757. m.rmut.Unlock()
  758. return ver
  759. }