model.go 22 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894
  1. // Copyright (C) 2014 Jakob Borg and other contributors. All rights reserved.
  2. // Use of this source code is governed by an MIT-style license that can be
  3. // found in the LICENSE file.
  4. package model
  5. import (
  6. "compress/gzip"
  7. "crypto/sha1"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net"
  12. "os"
  13. "path/filepath"
  14. "strconv"
  15. "sync"
  16. "time"
  17. "github.com/calmh/syncthing/cid"
  18. "github.com/calmh/syncthing/config"
  19. "github.com/calmh/syncthing/files"
  20. "github.com/calmh/syncthing/lamport"
  21. "github.com/calmh/syncthing/osutil"
  22. "github.com/calmh/syncthing/protocol"
  23. "github.com/calmh/syncthing/scanner"
  24. )
// repoState identifies the activity a repository is currently engaged in. It
// is stored per repository by setState and rendered as a string by State.
type repoState int

// The states a repository can be in. RepoIdle is the zero value, so a
// repository whose state has never been set reads as idle.
const (
	RepoIdle repoState = iota
	RepoScanning
	RepoSyncing
	RepoCleaning
)
// zeroEntrySize is a somewhat arbitrary number of bytes that we let represent
// the size of an unsynchronized directory entry or a deleted file. It needs to
// be larger than zero so that it is visible that there is some amount of data
// to transfer to bring the systems into synchronization.
const zeroEntrySize = 128
// Model ties together the configured repositories, their file sets, the
// node/repository sharing relationships and the currently open peer
// connections. Three separate mutexes guard three groups of fields; see the
// comments on rmut, smut and pmut.
type Model struct {
	indexDir string // directory the broadcast loop saves indexes to
	cfg *config.Configuration

	clientName string // sent to peers in the cluster config message
	clientVersion string // sent to peers in the cluster config message

	repoCfgs map[string]config.RepositoryConfiguration // repo -> cfg
	repoFiles map[string]*files.Set // repo -> files
	repoNodes map[string][]protocol.NodeID // repo -> nodeIDs
	nodeRepos map[protocol.NodeID][]string // nodeID -> repos
	suppressor map[string]*suppressor // repo -> suppressor
	rmut sync.RWMutex // protects the above

	repoState map[string]repoState // repo -> state
	smut sync.RWMutex // protects repoState

	cm *cid.Map // translates node IDs to the IDs used by the file sets

	protoConn map[protocol.NodeID]protocol.Connection
	rawConn map[protocol.NodeID]io.Closer
	nodeVer map[protocol.NodeID]string // nodeID -> client version string from its cluster config
	pmut sync.RWMutex // protects protoConn, rawConn and nodeVer

	sup suppressor // initialised in NewModel; per-repo suppressors live in the suppressor map

	addedRepo bool // set once AddRepo has been called
	started bool // AddRepo panics when this is set
}
// Errors returned by Model.Request.
var (
	// ErrNoSuchFile is returned when the requested repository is unknown or
	// the requested offset lies beyond the size of the file.
	ErrNoSuchFile = errors.New("no such file")
	// ErrInvalid is returned when the requested file is suppressed or marked
	// as deleted in the local index.
	ErrInvalid = errors.New("file is invalid")
)
  63. // NewModel creates and starts a new model. The model starts in read-only mode,
  64. // where it sends index information to connected peers and responds to requests
  65. // for file data without altering the local repository in any way.
  66. func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVersion string) *Model {
  67. m := &Model{
  68. indexDir: indexDir,
  69. cfg: cfg,
  70. clientName: clientName,
  71. clientVersion: clientVersion,
  72. repoCfgs: make(map[string]config.RepositoryConfiguration),
  73. repoFiles: make(map[string]*files.Set),
  74. repoNodes: make(map[string][]protocol.NodeID),
  75. nodeRepos: make(map[protocol.NodeID][]string),
  76. repoState: make(map[string]repoState),
  77. suppressor: make(map[string]*suppressor),
  78. cm: cid.NewMap(),
  79. protoConn: make(map[protocol.NodeID]protocol.Connection),
  80. rawConn: make(map[protocol.NodeID]io.Closer),
  81. nodeVer: make(map[protocol.NodeID]string),
  82. sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
  83. }
  84. var timeout = 20 * 60 // seconds
  85. if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
  86. it, err := strconv.Atoi(t)
  87. if err == nil {
  88. timeout = it
  89. }
  90. }
  91. deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
  92. deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
  93. deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)
  94. go m.broadcastIndexLoop()
  95. return m
  96. }
  97. // StartRW starts read/write processing on the current model. When in
  98. // read/write mode the model will attempt to keep in sync with the cluster by
  99. // pulling needed files from peer nodes.
  100. func (m *Model) StartRepoRW(repo string, threads int) {
  101. m.rmut.RLock()
  102. defer m.rmut.RUnlock()
  103. if cfg, ok := m.repoCfgs[repo]; !ok {
  104. panic("cannot start without repo")
  105. } else {
  106. newPuller(cfg, m, threads, m.cfg)
  107. }
  108. }
  109. // StartRO starts read only processing on the current model. When in
  110. // read only mode the model will announce files to the cluster but not
  111. // pull in any external changes.
  112. func (m *Model) StartRepoRO(repo string) {
  113. m.StartRepoRW(repo, 0) // zero threads => read only
  114. }
// ConnectionInfo describes one peer connection as reported by
// Model.ConnectionStats.
type ConnectionInfo struct {
	protocol.Statistics
	Address string // remote address, when the raw connection exposes one
	ClientVersion string // version string recorded from the peer's cluster config
	Completion int // percentage (0-100) of the shared data the peer has
}
  121. // ConnectionStats returns a map with connection statistics for each connected node.
  122. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  123. type remoteAddrer interface {
  124. RemoteAddr() net.Addr
  125. }
  126. m.pmut.RLock()
  127. m.rmut.RLock()
  128. var res = make(map[string]ConnectionInfo)
  129. for node, conn := range m.protoConn {
  130. ci := ConnectionInfo{
  131. Statistics: conn.Statistics(),
  132. ClientVersion: m.nodeVer[node],
  133. }
  134. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  135. ci.Address = nc.RemoteAddr().String()
  136. }
  137. var tot int64
  138. var have int64
  139. for _, repo := range m.nodeRepos[node] {
  140. for _, f := range m.repoFiles[repo].Global() {
  141. if !protocol.IsDeleted(f.Flags) {
  142. size := f.Size
  143. if protocol.IsDirectory(f.Flags) {
  144. size = zeroEntrySize
  145. }
  146. tot += size
  147. have += size
  148. }
  149. }
  150. for _, f := range m.repoFiles[repo].Need(m.cm.Get(node)) {
  151. if !protocol.IsDeleted(f.Flags) {
  152. size := f.Size
  153. if protocol.IsDirectory(f.Flags) {
  154. size = zeroEntrySize
  155. }
  156. have -= size
  157. }
  158. }
  159. }
  160. ci.Completion = 100
  161. if tot != 0 {
  162. ci.Completion = int(100 * have / tot)
  163. }
  164. res[node.String()] = ci
  165. }
  166. m.rmut.RUnlock()
  167. m.pmut.RUnlock()
  168. in, out := protocol.TotalInOut()
  169. res["total"] = ConnectionInfo{
  170. Statistics: protocol.Statistics{
  171. At: time.Now(),
  172. InBytesTotal: in,
  173. OutBytesTotal: out,
  174. },
  175. }
  176. return res
  177. }
  178. func sizeOf(fs []scanner.File) (files, deleted int, bytes int64) {
  179. for _, f := range fs {
  180. if !protocol.IsDeleted(f.Flags) {
  181. files++
  182. if !protocol.IsDirectory(f.Flags) {
  183. bytes += f.Size
  184. } else {
  185. bytes += zeroEntrySize
  186. }
  187. } else {
  188. deleted++
  189. bytes += zeroEntrySize
  190. }
  191. }
  192. return
  193. }
  194. // GlobalSize returns the number of files, deleted files and total bytes for all
  195. // files in the global model.
  196. func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
  197. m.rmut.RLock()
  198. defer m.rmut.RUnlock()
  199. if rf, ok := m.repoFiles[repo]; ok {
  200. return sizeOf(rf.Global())
  201. }
  202. return 0, 0, 0
  203. }
  204. // LocalSize returns the number of files, deleted files and total bytes for all
  205. // files in the local repository.
  206. func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
  207. m.rmut.RLock()
  208. defer m.rmut.RUnlock()
  209. if rf, ok := m.repoFiles[repo]; ok {
  210. return sizeOf(rf.Have(cid.LocalID))
  211. }
  212. return 0, 0, 0
  213. }
  214. // NeedSize returns the number and total size of currently needed files.
  215. func (m *Model) NeedSize(repo string) (files int, bytes int64) {
  216. f, d, b := sizeOf(m.NeedFilesRepo(repo))
  217. return f + d, b
  218. }
  219. // NeedFiles returns the list of currently needed files and the total size.
  220. func (m *Model) NeedFilesRepo(repo string) []scanner.File {
  221. m.rmut.RLock()
  222. defer m.rmut.RUnlock()
  223. if rf, ok := m.repoFiles[repo]; ok {
  224. f := rf.Need(cid.LocalID)
  225. if r := m.repoCfgs[repo].FileRanker(); r != nil {
  226. files.SortBy(r).Sort(f)
  227. }
  228. return f
  229. }
  230. return nil
  231. }
  232. // Index is called when a new node is connected and we receive their full index.
  233. // Implements the protocol.Model interface.
  234. func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  235. if debug {
  236. l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
  237. }
  238. if !m.repoSharedWith(repo, nodeID) {
  239. l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  240. return
  241. }
  242. var files = make([]scanner.File, len(fs))
  243. for i := range fs {
  244. f := fs[i]
  245. lamport.Default.Tick(f.Version)
  246. if debug {
  247. var flagComment string
  248. if protocol.IsDeleted(f.Flags) {
  249. flagComment = " (deleted)"
  250. }
  251. l.Debugf("IDX(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  252. }
  253. files[i] = fileFromFileInfo(f)
  254. }
  255. id := m.cm.Get(nodeID)
  256. m.rmut.RLock()
  257. if r, ok := m.repoFiles[repo]; ok {
  258. r.Replace(id, files)
  259. } else {
  260. l.Fatalf("Index for nonexistant repo %q", repo)
  261. }
  262. m.rmut.RUnlock()
  263. }
  264. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  265. // Implements the protocol.Model interface.
  266. func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  267. if debug {
  268. l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
  269. }
  270. if !m.repoSharedWith(repo, nodeID) {
  271. l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  272. return
  273. }
  274. var files = make([]scanner.File, len(fs))
  275. for i := range fs {
  276. f := fs[i]
  277. lamport.Default.Tick(f.Version)
  278. if debug {
  279. var flagComment string
  280. if protocol.IsDeleted(f.Flags) {
  281. flagComment = " (deleted)"
  282. }
  283. l.Debugf("IDXUP(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  284. }
  285. files[i] = fileFromFileInfo(f)
  286. }
  287. id := m.cm.Get(nodeID)
  288. m.rmut.RLock()
  289. if r, ok := m.repoFiles[repo]; ok {
  290. r.Update(id, files)
  291. } else {
  292. l.Fatalf("IndexUpdate for nonexistant repo %q", repo)
  293. }
  294. m.rmut.RUnlock()
  295. }
  296. func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool {
  297. m.rmut.RLock()
  298. defer m.rmut.RUnlock()
  299. for _, nrepo := range m.nodeRepos[nodeID] {
  300. if nrepo == repo {
  301. return true
  302. }
  303. }
  304. return false
  305. }
  306. func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterConfigMessage) {
  307. compErr := compareClusterConfig(m.clusterConfig(nodeID), config)
  308. if debug {
  309. l.Debugf("ClusterConfig: %s: %#v", nodeID, config)
  310. l.Debugf(" ... compare: %s: %v", nodeID, compErr)
  311. }
  312. if compErr != nil {
  313. l.Warnf("%s: %v", nodeID, compErr)
  314. m.Close(nodeID, compErr)
  315. }
  316. m.pmut.Lock()
  317. if config.ClientName == "syncthing" {
  318. m.nodeVer[nodeID] = config.ClientVersion
  319. } else {
  320. m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
  321. }
  322. m.pmut.Unlock()
  323. }
  324. // Close removes the peer from the model and closes the underlying connection if possible.
  325. // Implements the protocol.Model interface.
  326. func (m *Model) Close(node protocol.NodeID, err error) {
  327. l.Infof("Connection to %s closed: %v", node, err)
  328. cid := m.cm.Get(node)
  329. m.rmut.RLock()
  330. for _, repo := range m.nodeRepos[node] {
  331. m.repoFiles[repo].Replace(cid, nil)
  332. }
  333. m.rmut.RUnlock()
  334. m.cm.Clear(node)
  335. m.pmut.Lock()
  336. conn, ok := m.rawConn[node]
  337. if ok {
  338. conn.Close()
  339. }
  340. delete(m.protoConn, node)
  341. delete(m.rawConn, node)
  342. delete(m.nodeVer, node)
  343. m.pmut.Unlock()
  344. }
  345. // Request returns the specified data segment by reading it from local disk.
  346. // Implements the protocol.Model interface.
  347. func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64, size int) ([]byte, error) {
  348. // Verify that the requested file exists in the local model.
  349. m.rmut.RLock()
  350. r, ok := m.repoFiles[repo]
  351. m.rmut.RUnlock()
  352. if !ok {
  353. l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
  354. return nil, ErrNoSuchFile
  355. }
  356. lf := r.Get(cid.LocalID, name)
  357. if lf.Suppressed || protocol.IsDeleted(lf.Flags) {
  358. if debug {
  359. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
  360. }
  361. return nil, ErrInvalid
  362. }
  363. if offset > lf.Size {
  364. if debug {
  365. l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
  366. }
  367. return nil, ErrNoSuchFile
  368. }
  369. if debug && nodeID != cid.LocalNodeID {
  370. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
  371. }
  372. m.rmut.RLock()
  373. fn := filepath.Join(m.repoCfgs[repo].Directory, name)
  374. m.rmut.RUnlock()
  375. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  376. if err != nil {
  377. return nil, err
  378. }
  379. defer fd.Close()
  380. buf := make([]byte, size)
  381. _, err = fd.ReadAt(buf, offset)
  382. if err != nil {
  383. return nil, err
  384. }
  385. return buf, nil
  386. }
  387. // ReplaceLocal replaces the local repository index with the given list of files.
  388. func (m *Model) ReplaceLocal(repo string, fs []scanner.File) {
  389. m.rmut.RLock()
  390. m.repoFiles[repo].ReplaceWithDelete(cid.LocalID, fs)
  391. m.rmut.RUnlock()
  392. }
  393. func (m *Model) SeedLocal(repo string, fs []protocol.FileInfo) {
  394. var sfs = make([]scanner.File, len(fs))
  395. for i := 0; i < len(fs); i++ {
  396. lamport.Default.Tick(fs[i].Version)
  397. sfs[i] = fileFromFileInfo(fs[i])
  398. sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; the should not be on startup
  399. }
  400. m.rmut.RLock()
  401. m.repoFiles[repo].Replace(cid.LocalID, sfs)
  402. m.rmut.RUnlock()
  403. }
  404. func (m *Model) CurrentRepoFile(repo string, file string) scanner.File {
  405. m.rmut.RLock()
  406. f := m.repoFiles[repo].Get(cid.LocalID, file)
  407. m.rmut.RUnlock()
  408. return f
  409. }
  410. func (m *Model) CurrentGlobalFile(repo string, file string) scanner.File {
  411. m.rmut.RLock()
  412. f := m.repoFiles[repo].GetGlobal(file)
  413. m.rmut.RUnlock()
  414. return f
  415. }
  416. type cFiler struct {
  417. m *Model
  418. r string
  419. }
  420. // Implements scanner.CurrentFiler
  421. func (cf cFiler) CurrentFile(file string) scanner.File {
  422. return cf.m.CurrentRepoFile(cf.r, file)
  423. }
  424. // ConnectedTo returns true if we are connected to the named node.
  425. func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool {
  426. m.pmut.RLock()
  427. _, ok := m.protoConn[nodeID]
  428. m.pmut.RUnlock()
  429. return ok
  430. }
  431. // AddConnection adds a new peer connection to the model. An initial index will
  432. // be sent to the connected peer, thereafter index updates whenever the local
  433. // repository changes.
  434. func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
  435. nodeID := protoConn.ID()
  436. m.pmut.Lock()
  437. if _, ok := m.protoConn[nodeID]; ok {
  438. panic("add existing node")
  439. }
  440. m.protoConn[nodeID] = protoConn
  441. if _, ok := m.rawConn[nodeID]; ok {
  442. panic("add existing node")
  443. }
  444. m.rawConn[nodeID] = rawConn
  445. m.pmut.Unlock()
  446. cm := m.clusterConfig(nodeID)
  447. protoConn.ClusterConfig(cm)
  448. var idxToSend = make(map[string][]protocol.FileInfo)
  449. m.rmut.RLock()
  450. for _, repo := range m.nodeRepos[nodeID] {
  451. idxToSend[repo] = m.protocolIndex(repo)
  452. }
  453. m.rmut.RUnlock()
  454. go func() {
  455. for repo, idx := range idxToSend {
  456. if debug {
  457. l.Debugf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
  458. }
  459. protoConn.Index(repo, idx)
  460. }
  461. }()
  462. }
  463. // protocolIndex returns the current local index in protocol data types.
  464. func (m *Model) protocolIndex(repo string) []protocol.FileInfo {
  465. var index []protocol.FileInfo
  466. fs := m.repoFiles[repo].Have(cid.LocalID)
  467. for _, f := range fs {
  468. mf := fileInfoFromFile(f)
  469. if debug {
  470. var flagComment string
  471. if protocol.IsDeleted(mf.Flags) {
  472. flagComment = " (deleted)"
  473. }
  474. l.Debugf("IDX(out): %q/%q m=%d f=%o%s v=%d (%d blocks)", repo, mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
  475. }
  476. index = append(index, mf)
  477. }
  478. return index
  479. }
  480. func (m *Model) updateLocal(repo string, f scanner.File) {
  481. m.rmut.RLock()
  482. m.repoFiles[repo].Update(cid.LocalID, []scanner.File{f})
  483. m.rmut.RUnlock()
  484. }
  485. func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
  486. m.pmut.RLock()
  487. nc, ok := m.protoConn[nodeID]
  488. m.pmut.RUnlock()
  489. if !ok {
  490. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  491. }
  492. if debug {
  493. l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
  494. }
  495. return nc.Request(repo, name, offset, size)
  496. }
  497. func (m *Model) broadcastIndexLoop() {
  498. var lastChange = map[string]uint64{}
  499. for {
  500. time.Sleep(5 * time.Second)
  501. m.pmut.RLock()
  502. m.rmut.RLock()
  503. var indexWg sync.WaitGroup
  504. for repo, fs := range m.repoFiles {
  505. repo := repo
  506. c := fs.Changes(cid.LocalID)
  507. if c == lastChange[repo] {
  508. continue
  509. }
  510. lastChange[repo] = c
  511. idx := m.protocolIndex(repo)
  512. indexWg.Add(1)
  513. go func() {
  514. err := m.saveIndex(repo, m.indexDir, idx)
  515. if err != nil {
  516. l.Infof("Saving index for %q: %v", repo, err)
  517. }
  518. indexWg.Done()
  519. }()
  520. for _, nodeID := range m.repoNodes[repo] {
  521. nodeID := nodeID
  522. if conn, ok := m.protoConn[nodeID]; ok {
  523. indexWg.Add(1)
  524. if debug {
  525. l.Debugf("IDX(out/loop): %s: %d files", nodeID, len(idx))
  526. }
  527. go func() {
  528. conn.Index(repo, idx)
  529. indexWg.Done()
  530. }()
  531. }
  532. }
  533. }
  534. m.rmut.RUnlock()
  535. m.pmut.RUnlock()
  536. indexWg.Wait()
  537. }
  538. }
  539. func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
  540. if m.started {
  541. panic("cannot add repo to started model")
  542. }
  543. if len(cfg.ID) == 0 {
  544. panic("cannot add empty repo id")
  545. }
  546. m.rmut.Lock()
  547. m.repoCfgs[cfg.ID] = cfg
  548. m.repoFiles[cfg.ID] = files.NewSet()
  549. m.suppressor[cfg.ID] = &suppressor{threshold: int64(m.cfg.Options.MaxChangeKbps)}
  550. m.repoNodes[cfg.ID] = make([]protocol.NodeID, len(cfg.Nodes))
  551. for i, node := range cfg.Nodes {
  552. m.repoNodes[cfg.ID][i] = node.NodeID
  553. m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
  554. }
  555. m.addedRepo = true
  556. m.rmut.Unlock()
  557. }
  558. func (m *Model) ScanRepos() {
  559. m.rmut.RLock()
  560. var repos = make([]string, 0, len(m.repoCfgs))
  561. for repo := range m.repoCfgs {
  562. repos = append(repos, repo)
  563. }
  564. m.rmut.RUnlock()
  565. var wg sync.WaitGroup
  566. wg.Add(len(repos))
  567. for _, repo := range repos {
  568. repo := repo
  569. go func() {
  570. err := m.ScanRepo(repo)
  571. if err != nil {
  572. invalidateRepo(m.cfg, repo, err)
  573. }
  574. wg.Done()
  575. }()
  576. }
  577. wg.Wait()
  578. }
  579. func (m *Model) CleanRepos() {
  580. m.rmut.RLock()
  581. var dirs = make([]string, 0, len(m.repoCfgs))
  582. for _, cfg := range m.repoCfgs {
  583. dirs = append(dirs, cfg.Directory)
  584. }
  585. m.rmut.RUnlock()
  586. var wg sync.WaitGroup
  587. wg.Add(len(dirs))
  588. for _, dir := range dirs {
  589. w := &scanner.Walker{
  590. Dir: dir,
  591. TempNamer: defTempNamer,
  592. }
  593. go func() {
  594. w.CleanTempFiles()
  595. wg.Done()
  596. }()
  597. }
  598. wg.Wait()
  599. }
  600. func (m *Model) ScanRepo(repo string) error {
  601. m.rmut.RLock()
  602. w := &scanner.Walker{
  603. Dir: m.repoCfgs[repo].Directory,
  604. IgnoreFile: ".stignore",
  605. BlockSize: scanner.StandardBlockSize,
  606. TempNamer: defTempNamer,
  607. Suppressor: m.suppressor[repo],
  608. CurrentFiler: cFiler{m, repo},
  609. IgnorePerms: m.repoCfgs[repo].IgnorePerms,
  610. }
  611. m.rmut.RUnlock()
  612. m.setState(repo, RepoScanning)
  613. fs, _, err := w.Walk()
  614. if err != nil {
  615. return err
  616. }
  617. m.ReplaceLocal(repo, fs)
  618. m.setState(repo, RepoIdle)
  619. return nil
  620. }
  621. func (m *Model) SaveIndexes(dir string) {
  622. m.rmut.RLock()
  623. for repo := range m.repoCfgs {
  624. fs := m.protocolIndex(repo)
  625. err := m.saveIndex(repo, dir, fs)
  626. if err != nil {
  627. l.Infof("Saving index for %q: %v", repo, err)
  628. }
  629. }
  630. m.rmut.RUnlock()
  631. }
  632. func (m *Model) LoadIndexes(dir string) {
  633. m.rmut.RLock()
  634. for repo := range m.repoCfgs {
  635. fs := m.loadIndex(repo, dir)
  636. m.SeedLocal(repo, fs)
  637. }
  638. m.rmut.RUnlock()
  639. }
  640. func (m *Model) saveIndex(repo string, dir string, fs []protocol.FileInfo) error {
  641. id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory)))
  642. name := id + ".idx.gz"
  643. name = filepath.Join(dir, name)
  644. tmp := fmt.Sprintf("%s.tmp.%d", name, time.Now().UnixNano())
  645. idxf, err := os.OpenFile(tmp, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0644)
  646. if err != nil {
  647. return err
  648. }
  649. defer os.Remove(tmp)
  650. gzw := gzip.NewWriter(idxf)
  651. n, err := protocol.IndexMessage{
  652. Repository: repo,
  653. Files: fs,
  654. }.EncodeXDR(gzw)
  655. if err != nil {
  656. gzw.Close()
  657. idxf.Close()
  658. return err
  659. }
  660. err = gzw.Close()
  661. if err != nil {
  662. return err
  663. }
  664. err = idxf.Close()
  665. if err != nil {
  666. return err
  667. }
  668. if debug {
  669. l.Debugln("wrote index,", n, "bytes uncompressed")
  670. }
  671. return osutil.Rename(tmp, name)
  672. }
  673. func (m *Model) loadIndex(repo string, dir string) []protocol.FileInfo {
  674. id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory)))
  675. name := id + ".idx.gz"
  676. name = filepath.Join(dir, name)
  677. idxf, err := os.Open(name)
  678. if err != nil {
  679. return nil
  680. }
  681. defer idxf.Close()
  682. gzr, err := gzip.NewReader(idxf)
  683. if err != nil {
  684. return nil
  685. }
  686. defer gzr.Close()
  687. var im protocol.IndexMessage
  688. err = im.DecodeXDR(gzr)
  689. if err != nil || im.Repository != repo {
  690. return nil
  691. }
  692. return im.Files
  693. }
  694. // clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
  695. func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage {
  696. cm := protocol.ClusterConfigMessage{
  697. ClientName: m.clientName,
  698. ClientVersion: m.clientVersion,
  699. }
  700. m.rmut.RLock()
  701. for _, repo := range m.nodeRepos[node] {
  702. cr := protocol.Repository{
  703. ID: repo,
  704. }
  705. for _, node := range m.repoNodes[repo] {
  706. // TODO: Set read only bit when relevant
  707. cr.Nodes = append(cr.Nodes, protocol.Node{
  708. ID: node[:],
  709. Flags: protocol.FlagShareTrusted,
  710. })
  711. }
  712. cm.Repositories = append(cm.Repositories, cr)
  713. }
  714. m.rmut.RUnlock()
  715. return cm
  716. }
  717. func (m *Model) setState(repo string, state repoState) {
  718. m.smut.Lock()
  719. m.repoState[repo] = state
  720. m.smut.Unlock()
  721. }
  722. func (m *Model) State(repo string) string {
  723. m.smut.RLock()
  724. state := m.repoState[repo]
  725. m.smut.RUnlock()
  726. switch state {
  727. case RepoIdle:
  728. return "idle"
  729. case RepoScanning:
  730. return "scanning"
  731. case RepoCleaning:
  732. return "cleaning"
  733. case RepoSyncing:
  734. return "syncing"
  735. default:
  736. return "unknown"
  737. }
  738. }
  739. func (m *Model) Override(repo string) {
  740. fs := m.NeedFilesRepo(repo)
  741. m.rmut.RLock()
  742. r := m.repoFiles[repo]
  743. m.rmut.RUnlock()
  744. for i := range fs {
  745. f := &fs[i]
  746. h := r.Get(cid.LocalID, f.Name)
  747. if h.Name != f.Name {
  748. // We are missing the file
  749. f.Flags |= protocol.FlagDeleted
  750. f.Blocks = nil
  751. } else {
  752. // We have the file, replace with our version
  753. *f = h
  754. }
  755. f.Version = lamport.Default.Tick(f.Version)
  756. }
  757. r.Update(cid.LocalID, fs)
  758. }
  759. // Version returns the change version for the given repository. This is
  760. // guaranteed to increment if the contents of the local or global repository
  761. // has changed.
  762. func (m *Model) Version(repo string) uint64 {
  763. var ver uint64
  764. m.rmut.Lock()
  765. for _, n := range m.repoNodes[repo] {
  766. ver += m.repoFiles[repo].Changes(m.cm.Get(n))
  767. }
  768. m.rmut.Unlock()
  769. return ver
  770. }