model.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923
  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package model
  5. import (
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "os"
  11. "path/filepath"
  12. "strconv"
  13. "sync"
  14. "time"
  15. "github.com/calmh/syncthing/config"
  16. "github.com/calmh/syncthing/events"
  17. "github.com/calmh/syncthing/files"
  18. "github.com/calmh/syncthing/lamport"
  19. "github.com/calmh/syncthing/protocol"
  20. "github.com/calmh/syncthing/scanner"
  21. "github.com/syndtr/goleveldb/leveldb"
  22. )
  23. type repoState int
  24. const (
  25. RepoIdle repoState = iota
  26. RepoScanning
  27. RepoSyncing
  28. RepoCleaning
  29. )
  30. func (s repoState) String() string {
  31. switch s {
  32. case RepoIdle:
  33. return "idle"
  34. case RepoScanning:
  35. return "scanning"
  36. case RepoCleaning:
  37. return "cleaning"
  38. case RepoSyncing:
  39. return "syncing"
  40. default:
  41. return "unknown"
  42. }
  43. }
  44. // Somewhat arbitrary amount of bytes that we choose to let represent the size
  45. // of an unsynchronized directory entry or a deleted file. We need it to be
  46. // larger than zero so that it's visible that there is some amount of bytes to
  47. // transfer to bring the systems into synchronization.
  48. const zeroEntrySize = 128
  49. // How many files to send in each Index/IndexUpdate message.
  50. const indexBatchSize = 1000
  51. type Model struct {
  52. indexDir string
  53. cfg *config.Configuration
  54. db *leveldb.DB
  55. clientName string
  56. clientVersion string
  57. repoCfgs map[string]config.RepositoryConfiguration // repo -> cfg
  58. repoFiles map[string]*files.Set // repo -> files
  59. repoNodes map[string][]protocol.NodeID // repo -> nodeIDs
  60. nodeRepos map[protocol.NodeID][]string // nodeID -> repos
  61. suppressor map[string]*suppressor // repo -> suppressor
  62. rmut sync.RWMutex // protects the above
  63. repoState map[string]repoState // repo -> state
  64. repoStateChanged map[string]time.Time // repo -> time when state changed
  65. smut sync.RWMutex
  66. protoConn map[protocol.NodeID]protocol.Connection
  67. rawConn map[protocol.NodeID]io.Closer
  68. nodeVer map[protocol.NodeID]string
  69. pmut sync.RWMutex // protects protoConn and rawConn
  70. sentLocalVer map[protocol.NodeID]map[string]uint64
  71. slMut sync.Mutex
  72. sup suppressor
  73. addedRepo bool
  74. started bool
  75. }
  76. var (
  77. ErrNoSuchFile = errors.New("no such file")
  78. ErrInvalid = errors.New("file is invalid")
  79. )
  80. // NewModel creates and starts a new model. The model starts in read-only mode,
  81. // where it sends index information to connected peers and responds to requests
  82. // for file data without altering the local repository in any way.
  83. func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVersion string, db *leveldb.DB) *Model {
  84. m := &Model{
  85. indexDir: indexDir,
  86. cfg: cfg,
  87. db: db,
  88. clientName: clientName,
  89. clientVersion: clientVersion,
  90. repoCfgs: make(map[string]config.RepositoryConfiguration),
  91. repoFiles: make(map[string]*files.Set),
  92. repoNodes: make(map[string][]protocol.NodeID),
  93. nodeRepos: make(map[protocol.NodeID][]string),
  94. repoState: make(map[string]repoState),
  95. repoStateChanged: make(map[string]time.Time),
  96. suppressor: make(map[string]*suppressor),
  97. protoConn: make(map[protocol.NodeID]protocol.Connection),
  98. rawConn: make(map[protocol.NodeID]io.Closer),
  99. nodeVer: make(map[protocol.NodeID]string),
  100. sentLocalVer: make(map[protocol.NodeID]map[string]uint64),
  101. sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
  102. }
  103. var timeout = 20 * 60 // seconds
  104. if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
  105. it, err := strconv.Atoi(t)
  106. if err == nil {
  107. timeout = it
  108. }
  109. }
  110. deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
  111. deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
  112. deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)
  113. return m
  114. }
  115. // StartRW starts read/write processing on the current model. When in
  116. // read/write mode the model will attempt to keep in sync with the cluster by
  117. // pulling needed files from peer nodes.
  118. func (m *Model) StartRepoRW(repo string, threads int) {
  119. m.rmut.RLock()
  120. defer m.rmut.RUnlock()
  121. if cfg, ok := m.repoCfgs[repo]; !ok {
  122. panic("cannot start without repo")
  123. } else {
  124. newPuller(cfg, m, threads, m.cfg)
  125. }
  126. }
  127. // StartRO starts read only processing on the current model. When in
  128. // read only mode the model will announce files to the cluster but not
  129. // pull in any external changes.
  130. func (m *Model) StartRepoRO(repo string) {
  131. m.StartRepoRW(repo, 0) // zero threads => read only
  132. }
  133. type ConnectionInfo struct {
  134. protocol.Statistics
  135. Address string
  136. ClientVersion string
  137. }
  138. // ConnectionStats returns a map with connection statistics for each connected node.
  139. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  140. type remoteAddrer interface {
  141. RemoteAddr() net.Addr
  142. }
  143. m.pmut.RLock()
  144. m.rmut.RLock()
  145. var res = make(map[string]ConnectionInfo)
  146. for node, conn := range m.protoConn {
  147. ci := ConnectionInfo{
  148. Statistics: conn.Statistics(),
  149. ClientVersion: m.nodeVer[node],
  150. }
  151. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  152. ci.Address = nc.RemoteAddr().String()
  153. }
  154. res[node.String()] = ci
  155. }
  156. m.rmut.RUnlock()
  157. m.pmut.RUnlock()
  158. in, out := protocol.TotalInOut()
  159. res["total"] = ConnectionInfo{
  160. Statistics: protocol.Statistics{
  161. At: time.Now(),
  162. InBytesTotal: in,
  163. OutBytesTotal: out,
  164. },
  165. }
  166. return res
  167. }
  168. // Returns the completion status, in percent, for the given node and repo.
  169. func (m *Model) Completion(node protocol.NodeID, repo string) float64 {
  170. var tot int64
  171. m.repoFiles[repo].WithGlobal(func(f protocol.FileInfo) bool {
  172. if !protocol.IsDeleted(f.Flags) {
  173. var size int64
  174. if protocol.IsDirectory(f.Flags) {
  175. size = zeroEntrySize
  176. } else {
  177. size = f.Size()
  178. }
  179. tot += size
  180. }
  181. return true
  182. })
  183. var need int64
  184. m.repoFiles[repo].WithNeed(node, func(f protocol.FileInfo) bool {
  185. if !protocol.IsDeleted(f.Flags) {
  186. var size int64
  187. if protocol.IsDirectory(f.Flags) {
  188. size = zeroEntrySize
  189. } else {
  190. size = f.Size()
  191. }
  192. need += size
  193. }
  194. return true
  195. })
  196. return 100 * (1 - float64(need)/float64(tot))
  197. }
  198. func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) {
  199. for _, f := range fs {
  200. fs, de, by := sizeOfFile(f)
  201. files += fs
  202. deleted += de
  203. bytes += by
  204. }
  205. return
  206. }
  207. func sizeOfFile(f protocol.FileInfo) (files, deleted int, bytes int64) {
  208. if !protocol.IsDeleted(f.Flags) {
  209. files++
  210. if !protocol.IsDirectory(f.Flags) {
  211. bytes += f.Size()
  212. } else {
  213. bytes += zeroEntrySize
  214. }
  215. } else {
  216. deleted++
  217. bytes += zeroEntrySize
  218. }
  219. return
  220. }
  221. // GlobalSize returns the number of files, deleted files and total bytes for all
  222. // files in the global model.
  223. func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
  224. m.rmut.RLock()
  225. defer m.rmut.RUnlock()
  226. if rf, ok := m.repoFiles[repo]; ok {
  227. rf.WithGlobal(func(f protocol.FileInfo) bool {
  228. fs, de, by := sizeOfFile(f)
  229. files += fs
  230. deleted += de
  231. bytes += by
  232. return true
  233. })
  234. }
  235. return
  236. }
  237. // LocalSize returns the number of files, deleted files and total bytes for all
  238. // files in the local repository.
  239. func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
  240. m.rmut.RLock()
  241. defer m.rmut.RUnlock()
  242. if rf, ok := m.repoFiles[repo]; ok {
  243. rf.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
  244. fs, de, by := sizeOfFile(f)
  245. files += fs
  246. deleted += de
  247. bytes += by
  248. return true
  249. })
  250. }
  251. return
  252. }
  253. // NeedSize returns the number and total size of currently needed files.
  254. func (m *Model) NeedSize(repo string) (files int, bytes int64) {
  255. m.rmut.RLock()
  256. defer m.rmut.RUnlock()
  257. if rf, ok := m.repoFiles[repo]; ok {
  258. rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
  259. fs, de, by := sizeOfFile(f)
  260. files += fs + de
  261. bytes += by
  262. return true
  263. })
  264. }
  265. return
  266. }
  267. // NeedFiles returns the list of currently needed files
  268. func (m *Model) NeedFilesRepo(repo string) []protocol.FileInfo {
  269. m.rmut.RLock()
  270. defer m.rmut.RUnlock()
  271. if rf, ok := m.repoFiles[repo]; ok {
  272. fs := make([]protocol.FileInfo, 0, indexBatchSize)
  273. rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
  274. fs = append(fs, f)
  275. return len(fs) < indexBatchSize
  276. })
  277. return fs
  278. }
  279. return nil
  280. }
  281. // Index is called when a new node is connected and we receive their full index.
  282. // Implements the protocol.Model interface.
  283. func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  284. if debug {
  285. l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
  286. }
  287. if !m.repoSharedWith(repo, nodeID) {
  288. l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  289. return
  290. }
  291. for i := range fs {
  292. lamport.Default.Tick(fs[i].Version)
  293. }
  294. m.rmut.RLock()
  295. r, ok := m.repoFiles[repo]
  296. m.rmut.RUnlock()
  297. if ok {
  298. r.Replace(nodeID, fs)
  299. } else {
  300. l.Fatalf("Index for nonexistant repo %q", repo)
  301. }
  302. events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
  303. "node": nodeID.String(),
  304. "repo": repo,
  305. "items": len(fs),
  306. "version": r.LocalVersion(nodeID),
  307. })
  308. }
  309. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  310. // Implements the protocol.Model interface.
  311. func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  312. if debug {
  313. l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
  314. }
  315. if !m.repoSharedWith(repo, nodeID) {
  316. l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  317. return
  318. }
  319. for i := range fs {
  320. lamport.Default.Tick(fs[i].Version)
  321. }
  322. m.rmut.RLock()
  323. r, ok := m.repoFiles[repo]
  324. m.rmut.RUnlock()
  325. if ok {
  326. r.Update(nodeID, fs)
  327. } else {
  328. l.Fatalf("IndexUpdate for nonexistant repo %q", repo)
  329. }
  330. events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
  331. "node": nodeID.String(),
  332. "repo": repo,
  333. "items": len(fs),
  334. "version": r.LocalVersion(nodeID),
  335. })
  336. }
  337. func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool {
  338. m.rmut.RLock()
  339. defer m.rmut.RUnlock()
  340. for _, nrepo := range m.nodeRepos[nodeID] {
  341. if nrepo == repo {
  342. return true
  343. }
  344. }
  345. return false
  346. }
  347. func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterConfigMessage) {
  348. compErr := compareClusterConfig(m.clusterConfig(nodeID), config)
  349. if debug {
  350. l.Debugf("ClusterConfig: %s: %#v", nodeID, config)
  351. l.Debugf(" ... compare: %s: %v", nodeID, compErr)
  352. }
  353. if compErr != nil {
  354. l.Warnf("%s: %v", nodeID, compErr)
  355. m.Close(nodeID, compErr)
  356. }
  357. m.pmut.Lock()
  358. if config.ClientName == "syncthing" {
  359. m.nodeVer[nodeID] = config.ClientVersion
  360. } else {
  361. m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
  362. }
  363. m.pmut.Unlock()
  364. l.Infof(`Node %s client is "%s %s"`, nodeID, config.ClientName, config.ClientVersion)
  365. }
  366. // Close removes the peer from the model and closes the underlying connection if possible.
  367. // Implements the protocol.Model interface.
  368. func (m *Model) Close(node protocol.NodeID, err error) {
  369. l.Infof("Connection to %s closed: %v", node, err)
  370. events.Default.Log(events.NodeDisconnected, map[string]string{
  371. "id": node.String(),
  372. "error": err.Error(),
  373. })
  374. m.pmut.Lock()
  375. m.rmut.RLock()
  376. for _, repo := range m.nodeRepos[node] {
  377. m.repoFiles[repo].Replace(node, nil)
  378. }
  379. m.rmut.RUnlock()
  380. conn, ok := m.rawConn[node]
  381. if ok {
  382. conn.Close()
  383. }
  384. delete(m.protoConn, node)
  385. delete(m.rawConn, node)
  386. delete(m.nodeVer, node)
  387. m.pmut.Unlock()
  388. }
  389. // Request returns the specified data segment by reading it from local disk.
  390. // Implements the protocol.Model interface.
  391. func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64, size int) ([]byte, error) {
  392. // Verify that the requested file exists in the local model.
  393. m.rmut.RLock()
  394. r, ok := m.repoFiles[repo]
  395. m.rmut.RUnlock()
  396. if !ok {
  397. l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
  398. return nil, ErrNoSuchFile
  399. }
  400. lf := r.Get(protocol.LocalNodeID, name)
  401. if protocol.IsInvalid(lf.Flags) || protocol.IsDeleted(lf.Flags) {
  402. if debug {
  403. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
  404. }
  405. return nil, ErrInvalid
  406. }
  407. if offset > lf.Size() {
  408. if debug {
  409. l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
  410. }
  411. return nil, ErrNoSuchFile
  412. }
  413. if debug && nodeID != protocol.LocalNodeID {
  414. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
  415. }
  416. m.rmut.RLock()
  417. fn := filepath.Join(m.repoCfgs[repo].Directory, name)
  418. m.rmut.RUnlock()
  419. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  420. if err != nil {
  421. return nil, err
  422. }
  423. defer fd.Close()
  424. buf := make([]byte, size)
  425. _, err = fd.ReadAt(buf, offset)
  426. if err != nil {
  427. return nil, err
  428. }
  429. return buf, nil
  430. }
  431. // ReplaceLocal replaces the local repository index with the given list of files.
  432. func (m *Model) ReplaceLocal(repo string, fs []protocol.FileInfo) {
  433. m.rmut.RLock()
  434. m.repoFiles[repo].ReplaceWithDelete(protocol.LocalNodeID, fs)
  435. m.rmut.RUnlock()
  436. }
  437. func (m *Model) CurrentRepoFile(repo string, file string) protocol.FileInfo {
  438. m.rmut.RLock()
  439. f := m.repoFiles[repo].Get(protocol.LocalNodeID, file)
  440. m.rmut.RUnlock()
  441. return f
  442. }
  443. func (m *Model) CurrentGlobalFile(repo string, file string) protocol.FileInfo {
  444. m.rmut.RLock()
  445. f := m.repoFiles[repo].GetGlobal(file)
  446. m.rmut.RUnlock()
  447. return f
  448. }
  449. type cFiler struct {
  450. m *Model
  451. r string
  452. }
  453. // Implements scanner.CurrentFiler
  454. func (cf cFiler) CurrentFile(file string) protocol.FileInfo {
  455. return cf.m.CurrentRepoFile(cf.r, file)
  456. }
  457. // ConnectedTo returns true if we are connected to the named node.
  458. func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool {
  459. m.pmut.RLock()
  460. _, ok := m.protoConn[nodeID]
  461. m.pmut.RUnlock()
  462. return ok
  463. }
  464. // AddConnection adds a new peer connection to the model. An initial index will
  465. // be sent to the connected peer, thereafter index updates whenever the local
  466. // repository changes.
  467. func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
  468. nodeID := protoConn.ID()
  469. m.pmut.Lock()
  470. if _, ok := m.protoConn[nodeID]; ok {
  471. panic("add existing node")
  472. }
  473. m.protoConn[nodeID] = protoConn
  474. if _, ok := m.rawConn[nodeID]; ok {
  475. panic("add existing node")
  476. }
  477. m.rawConn[nodeID] = rawConn
  478. cm := m.clusterConfig(nodeID)
  479. protoConn.ClusterConfig(cm)
  480. m.rmut.RLock()
  481. for _, repo := range m.nodeRepos[nodeID] {
  482. fs := m.repoFiles[repo]
  483. go sendIndexes(protoConn, repo, fs)
  484. }
  485. m.rmut.RUnlock()
  486. m.pmut.Unlock()
  487. }
  488. func sendIndexes(conn protocol.Connection, repo string, fs *files.Set) {
  489. nodeID := conn.ID()
  490. name := conn.Name()
  491. var err error
  492. if debug {
  493. l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo)
  494. }
  495. defer func() {
  496. if debug {
  497. l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err)
  498. }
  499. }()
  500. minLocalVer, err := sendIndexTo(true, 0, conn, repo, fs)
  501. for err == nil {
  502. time.Sleep(5 * time.Second)
  503. if fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer {
  504. continue
  505. }
  506. minLocalVer, err = sendIndexTo(false, minLocalVer, conn, repo, fs)
  507. }
  508. }
  509. func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, repo string, fs *files.Set) (uint64, error) {
  510. nodeID := conn.ID()
  511. name := conn.Name()
  512. batch := make([]protocol.FileInfo, 0, indexBatchSize)
  513. maxLocalVer := uint64(0)
  514. var err error
  515. fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
  516. if f.LocalVersion <= minLocalVer {
  517. return true
  518. }
  519. if f.LocalVersion > maxLocalVer {
  520. maxLocalVer = f.LocalVersion
  521. }
  522. if len(batch) == indexBatchSize {
  523. if initial {
  524. if err = conn.Index(repo, batch); err != nil {
  525. return false
  526. }
  527. if debug {
  528. l.Debugf("sendIndexes for %s-%s/%q: %d files (initial index)", nodeID, name, repo, len(batch))
  529. }
  530. initial = false
  531. } else {
  532. if err = conn.IndexUpdate(repo, batch); err != nil {
  533. return false
  534. }
  535. if debug {
  536. l.Debugf("sendIndexes for %s-%s/%q: %d files (batched update)", nodeID, name, repo, len(batch))
  537. }
  538. }
  539. batch = make([]protocol.FileInfo, 0, indexBatchSize)
  540. }
  541. batch = append(batch, f)
  542. return true
  543. })
  544. if initial && err == nil {
  545. err = conn.Index(repo, batch)
  546. if debug && err == nil {
  547. l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch))
  548. }
  549. } else if len(batch) > 0 && err == nil {
  550. err = conn.IndexUpdate(repo, batch)
  551. if debug && err == nil {
  552. l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch))
  553. }
  554. }
  555. return maxLocalVer, err
  556. }
  557. func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
  558. f.LocalVersion = 0
  559. m.rmut.RLock()
  560. m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f})
  561. m.rmut.RUnlock()
  562. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  563. "repo": repo,
  564. "name": f.Name,
  565. "modified": time.Unix(f.Modified, 0),
  566. "flags": fmt.Sprintf("0%o", f.Flags),
  567. "size": f.Size(),
  568. })
  569. }
  570. func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
  571. m.pmut.RLock()
  572. nc, ok := m.protoConn[nodeID]
  573. m.pmut.RUnlock()
  574. if !ok {
  575. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  576. }
  577. if debug {
  578. l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
  579. }
  580. return nc.Request(repo, name, offset, size)
  581. }
  582. func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
  583. if m.started {
  584. panic("cannot add repo to started model")
  585. }
  586. if len(cfg.ID) == 0 {
  587. panic("cannot add empty repo id")
  588. }
  589. m.rmut.Lock()
  590. m.repoCfgs[cfg.ID] = cfg
  591. m.repoFiles[cfg.ID] = files.NewSet(cfg.ID, m.db)
  592. m.suppressor[cfg.ID] = &suppressor{threshold: int64(m.cfg.Options.MaxChangeKbps)}
  593. m.repoNodes[cfg.ID] = make([]protocol.NodeID, len(cfg.Nodes))
  594. for i, node := range cfg.Nodes {
  595. m.repoNodes[cfg.ID][i] = node.NodeID
  596. m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
  597. }
  598. m.addedRepo = true
  599. m.rmut.Unlock()
  600. }
  601. func (m *Model) ScanRepos() {
  602. m.rmut.RLock()
  603. var repos = make([]string, 0, len(m.repoCfgs))
  604. for repo := range m.repoCfgs {
  605. repos = append(repos, repo)
  606. }
  607. m.rmut.RUnlock()
  608. var wg sync.WaitGroup
  609. wg.Add(len(repos))
  610. for _, repo := range repos {
  611. repo := repo
  612. go func() {
  613. err := m.ScanRepo(repo)
  614. if err != nil {
  615. invalidateRepo(m.cfg, repo, err)
  616. }
  617. wg.Done()
  618. }()
  619. }
  620. wg.Wait()
  621. }
  622. func (m *Model) CleanRepos() {
  623. m.rmut.RLock()
  624. var dirs = make([]string, 0, len(m.repoCfgs))
  625. for _, cfg := range m.repoCfgs {
  626. dirs = append(dirs, cfg.Directory)
  627. }
  628. m.rmut.RUnlock()
  629. var wg sync.WaitGroup
  630. wg.Add(len(dirs))
  631. for _, dir := range dirs {
  632. w := &scanner.Walker{
  633. Dir: dir,
  634. TempNamer: defTempNamer,
  635. }
  636. go func() {
  637. w.CleanTempFiles()
  638. wg.Done()
  639. }()
  640. }
  641. wg.Wait()
  642. }
  643. func (m *Model) ScanRepo(repo string) error {
  644. m.rmut.RLock()
  645. fs := m.repoFiles[repo]
  646. dir := m.repoCfgs[repo].Directory
  647. w := &scanner.Walker{
  648. Dir: dir,
  649. IgnoreFile: ".stignore",
  650. BlockSize: scanner.StandardBlockSize,
  651. TempNamer: defTempNamer,
  652. Suppressor: m.suppressor[repo],
  653. CurrentFiler: cFiler{m, repo},
  654. IgnorePerms: m.repoCfgs[repo].IgnorePerms,
  655. }
  656. m.rmut.RUnlock()
  657. m.setState(repo, RepoScanning)
  658. fchan, _, err := w.Walk()
  659. if err != nil {
  660. return err
  661. }
  662. batchSize := 100
  663. batch := make([]protocol.FileInfo, 0, 00)
  664. for f := range fchan {
  665. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  666. "repo": repo,
  667. "name": f.Name,
  668. "modified": time.Unix(f.Modified, 0),
  669. "flags": fmt.Sprintf("0%o", f.Flags),
  670. "size": f.Size(),
  671. })
  672. if len(batch) == batchSize {
  673. fs.Update(protocol.LocalNodeID, batch)
  674. batch = batch[:0]
  675. }
  676. batch = append(batch, f)
  677. }
  678. if len(batch) > 0 {
  679. fs.Update(protocol.LocalNodeID, batch)
  680. }
  681. batch = batch[:0]
  682. fs.WithHave(protocol.LocalNodeID, func(f protocol.FileInfo) bool {
  683. if !protocol.IsDeleted(f.Flags) {
  684. if len(batch) == batchSize {
  685. fs.Update(protocol.LocalNodeID, batch)
  686. batch = batch[:0]
  687. }
  688. if _, err := os.Stat(filepath.Join(dir, f.Name)); err != nil && os.IsNotExist(err) {
  689. // File has been deleted
  690. f.Blocks = nil
  691. f.Flags |= protocol.FlagDeleted
  692. f.Version = lamport.Default.Tick(f.Version)
  693. f.LocalVersion = 0
  694. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  695. "repo": repo,
  696. "name": f.Name,
  697. "modified": time.Unix(f.Modified, 0),
  698. "flags": fmt.Sprintf("0%o", f.Flags),
  699. "size": f.Size(),
  700. })
  701. batch = append(batch, f)
  702. }
  703. }
  704. return true
  705. })
  706. if len(batch) > 0 {
  707. fs.Update(protocol.LocalNodeID, batch)
  708. }
  709. m.setState(repo, RepoIdle)
  710. return nil
  711. }
  712. // clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
  713. func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage {
  714. cm := protocol.ClusterConfigMessage{
  715. ClientName: m.clientName,
  716. ClientVersion: m.clientVersion,
  717. }
  718. m.rmut.RLock()
  719. for _, repo := range m.nodeRepos[node] {
  720. cr := protocol.Repository{
  721. ID: repo,
  722. }
  723. for _, node := range m.repoNodes[repo] {
  724. // TODO: Set read only bit when relevant
  725. cr.Nodes = append(cr.Nodes, protocol.Node{
  726. ID: node[:],
  727. Flags: protocol.FlagShareTrusted,
  728. })
  729. }
  730. cm.Repositories = append(cm.Repositories, cr)
  731. }
  732. m.rmut.RUnlock()
  733. return cm
  734. }
  735. func (m *Model) setState(repo string, state repoState) {
  736. m.smut.Lock()
  737. oldState := m.repoState[repo]
  738. changed, ok := m.repoStateChanged[repo]
  739. if state != oldState {
  740. m.repoState[repo] = state
  741. m.repoStateChanged[repo] = time.Now()
  742. eventData := map[string]interface{}{
  743. "repo": repo,
  744. "to": state.String(),
  745. }
  746. if ok {
  747. eventData["duration"] = time.Since(changed).Seconds()
  748. eventData["from"] = oldState.String()
  749. }
  750. events.Default.Log(events.StateChanged, eventData)
  751. }
  752. m.smut.Unlock()
  753. }
  754. func (m *Model) State(repo string) (string, time.Time) {
  755. m.smut.RLock()
  756. state := m.repoState[repo]
  757. changed := m.repoStateChanged[repo]
  758. m.smut.RUnlock()
  759. return state.String(), changed
  760. }
  761. func (m *Model) Override(repo string) {
  762. m.rmut.RLock()
  763. fs := m.repoFiles[repo]
  764. m.rmut.RUnlock()
  765. batch := make([]protocol.FileInfo, 0, indexBatchSize)
  766. fs.WithNeed(protocol.LocalNodeID, func(need protocol.FileInfo) bool {
  767. if len(batch) == indexBatchSize {
  768. fs.Update(protocol.LocalNodeID, batch)
  769. batch = batch[:0]
  770. }
  771. have := fs.Get(protocol.LocalNodeID, need.Name)
  772. if have.Name != need.Name {
  773. // We are missing the file
  774. need.Flags |= protocol.FlagDeleted
  775. need.Blocks = nil
  776. } else {
  777. // We have the file, replace with our version
  778. need = have
  779. }
  780. need.Version = lamport.Default.Tick(need.Version)
  781. need.LocalVersion = 0
  782. batch = append(batch, need)
  783. return true
  784. })
  785. if len(batch) > 0 {
  786. fs.Update(protocol.LocalNodeID, batch)
  787. }
  788. }
  789. // Version returns the change version for the given repository. This is
  790. // guaranteed to increment if the contents of the local or global repository
  791. // has changed.
  792. func (m *Model) LocalVersion(repo string) uint64 {
  793. m.rmut.Lock()
  794. defer m.rmut.Unlock()
  795. fs, ok := m.repoFiles[repo]
  796. if !ok {
  797. return 0
  798. }
  799. ver := fs.LocalVersion(protocol.LocalNodeID)
  800. for _, n := range m.repoNodes[repo] {
  801. ver += fs.LocalVersion(n)
  802. }
  803. return ver
  804. }