model.go 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986
  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package model
  5. import (
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "os"
  11. "path/filepath"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/syncthing/syncthing/config"
  17. "github.com/syncthing/syncthing/events"
  18. "github.com/syncthing/syncthing/files"
  19. "github.com/syncthing/syncthing/lamport"
  20. "github.com/syncthing/syncthing/protocol"
  21. "github.com/syncthing/syncthing/scanner"
  22. "github.com/syncthing/syncthing/stats"
  23. "github.com/syndtr/goleveldb/leveldb"
  24. )
  25. type repoState int
  26. const (
  27. RepoIdle repoState = iota
  28. RepoScanning
  29. RepoSyncing
  30. RepoCleaning
  31. )
  32. func (s repoState) String() string {
  33. switch s {
  34. case RepoIdle:
  35. return "idle"
  36. case RepoScanning:
  37. return "scanning"
  38. case RepoCleaning:
  39. return "cleaning"
  40. case RepoSyncing:
  41. return "syncing"
  42. default:
  43. return "unknown"
  44. }
  45. }
  46. // Somewhat arbitrary amount of bytes that we choose to let represent the size
  47. // of an unsynchronized directory entry or a deleted file. We need it to be
  48. // larger than zero so that it's visible that there is some amount of bytes to
  49. // transfer to bring the systems into synchronization.
  50. const zeroEntrySize = 128
  51. // How many files to send in each Index/IndexUpdate message.
  52. const (
  53. indexTargetSize = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
  54. indexPerFileSize = 250 // Each FileInfo is approximately this big, in bytes, excluding BlockInfos
  55. IndexPerBlockSize = 40 // Each BlockInfo is approximately this big
  56. indexBatchSize = 1000 // Either way, don't include more files than this
  57. )
  58. type Model struct {
  59. indexDir string
  60. cfg *config.Configuration
  61. db *leveldb.DB
  62. nodeName string
  63. clientName string
  64. clientVersion string
  65. repoCfgs map[string]config.RepositoryConfiguration // repo -> cfg
  66. repoFiles map[string]*files.Set // repo -> files
  67. repoNodes map[string][]protocol.NodeID // repo -> nodeIDs
  68. nodeRepos map[protocol.NodeID][]string // nodeID -> repos
  69. nodeStatRefs map[protocol.NodeID]*stats.NodeStatisticsReference // nodeID -> statsRef
  70. rmut sync.RWMutex // protects the above
  71. repoState map[string]repoState // repo -> state
  72. repoStateChanged map[string]time.Time // repo -> time when state changed
  73. smut sync.RWMutex
  74. protoConn map[protocol.NodeID]protocol.Connection
  75. rawConn map[protocol.NodeID]io.Closer
  76. nodeVer map[protocol.NodeID]string
  77. pmut sync.RWMutex // protects protoConn and rawConn
  78. sentLocalVer map[protocol.NodeID]map[string]uint64
  79. slMut sync.Mutex
  80. addedRepo bool
  81. started bool
  82. }
  83. var (
  84. ErrNoSuchFile = errors.New("no such file")
  85. ErrInvalid = errors.New("file is invalid")
  86. )
  87. // NewModel creates and starts a new model. The model starts in read-only mode,
  88. // where it sends index information to connected peers and responds to requests
  89. // for file data without altering the local repository in any way.
  90. func NewModel(indexDir string, cfg *config.Configuration, nodeName, clientName, clientVersion string, db *leveldb.DB) *Model {
  91. m := &Model{
  92. indexDir: indexDir,
  93. cfg: cfg,
  94. db: db,
  95. nodeName: nodeName,
  96. clientName: clientName,
  97. clientVersion: clientVersion,
  98. repoCfgs: make(map[string]config.RepositoryConfiguration),
  99. repoFiles: make(map[string]*files.Set),
  100. repoNodes: make(map[string][]protocol.NodeID),
  101. nodeRepos: make(map[protocol.NodeID][]string),
  102. nodeStatRefs: make(map[protocol.NodeID]*stats.NodeStatisticsReference),
  103. repoState: make(map[string]repoState),
  104. repoStateChanged: make(map[string]time.Time),
  105. protoConn: make(map[protocol.NodeID]protocol.Connection),
  106. rawConn: make(map[protocol.NodeID]io.Closer),
  107. nodeVer: make(map[protocol.NodeID]string),
  108. sentLocalVer: make(map[protocol.NodeID]map[string]uint64),
  109. }
  110. for _, node := range cfg.Nodes {
  111. m.nodeStatRefs[node.NodeID] = stats.NewNodeStatisticsReference(db, node.NodeID)
  112. }
  113. var timeout = 20 * 60 // seconds
  114. if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
  115. it, err := strconv.Atoi(t)
  116. if err == nil {
  117. timeout = it
  118. }
  119. }
  120. deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
  121. deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
  122. deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)
  123. return m
  124. }
  125. // StartRW starts read/write processing on the current model. When in
  126. // read/write mode the model will attempt to keep in sync with the cluster by
  127. // pulling needed files from peer nodes.
  128. func (m *Model) StartRepoRW(repo string, threads int) {
  129. m.rmut.RLock()
  130. defer m.rmut.RUnlock()
  131. if cfg, ok := m.repoCfgs[repo]; !ok {
  132. panic("cannot start without repo")
  133. } else {
  134. newPuller(cfg, m, threads, m.cfg)
  135. }
  136. }
  137. // StartRO starts read only processing on the current model. When in
  138. // read only mode the model will announce files to the cluster but not
  139. // pull in any external changes.
  140. func (m *Model) StartRepoRO(repo string) {
  141. m.StartRepoRW(repo, 0) // zero threads => read only
  142. }
  143. type ConnectionInfo struct {
  144. protocol.Statistics
  145. Address string
  146. ClientVersion string
  147. }
  148. // ConnectionStats returns a map with connection statistics for each connected node.
  149. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  150. type remoteAddrer interface {
  151. RemoteAddr() net.Addr
  152. }
  153. m.pmut.RLock()
  154. m.rmut.RLock()
  155. var res = make(map[string]ConnectionInfo)
  156. for node, conn := range m.protoConn {
  157. ci := ConnectionInfo{
  158. Statistics: conn.Statistics(),
  159. ClientVersion: m.nodeVer[node],
  160. }
  161. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  162. ci.Address = nc.RemoteAddr().String()
  163. }
  164. res[node.String()] = ci
  165. }
  166. m.rmut.RUnlock()
  167. m.pmut.RUnlock()
  168. in, out := protocol.TotalInOut()
  169. res["total"] = ConnectionInfo{
  170. Statistics: protocol.Statistics{
  171. At: time.Now(),
  172. InBytesTotal: in,
  173. OutBytesTotal: out,
  174. },
  175. }
  176. return res
  177. }
  178. // Returns statistics about each node
  179. func (m *Model) NodeStatistics() map[string]stats.NodeStatistics {
  180. var res = make(map[string]stats.NodeStatistics)
  181. m.rmut.RLock()
  182. for _, node := range m.cfg.Nodes {
  183. res[node.NodeID.String()] = m.nodeStatRefs[node.NodeID].GetStatistics()
  184. }
  185. m.rmut.RUnlock()
  186. return res
  187. }
  188. // Returns the completion status, in percent, for the given node and repo.
  189. func (m *Model) Completion(node protocol.NodeID, repo string) float64 {
  190. var tot int64
  191. m.rmut.RLock()
  192. rf, ok := m.repoFiles[repo]
  193. m.rmut.RUnlock()
  194. if !ok {
  195. return 0 // Repo doesn't exist, so we hardly have any of it
  196. }
  197. rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
  198. if !f.IsDeleted() {
  199. tot += f.Size()
  200. }
  201. return true
  202. })
  203. if tot == 0 {
  204. return 100 // Repo is empty, so we have all of it
  205. }
  206. var need int64
  207. rf.WithNeedTruncated(node, func(f protocol.FileIntf) bool {
  208. if !f.IsDeleted() {
  209. need += f.Size()
  210. }
  211. return true
  212. })
  213. res := 100 * (1 - float64(need)/float64(tot))
  214. if debug {
  215. l.Debugf("Completion(%s, %q): %f (%d / %d)", node, repo, res, need, tot)
  216. }
  217. return res
  218. }
  219. func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) {
  220. for _, f := range fs {
  221. fs, de, by := sizeOfFile(f)
  222. files += fs
  223. deleted += de
  224. bytes += by
  225. }
  226. return
  227. }
  228. func sizeOfFile(f protocol.FileIntf) (files, deleted int, bytes int64) {
  229. if !f.IsDeleted() {
  230. files++
  231. } else {
  232. deleted++
  233. }
  234. bytes += f.Size()
  235. return
  236. }
  237. // GlobalSize returns the number of files, deleted files and total bytes for all
  238. // files in the global model.
  239. func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
  240. m.rmut.RLock()
  241. defer m.rmut.RUnlock()
  242. if rf, ok := m.repoFiles[repo]; ok {
  243. rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
  244. fs, de, by := sizeOfFile(f)
  245. files += fs
  246. deleted += de
  247. bytes += by
  248. return true
  249. })
  250. }
  251. return
  252. }
  253. // LocalSize returns the number of files, deleted files and total bytes for all
  254. // files in the local repository.
  255. func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
  256. m.rmut.RLock()
  257. defer m.rmut.RUnlock()
  258. if rf, ok := m.repoFiles[repo]; ok {
  259. rf.WithHaveTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
  260. fs, de, by := sizeOfFile(f)
  261. files += fs
  262. deleted += de
  263. bytes += by
  264. return true
  265. })
  266. }
  267. return
  268. }
  269. // NeedSize returns the number and total size of currently needed files.
  270. func (m *Model) NeedSize(repo string) (files int, bytes int64) {
  271. m.rmut.RLock()
  272. defer m.rmut.RUnlock()
  273. if rf, ok := m.repoFiles[repo]; ok {
  274. rf.WithNeedTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
  275. fs, de, by := sizeOfFile(f)
  276. files += fs + de
  277. bytes += by
  278. return true
  279. })
  280. }
  281. if debug {
  282. l.Debugf("NeedSize(%q): %d %d", repo, files, bytes)
  283. }
  284. return
  285. }
  286. // NeedFiles returns the list of currently needed files
  287. func (m *Model) NeedFilesRepo(repo string) []protocol.FileInfo {
  288. m.rmut.RLock()
  289. defer m.rmut.RUnlock()
  290. if rf, ok := m.repoFiles[repo]; ok {
  291. fs := make([]protocol.FileInfo, 0, indexBatchSize)
  292. rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
  293. fs = append(fs, f.(protocol.FileInfo))
  294. return len(fs) < indexBatchSize
  295. })
  296. return fs
  297. }
  298. return nil
  299. }
  300. // Index is called when a new node is connected and we receive their full index.
  301. // Implements the protocol.Model interface.
  302. func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  303. if debug {
  304. l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
  305. }
  306. if !m.repoSharedWith(repo, nodeID) {
  307. events.Default.Log(events.RepoRejected, map[string]string{
  308. "repo": repo,
  309. "node": nodeID.String(),
  310. })
  311. l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  312. return
  313. }
  314. for i := range fs {
  315. lamport.Default.Tick(fs[i].Version)
  316. }
  317. m.rmut.RLock()
  318. r, ok := m.repoFiles[repo]
  319. m.rmut.RUnlock()
  320. if ok {
  321. r.Replace(nodeID, fs)
  322. } else {
  323. l.Fatalf("Index for nonexistant repo %q", repo)
  324. }
  325. events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
  326. "node": nodeID.String(),
  327. "repo": repo,
  328. "items": len(fs),
  329. "version": r.LocalVersion(nodeID),
  330. })
  331. }
  332. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  333. // Implements the protocol.Model interface.
  334. func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  335. if debug {
  336. l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
  337. }
  338. if !m.repoSharedWith(repo, nodeID) {
  339. l.Infof("Update for unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  340. return
  341. }
  342. for i := range fs {
  343. lamport.Default.Tick(fs[i].Version)
  344. }
  345. m.rmut.RLock()
  346. r, ok := m.repoFiles[repo]
  347. m.rmut.RUnlock()
  348. if ok {
  349. r.Update(nodeID, fs)
  350. } else {
  351. l.Fatalf("IndexUpdate for nonexistant repo %q", repo)
  352. }
  353. events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
  354. "node": nodeID.String(),
  355. "repo": repo,
  356. "items": len(fs),
  357. "version": r.LocalVersion(nodeID),
  358. })
  359. }
  360. func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool {
  361. m.rmut.RLock()
  362. defer m.rmut.RUnlock()
  363. for _, nrepo := range m.nodeRepos[nodeID] {
  364. if nrepo == repo {
  365. return true
  366. }
  367. }
  368. return false
  369. }
  370. func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterConfigMessage) {
  371. m.pmut.Lock()
  372. if config.ClientName == "syncthing" {
  373. m.nodeVer[nodeID] = config.ClientVersion
  374. } else {
  375. m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
  376. }
  377. m.pmut.Unlock()
  378. l.Infof(`Node %s client is "%s %s"`, nodeID, config.ClientName, config.ClientVersion)
  379. if name := config.GetOption("name"); name != "" {
  380. l.Infof("Node %s hostname is %q", nodeID, name)
  381. node := m.cfg.GetNodeConfiguration(nodeID)
  382. if node != nil && node.Name == "" {
  383. node.Name = name
  384. }
  385. }
  386. }
  387. // Close removes the peer from the model and closes the underlying connection if possible.
  388. // Implements the protocol.Model interface.
  389. func (m *Model) Close(node protocol.NodeID, err error) {
  390. l.Infof("Connection to %s closed: %v", node, err)
  391. events.Default.Log(events.NodeDisconnected, map[string]string{
  392. "id": node.String(),
  393. "error": err.Error(),
  394. })
  395. m.pmut.Lock()
  396. m.rmut.RLock()
  397. for _, repo := range m.nodeRepos[node] {
  398. m.repoFiles[repo].Replace(node, nil)
  399. }
  400. m.rmut.RUnlock()
  401. conn, ok := m.rawConn[node]
  402. if ok {
  403. conn.Close()
  404. }
  405. delete(m.protoConn, node)
  406. delete(m.rawConn, node)
  407. delete(m.nodeVer, node)
  408. m.pmut.Unlock()
  409. }
  410. // Request returns the specified data segment by reading it from local disk.
  411. // Implements the protocol.Model interface.
  412. func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64, size int) ([]byte, error) {
  413. // Verify that the requested file exists in the local model.
  414. m.rmut.RLock()
  415. r, ok := m.repoFiles[repo]
  416. m.rmut.RUnlock()
  417. if !ok {
  418. l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
  419. return nil, ErrNoSuchFile
  420. }
  421. lf := r.Get(protocol.LocalNodeID, name)
  422. if protocol.IsInvalid(lf.Flags) || protocol.IsDeleted(lf.Flags) {
  423. if debug {
  424. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
  425. }
  426. return nil, ErrInvalid
  427. }
  428. if offset > lf.Size() {
  429. if debug {
  430. l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
  431. }
  432. return nil, ErrNoSuchFile
  433. }
  434. if debug && nodeID != protocol.LocalNodeID {
  435. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
  436. }
  437. m.rmut.RLock()
  438. fn := filepath.Join(m.repoCfgs[repo].Directory, name)
  439. m.rmut.RUnlock()
  440. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  441. if err != nil {
  442. return nil, err
  443. }
  444. defer fd.Close()
  445. buf := make([]byte, size)
  446. _, err = fd.ReadAt(buf, offset)
  447. if err != nil {
  448. return nil, err
  449. }
  450. return buf, nil
  451. }
  452. // ReplaceLocal replaces the local repository index with the given list of files.
  453. func (m *Model) ReplaceLocal(repo string, fs []protocol.FileInfo) {
  454. m.rmut.RLock()
  455. m.repoFiles[repo].ReplaceWithDelete(protocol.LocalNodeID, fs)
  456. m.rmut.RUnlock()
  457. }
  458. func (m *Model) CurrentRepoFile(repo string, file string) protocol.FileInfo {
  459. m.rmut.RLock()
  460. f := m.repoFiles[repo].Get(protocol.LocalNodeID, file)
  461. m.rmut.RUnlock()
  462. return f
  463. }
  464. func (m *Model) CurrentGlobalFile(repo string, file string) protocol.FileInfo {
  465. m.rmut.RLock()
  466. f := m.repoFiles[repo].GetGlobal(file)
  467. m.rmut.RUnlock()
  468. return f
  469. }
  470. type cFiler struct {
  471. m *Model
  472. r string
  473. }
  474. // Implements scanner.CurrentFiler
  475. func (cf cFiler) CurrentFile(file string) protocol.FileInfo {
  476. return cf.m.CurrentRepoFile(cf.r, file)
  477. }
  478. // ConnectedTo returns true if we are connected to the named node.
  479. func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool {
  480. m.pmut.RLock()
  481. _, ok := m.protoConn[nodeID]
  482. if ok {
  483. m.nodeStatRefs[nodeID].WasSeen()
  484. }
  485. m.pmut.RUnlock()
  486. return ok
  487. }
  488. // AddConnection adds a new peer connection to the model. An initial index will
  489. // be sent to the connected peer, thereafter index updates whenever the local
  490. // repository changes.
  491. func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
  492. nodeID := protoConn.ID()
  493. m.pmut.Lock()
  494. if _, ok := m.protoConn[nodeID]; ok {
  495. panic("add existing node")
  496. }
  497. m.protoConn[nodeID] = protoConn
  498. if _, ok := m.rawConn[nodeID]; ok {
  499. panic("add existing node")
  500. }
  501. m.rawConn[nodeID] = rawConn
  502. cm := m.clusterConfig(nodeID)
  503. protoConn.ClusterConfig(cm)
  504. m.rmut.RLock()
  505. for _, repo := range m.nodeRepos[nodeID] {
  506. fs := m.repoFiles[repo]
  507. go sendIndexes(protoConn, repo, fs)
  508. }
  509. if statRef, ok := m.nodeStatRefs[nodeID]; ok {
  510. statRef.WasSeen()
  511. } else {
  512. l.Warnf("AddConnection for unconfigured node %v?", nodeID)
  513. }
  514. m.rmut.RUnlock()
  515. m.pmut.Unlock()
  516. }
  517. func sendIndexes(conn protocol.Connection, repo string, fs *files.Set) {
  518. nodeID := conn.ID()
  519. name := conn.Name()
  520. var err error
  521. if debug {
  522. l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo)
  523. }
  524. defer func() {
  525. if debug {
  526. l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err)
  527. }
  528. }()
  529. minLocalVer, err := sendIndexTo(true, 0, conn, repo, fs)
  530. for err == nil {
  531. time.Sleep(5 * time.Second)
  532. if fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer {
  533. continue
  534. }
  535. minLocalVer, err = sendIndexTo(false, minLocalVer, conn, repo, fs)
  536. }
  537. }
  538. func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, repo string, fs *files.Set) (uint64, error) {
  539. nodeID := conn.ID()
  540. name := conn.Name()
  541. batch := make([]protocol.FileInfo, 0, indexBatchSize)
  542. currentBatchSize := 0
  543. maxLocalVer := uint64(0)
  544. var err error
  545. fs.WithHave(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
  546. f := fi.(protocol.FileInfo)
  547. if f.LocalVersion <= minLocalVer {
  548. return true
  549. }
  550. if f.LocalVersion > maxLocalVer {
  551. maxLocalVer = f.LocalVersion
  552. }
  553. if len(batch) == indexBatchSize || currentBatchSize > indexTargetSize {
  554. if initial {
  555. if err = conn.Index(repo, batch); err != nil {
  556. return false
  557. }
  558. if debug {
  559. l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (initial index)", nodeID, name, repo, len(batch), currentBatchSize)
  560. }
  561. initial = false
  562. } else {
  563. if err = conn.IndexUpdate(repo, batch); err != nil {
  564. return false
  565. }
  566. if debug {
  567. l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (batched update)", nodeID, name, repo, len(batch), currentBatchSize)
  568. }
  569. }
  570. batch = make([]protocol.FileInfo, 0, indexBatchSize)
  571. currentBatchSize = 0
  572. }
  573. batch = append(batch, f)
  574. currentBatchSize += indexPerFileSize + len(f.Blocks)*IndexPerBlockSize
  575. return true
  576. })
  577. if initial && err == nil {
  578. err = conn.Index(repo, batch)
  579. if debug && err == nil {
  580. l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch))
  581. }
  582. } else if len(batch) > 0 && err == nil {
  583. err = conn.IndexUpdate(repo, batch)
  584. if debug && err == nil {
  585. l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch))
  586. }
  587. }
  588. return maxLocalVer, err
  589. }
  590. func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
  591. f.LocalVersion = 0
  592. m.rmut.RLock()
  593. m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f})
  594. m.rmut.RUnlock()
  595. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  596. "repo": repo,
  597. "name": f.Name,
  598. "modified": time.Unix(f.Modified, 0),
  599. "flags": fmt.Sprintf("0%o", f.Flags),
  600. "size": f.Size(),
  601. })
  602. }
  603. func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
  604. m.pmut.RLock()
  605. nc, ok := m.protoConn[nodeID]
  606. m.pmut.RUnlock()
  607. if !ok {
  608. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  609. }
  610. if debug {
  611. l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
  612. }
  613. return nc.Request(repo, name, offset, size)
  614. }
  615. func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
  616. if m.started {
  617. panic("cannot add repo to started model")
  618. }
  619. if len(cfg.ID) == 0 {
  620. panic("cannot add empty repo id")
  621. }
  622. m.rmut.Lock()
  623. m.repoCfgs[cfg.ID] = cfg
  624. m.repoFiles[cfg.ID] = files.NewSet(cfg.ID, m.db)
  625. m.repoNodes[cfg.ID] = make([]protocol.NodeID, len(cfg.Nodes))
  626. for i, node := range cfg.Nodes {
  627. m.repoNodes[cfg.ID][i] = node.NodeID
  628. m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
  629. }
  630. m.addedRepo = true
  631. m.rmut.Unlock()
  632. }
  633. func (m *Model) ScanRepos() {
  634. m.rmut.RLock()
  635. var repos = make([]string, 0, len(m.repoCfgs))
  636. for repo := range m.repoCfgs {
  637. repos = append(repos, repo)
  638. }
  639. m.rmut.RUnlock()
  640. var wg sync.WaitGroup
  641. wg.Add(len(repos))
  642. for _, repo := range repos {
  643. repo := repo
  644. go func() {
  645. err := m.ScanRepo(repo)
  646. if err != nil {
  647. invalidateRepo(m.cfg, repo, err)
  648. }
  649. wg.Done()
  650. }()
  651. }
  652. wg.Wait()
  653. }
  654. func (m *Model) CleanRepos() {
  655. m.rmut.RLock()
  656. var dirs = make([]string, 0, len(m.repoCfgs))
  657. for _, cfg := range m.repoCfgs {
  658. dirs = append(dirs, cfg.Directory)
  659. }
  660. m.rmut.RUnlock()
  661. var wg sync.WaitGroup
  662. wg.Add(len(dirs))
  663. for _, dir := range dirs {
  664. w := &scanner.Walker{
  665. Dir: dir,
  666. TempNamer: defTempNamer,
  667. }
  668. go func() {
  669. w.CleanTempFiles()
  670. wg.Done()
  671. }()
  672. }
  673. wg.Wait()
  674. }
  675. func (m *Model) ScanRepo(repo string) error {
  676. return m.ScanRepoSub(repo, "")
  677. }
  678. func (m *Model) ScanRepoSub(repo, sub string) error {
  679. if p := filepath.Clean(filepath.Join(repo, sub)); !strings.HasPrefix(p, repo) {
  680. return errors.New("invalid subpath")
  681. }
  682. m.rmut.RLock()
  683. fs, ok := m.repoFiles[repo]
  684. dir := m.repoCfgs[repo].Directory
  685. w := &scanner.Walker{
  686. Dir: dir,
  687. Sub: sub,
  688. IgnoreFile: ".stignore",
  689. BlockSize: scanner.StandardBlockSize,
  690. TempNamer: defTempNamer,
  691. CurrentFiler: cFiler{m, repo},
  692. IgnorePerms: m.repoCfgs[repo].IgnorePerms,
  693. }
  694. m.rmut.RUnlock()
  695. if !ok {
  696. return errors.New("no such repo")
  697. }
  698. m.setState(repo, RepoScanning)
  699. fchan, err := w.Walk()
  700. if err != nil {
  701. return err
  702. }
  703. batchSize := 100
  704. batch := make([]protocol.FileInfo, 0, 00)
  705. for f := range fchan {
  706. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  707. "repo": repo,
  708. "name": f.Name,
  709. "modified": time.Unix(f.Modified, 0),
  710. "flags": fmt.Sprintf("0%o", f.Flags),
  711. "size": f.Size(),
  712. })
  713. if len(batch) == batchSize {
  714. fs.Update(protocol.LocalNodeID, batch)
  715. batch = batch[:0]
  716. }
  717. batch = append(batch, f)
  718. }
  719. if len(batch) > 0 {
  720. fs.Update(protocol.LocalNodeID, batch)
  721. }
  722. batch = batch[:0]
  723. // TODO: We should limit the Have scanning to start at sub
  724. seenPrefix := false
  725. fs.WithHaveTruncated(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
  726. f := fi.(protocol.FileInfoTruncated)
  727. if !strings.HasPrefix(f.Name, sub) {
  728. return !seenPrefix
  729. }
  730. seenPrefix = true
  731. if !protocol.IsDeleted(f.Flags) {
  732. if len(batch) == batchSize {
  733. fs.Update(protocol.LocalNodeID, batch)
  734. batch = batch[:0]
  735. }
  736. if _, err := os.Stat(filepath.Join(dir, f.Name)); err != nil && os.IsNotExist(err) {
  737. // File has been deleted
  738. nf := protocol.FileInfo{
  739. Name: f.Name,
  740. Flags: f.Flags | protocol.FlagDeleted,
  741. Modified: f.Modified,
  742. Version: lamport.Default.Tick(f.Version),
  743. }
  744. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  745. "repo": repo,
  746. "name": f.Name,
  747. "modified": time.Unix(f.Modified, 0),
  748. "flags": fmt.Sprintf("0%o", f.Flags),
  749. "size": f.Size(),
  750. })
  751. batch = append(batch, nf)
  752. }
  753. }
  754. return true
  755. })
  756. if len(batch) > 0 {
  757. fs.Update(protocol.LocalNodeID, batch)
  758. }
  759. m.setState(repo, RepoIdle)
  760. return nil
  761. }
  762. // clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
  763. func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage {
  764. cm := protocol.ClusterConfigMessage{
  765. ClientName: m.clientName,
  766. ClientVersion: m.clientVersion,
  767. Options: []protocol.Option{
  768. {
  769. Key: "name",
  770. Value: m.nodeName,
  771. },
  772. },
  773. }
  774. m.rmut.RLock()
  775. for _, repo := range m.nodeRepos[node] {
  776. cr := protocol.Repository{
  777. ID: repo,
  778. }
  779. for _, node := range m.repoNodes[repo] {
  780. // TODO: Set read only bit when relevant
  781. cr.Nodes = append(cr.Nodes, protocol.Node{
  782. ID: node[:],
  783. Flags: protocol.FlagShareTrusted,
  784. })
  785. }
  786. cm.Repositories = append(cm.Repositories, cr)
  787. }
  788. m.rmut.RUnlock()
  789. return cm
  790. }
  791. func (m *Model) setState(repo string, state repoState) {
  792. m.smut.Lock()
  793. oldState := m.repoState[repo]
  794. changed, ok := m.repoStateChanged[repo]
  795. if state != oldState {
  796. m.repoState[repo] = state
  797. m.repoStateChanged[repo] = time.Now()
  798. eventData := map[string]interface{}{
  799. "repo": repo,
  800. "to": state.String(),
  801. }
  802. if ok {
  803. eventData["duration"] = time.Since(changed).Seconds()
  804. eventData["from"] = oldState.String()
  805. }
  806. events.Default.Log(events.StateChanged, eventData)
  807. }
  808. m.smut.Unlock()
  809. }
  810. func (m *Model) State(repo string) (string, time.Time) {
  811. m.smut.RLock()
  812. state := m.repoState[repo]
  813. changed := m.repoStateChanged[repo]
  814. m.smut.RUnlock()
  815. return state.String(), changed
  816. }
  817. func (m *Model) Override(repo string) {
  818. m.rmut.RLock()
  819. fs := m.repoFiles[repo]
  820. m.rmut.RUnlock()
  821. batch := make([]protocol.FileInfo, 0, indexBatchSize)
  822. fs.WithNeed(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
  823. need := fi.(protocol.FileInfo)
  824. if len(batch) == indexBatchSize {
  825. fs.Update(protocol.LocalNodeID, batch)
  826. batch = batch[:0]
  827. }
  828. have := fs.Get(protocol.LocalNodeID, need.Name)
  829. if have.Name != need.Name {
  830. // We are missing the file
  831. need.Flags |= protocol.FlagDeleted
  832. need.Blocks = nil
  833. } else {
  834. // We have the file, replace with our version
  835. need = have
  836. }
  837. need.Version = lamport.Default.Tick(need.Version)
  838. need.LocalVersion = 0
  839. batch = append(batch, need)
  840. return true
  841. })
  842. if len(batch) > 0 {
  843. fs.Update(protocol.LocalNodeID, batch)
  844. }
  845. }
  846. // Version returns the change version for the given repository. This is
  847. // guaranteed to increment if the contents of the local or global repository
  848. // has changed.
  849. func (m *Model) LocalVersion(repo string) uint64 {
  850. m.rmut.Lock()
  851. defer m.rmut.Unlock()
  852. fs, ok := m.repoFiles[repo]
  853. if !ok {
  854. return 0
  855. }
  856. ver := fs.LocalVersion(protocol.LocalNodeID)
  857. for _, n := range m.repoNodes[repo] {
  858. ver += fs.LocalVersion(n)
  859. }
  860. return ver
  861. }