model.go

// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

package model

import (
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/syncthing/syncthing/config"
	"github.com/syncthing/syncthing/events"
	"github.com/syncthing/syncthing/files"
	"github.com/syncthing/syncthing/ignore"
	"github.com/syncthing/syncthing/lamport"
	"github.com/syncthing/syncthing/protocol"
	"github.com/syncthing/syncthing/scanner"
	"github.com/syncthing/syncthing/stats"
	"github.com/syndtr/goleveldb/leveldb"
)

type repoState int

const (
	RepoIdle repoState = iota
	RepoScanning
	RepoSyncing
	RepoCleaning
)

func (s repoState) String() string {
	switch s {
	case RepoIdle:
		return "idle"
	case RepoScanning:
		return "scanning"
	case RepoCleaning:
		return "cleaning"
	case RepoSyncing:
		return "syncing"
	default:
		return "unknown"
	}
}

// How many files to send in each Index/IndexUpdate message.
const (
	indexTargetSize   = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
	indexPerFileSize  = 250        // Each FileInfo is approximately this big, in bytes, excluding BlockInfos
	IndexPerBlockSize = 40         // Each BlockInfo is approximately this big
	indexBatchSize    = 1000       // Either way, don't include more files than this
)
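
// Rough sizing sketch (illustrative only, derived from the constants above):
// a full batch of indexBatchSize (1000) files at ~indexPerFileSize (250) bytes
// each is already ~244 KiB before any BlockInfos, so sendIndexTo also tracks
// an accumulated size estimate and flushes early once it exceeds
// indexTargetSize, e.g. when files with many 40-byte BlockInfo entries push a
// batch past the 250 KiB target.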
type Model struct {
	indexDir string
	cfg      *config.Configuration
	db       *leveldb.DB

	nodeName      string
	clientName    string
	clientVersion string

	repoCfgs     map[string]config.RepositoryConfiguration          // repo -> cfg
	repoFiles    map[string]*files.Set                              // repo -> files
	repoNodes    map[string][]protocol.NodeID                       // repo -> nodeIDs
	nodeRepos    map[protocol.NodeID][]string                       // nodeID -> repos
	nodeStatRefs map[protocol.NodeID]*stats.NodeStatisticsReference // nodeID -> statsRef
	repoIgnores  map[string]ignore.Patterns                         // repo -> list of ignore patterns
	rmut         sync.RWMutex                                       // protects the above

	repoState        map[string]repoState // repo -> state
	repoStateChanged map[string]time.Time // repo -> time when state changed
	smut             sync.RWMutex

	protoConn map[protocol.NodeID]protocol.Connection
	rawConn   map[protocol.NodeID]io.Closer
	nodeVer   map[protocol.NodeID]string
	pmut      sync.RWMutex // protects protoConn and rawConn

	addedRepo bool
	started   bool
}

var (
	ErrNoSuchFile = errors.New("no such file")
	ErrInvalid    = errors.New("file is invalid")
)

// NewModel creates and starts a new model. The model starts in read-only mode,
// where it sends index information to connected peers and responds to requests
// for file data without altering the local repository in any way.
func NewModel(indexDir string, cfg *config.Configuration, nodeName, clientName, clientVersion string, db *leveldb.DB) *Model {
	m := &Model{
		indexDir:         indexDir,
		cfg:              cfg,
		db:               db,
		nodeName:         nodeName,
		clientName:       clientName,
		clientVersion:    clientVersion,
		repoCfgs:         make(map[string]config.RepositoryConfiguration),
		repoFiles:        make(map[string]*files.Set),
		repoNodes:        make(map[string][]protocol.NodeID),
		nodeRepos:        make(map[protocol.NodeID][]string),
		nodeStatRefs:     make(map[protocol.NodeID]*stats.NodeStatisticsReference),
		repoIgnores:      make(map[string]ignore.Patterns),
		repoState:        make(map[string]repoState),
		repoStateChanged: make(map[string]time.Time),
		protoConn:        make(map[protocol.NodeID]protocol.Connection),
		rawConn:          make(map[protocol.NodeID]io.Closer),
		nodeVer:          make(map[protocol.NodeID]string),
	}

	var timeout = 20 * 60 // seconds
	if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
		it, err := strconv.Atoi(t)
		if err == nil {
			timeout = it
		}
	}
	deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
	deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
	deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)

	return m
}
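
// Illustrative only: a minimal sketch of how a caller might wire up a model.
// The cfg (*config.Configuration), db (*leveldb.DB) and repoCfg
// (config.RepositoryConfiguration) values are hypothetical and not defined in
// this file.
//
//	m := NewModel("/var/lib/syncthing/index", cfg, "mynode", "syncthing", "v0.x", db)
//	m.AddRepo(repoCfg)        // register repositories before starting
//	m.ScanRepos()             // build the initial local index
//	m.StartRepoRO(repoCfg.ID) // or StartRepoRW(repoCfg.ID, 4) to also pull changes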
// StartRepoRW starts read/write processing on the given repository. When in
// read/write mode the model will attempt to keep in sync with the cluster by
// pulling needed files from peer nodes.
func (m *Model) StartRepoRW(repo string, threads int) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()

	if cfg, ok := m.repoCfgs[repo]; !ok {
		panic("cannot start without repo")
	} else {
		newPuller(cfg, m, threads, m.cfg)
	}
}

// StartRepoRO starts read only processing on the given repository. When in
// read only mode the model will announce files to the cluster but not
// pull in any external changes.
func (m *Model) StartRepoRO(repo string) {
	m.StartRepoRW(repo, 0) // zero threads => read only
}

type ConnectionInfo struct {
	protocol.Statistics
	Address       string
	ClientVersion string
}

// ConnectionStats returns a map with connection statistics for each connected node.
func (m *Model) ConnectionStats() map[string]ConnectionInfo {
	type remoteAddrer interface {
		RemoteAddr() net.Addr
	}

	m.pmut.RLock()
	m.rmut.RLock()

	var res = make(map[string]ConnectionInfo)
	for node, conn := range m.protoConn {
		ci := ConnectionInfo{
			Statistics:    conn.Statistics(),
			ClientVersion: m.nodeVer[node],
		}
		if nc, ok := m.rawConn[node].(remoteAddrer); ok {
			ci.Address = nc.RemoteAddr().String()
		}

		res[node.String()] = ci
	}

	m.rmut.RUnlock()
	m.pmut.RUnlock()

	in, out := protocol.TotalInOut()
	res["total"] = ConnectionInfo{
		Statistics: protocol.Statistics{
			At:            time.Now(),
			InBytesTotal:  in,
			OutBytesTotal: out,
		},
	}

	return res
}

// NodeStatistics returns statistics about each node.
func (m *Model) NodeStatistics() map[string]stats.NodeStatistics {
	var res = make(map[string]stats.NodeStatistics)
	for _, node := range m.cfg.Nodes {
		res[node.NodeID.String()] = m.nodeStatRef(node.NodeID).GetStatistics()
	}
	return res
}

// Completion returns the completion status, in percent, for the given node and repo.
func (m *Model) Completion(node protocol.NodeID, repo string) float64 {
	var tot int64

	m.rmut.RLock()
	rf, ok := m.repoFiles[repo]
	m.rmut.RUnlock()
	if !ok {
		return 0 // Repo doesn't exist, so we hardly have any of it
	}

	rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
		if !f.IsDeleted() {
			tot += f.Size()
		}
		return true
	})

	if tot == 0 {
		return 100 // Repo is empty, so we have all of it
	}

	var need int64
	rf.WithNeedTruncated(node, func(f protocol.FileIntf) bool {
		if !f.IsDeleted() {
			need += f.Size()
		}
		return true
	})

	res := 100 * (1 - float64(need)/float64(tot))
	if debug {
		l.Debugf("Completion(%s, %q): %f (%d / %d)", node, repo, res, need, tot)
	}

	return res
}
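
// Worked example (figures hypothetical): if the non-deleted global files total
// tot = 200 MiB and the given node still needs need = 50 MiB of them, then
// Completion returns 100 * (1 - 50/200) = 75.0. A node needing nothing reports
// 100, and an unknown repo reports 0.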
func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) {
	for _, f := range fs {
		fs, de, by := sizeOfFile(f)
		files += fs
		deleted += de
		bytes += by
	}
	return
}

func sizeOfFile(f protocol.FileIntf) (files, deleted int, bytes int64) {
	if !f.IsDeleted() {
		files++
	} else {
		deleted++
	}
	bytes += f.Size()
	return
}

// GlobalSize returns the number of files, deleted files and total bytes for all
// files in the global model.
func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	if rf, ok := m.repoFiles[repo]; ok {
		rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
			fs, de, by := sizeOfFile(f)
			files += fs
			deleted += de
			bytes += by
			return true
		})
	}
	return
}

// LocalSize returns the number of files, deleted files and total bytes for all
// files in the local repository.
func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	if rf, ok := m.repoFiles[repo]; ok {
		rf.WithHaveTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
			if f.IsInvalid() {
				return true
			}
			fs, de, by := sizeOfFile(f)
			files += fs
			deleted += de
			bytes += by
			return true
		})
	}
	return
}

// NeedSize returns the number and total size of currently needed files.
func (m *Model) NeedSize(repo string) (files int, bytes int64) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	if rf, ok := m.repoFiles[repo]; ok {
		rf.WithNeedTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
			fs, de, by := sizeOfFile(f)
			files += fs + de
			bytes += by
			return true
		})
	}
	if debug {
		l.Debugf("NeedSize(%q): %d %d", repo, files, bytes)
	}
	return
}

// NeedFilesRepoLimited returns the list of currently needed files, stopping at
// maxFiles files or maxBlocks blocks. Limits <= 0 are ignored.
func (m *Model) NeedFilesRepoLimited(repo string, maxFiles, maxBlocks int) []protocol.FileInfo {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	nblocks := 0
	if rf, ok := m.repoFiles[repo]; ok {
		fs := make([]protocol.FileInfo, 0, maxFiles)
		rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
			fi := f.(protocol.FileInfo)
			fs = append(fs, fi)
			nblocks += len(fi.Blocks)
			return (maxFiles <= 0 || len(fs) < maxFiles) && (maxBlocks <= 0 || nblocks < maxBlocks)
		})
		return fs
	}
	return nil
}
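
// Illustrative only: a caller (for example a status endpoint, hypothetical
// here) could page through the need list without pulling every block entry:
//
//	needed := m.NeedFilesRepoLimited("default", 100, 2500)
//	for _, f := range needed {
//		l.Debugf("still need %s (%d blocks)", f.Name, len(f.Blocks))
//	}
//
// Passing 0 (or any value <= 0) for either limit disables that limit.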
// Index is called when a new node is connected and we receive their full index.
// Implements the protocol.Model interface.
func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
	if debug {
		l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
	}

	if !m.repoSharedWith(repo, nodeID) {
		events.Default.Log(events.RepoRejected, map[string]string{
			"repo": repo,
			"node": nodeID.String(),
		})
		l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
		return
	}

	m.rmut.RLock()
	files, ok := m.repoFiles[repo]
	ignores, _ := m.repoIgnores[repo]
	m.rmut.RUnlock()

	if !ok {
		l.Fatalf("Index for nonexistent repo %q", repo)
	}

	for i := 0; i < len(fs); {
		lamport.Default.Tick(fs[i].Version)
		if ignores.Match(fs[i].Name) {
			fs[i] = fs[len(fs)-1]
			fs = fs[:len(fs)-1]
		} else {
			i++
		}
	}

	files.Replace(nodeID, fs)

	events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
		"node":    nodeID.String(),
		"repo":    repo,
		"items":   len(fs),
		"version": files.LocalVersion(nodeID),
	})
}

// IndexUpdate is called for incremental updates to connected nodes' indexes.
// Implements the protocol.Model interface.
func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
	if debug {
		l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
	}

	if !m.repoSharedWith(repo, nodeID) {
		l.Infof("Update for unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
		return
	}

	m.rmut.RLock()
	files, ok := m.repoFiles[repo]
	ignores, _ := m.repoIgnores[repo]
	m.rmut.RUnlock()

	if !ok {
		l.Fatalf("IndexUpdate for nonexistent repo %q", repo)
	}

	for i := 0; i < len(fs); {
		lamport.Default.Tick(fs[i].Version)
		if ignores.Match(fs[i].Name) {
			fs[i] = fs[len(fs)-1]
			fs = fs[:len(fs)-1]
		} else {
			i++
		}
	}

	files.Update(nodeID, fs)

	events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
		"node":    nodeID.String(),
		"repo":    repo,
		"items":   len(fs),
		"version": files.LocalVersion(nodeID),
	})
}

func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	for _, nrepo := range m.nodeRepos[nodeID] {
		if nrepo == repo {
			return true
		}
	}
	return false
}

func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterConfigMessage) {
	m.pmut.Lock()
	if config.ClientName == "syncthing" {
		m.nodeVer[nodeID] = config.ClientVersion
	} else {
		m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
	}
	m.pmut.Unlock()

	l.Infof(`Node %s client is "%s %s"`, nodeID, config.ClientName, config.ClientVersion)

	if name := config.GetOption("name"); name != "" {
		l.Infof("Node %s hostname is %q", nodeID, name)
		node := m.cfg.GetNodeConfiguration(nodeID)
		if node != nil && node.Name == "" {
			node.Name = name
			m.cfg.Save()
		}
	}
}

// Close removes the peer from the model and closes the underlying connection if possible.
// Implements the protocol.Model interface.
func (m *Model) Close(node protocol.NodeID, err error) {
	l.Infof("Connection to %s closed: %v", node, err)
	events.Default.Log(events.NodeDisconnected, map[string]string{
		"id":    node.String(),
		"error": err.Error(),
	})

	m.pmut.Lock()
	m.rmut.RLock()
	for _, repo := range m.nodeRepos[node] {
		m.repoFiles[repo].Replace(node, nil)
	}
	m.rmut.RUnlock()

	conn, ok := m.rawConn[node]
	if ok {
		if conn, ok := conn.(*tls.Conn); ok {
			// If the underlying connection is a *tls.Conn, Close() does more
			// than it says on the tin. Specifically, it sends a TLS alert
			// message, which might block forever if the connection is dead
			// and we don't have a deadline set.
			conn.SetWriteDeadline(time.Now().Add(250 * time.Millisecond))
		}
		conn.Close()
	}

	delete(m.protoConn, node)
	delete(m.rawConn, node)
	delete(m.nodeVer, node)
	m.pmut.Unlock()
}

// Request returns the specified data segment by reading it from local disk.
// Implements the protocol.Model interface.
func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64, size int) ([]byte, error) {
	// Verify that the requested file exists in the local model.
	m.rmut.RLock()
	r, ok := m.repoFiles[repo]
	m.rmut.RUnlock()

	if !ok {
		l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
		return nil, ErrNoSuchFile
	}

	lf := r.Get(protocol.LocalNodeID, name)
	if protocol.IsInvalid(lf.Flags) || protocol.IsDeleted(lf.Flags) {
		if debug {
			l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
		}
		return nil, ErrInvalid
	}

	if offset > lf.Size() {
		if debug {
			l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
		}
		return nil, ErrNoSuchFile
	}

	if debug && nodeID != protocol.LocalNodeID {
		l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
	}
	m.rmut.RLock()
	fn := filepath.Join(m.repoCfgs[repo].Directory, name)
	m.rmut.RUnlock()
	fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
	if err != nil {
		return nil, err
	}
	defer fd.Close()

	buf := make([]byte, size)
	_, err = fd.ReadAt(buf, offset)
	if err != nil {
		return nil, err
	}

	return buf, nil
}
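
// Illustrative only (names and figures hypothetical): a peer fetching the
// second 128 KiB chunk of a file would arrive here roughly as
//
//	data, err := m.Request(peerID, "default", "photos/cat.jpg", 131072, 131072)
//
// The offset/size pair selects a byte range that is read straight from the
// file on disk with ReadAt; nothing is cached between requests.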
// ReplaceLocal replaces the local repository index with the given list of files.
func (m *Model) ReplaceLocal(repo string, fs []protocol.FileInfo) {
	m.rmut.RLock()
	m.repoFiles[repo].ReplaceWithDelete(protocol.LocalNodeID, fs)
	m.rmut.RUnlock()
}

func (m *Model) CurrentRepoFile(repo string, file string) protocol.FileInfo {
	m.rmut.RLock()
	f := m.repoFiles[repo].Get(protocol.LocalNodeID, file)
	m.rmut.RUnlock()
	return f
}

func (m *Model) CurrentGlobalFile(repo string, file string) protocol.FileInfo {
	m.rmut.RLock()
	f := m.repoFiles[repo].GetGlobal(file)
	m.rmut.RUnlock()
	return f
}

type cFiler struct {
	m *Model
	r string
}

// Implements scanner.CurrentFiler
func (cf cFiler) CurrentFile(file string) protocol.FileInfo {
	return cf.m.CurrentRepoFile(cf.r, file)
}

// ConnectedTo returns true if we are connected to the named node.
func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool {
	m.pmut.RLock()
	_, ok := m.protoConn[nodeID]
	m.pmut.RUnlock()
	if ok {
		m.nodeWasSeen(nodeID)
	}
	return ok
}

// AddConnection adds a new peer connection to the model. An initial index will
// be sent to the connected peer, thereafter index updates whenever the local
// repository changes.
func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
	nodeID := protoConn.ID()
	m.pmut.Lock()
	if _, ok := m.protoConn[nodeID]; ok {
		panic("add existing node")
	}
	m.protoConn[nodeID] = protoConn
	if _, ok := m.rawConn[nodeID]; ok {
		panic("add existing node")
	}
	m.rawConn[nodeID] = rawConn

	cm := m.clusterConfig(nodeID)
	protoConn.ClusterConfig(cm)

	m.rmut.RLock()
	for _, repo := range m.nodeRepos[nodeID] {
		fs := m.repoFiles[repo]
		go sendIndexes(protoConn, repo, fs, m.repoIgnores[repo])
	}
	m.rmut.RUnlock()
	m.pmut.Unlock()

	m.nodeWasSeen(nodeID)
}

func (m *Model) nodeStatRef(nodeID protocol.NodeID) *stats.NodeStatisticsReference {
	m.rmut.Lock()
	defer m.rmut.Unlock()
	if sr, ok := m.nodeStatRefs[nodeID]; ok {
		return sr
	} else {
		sr = stats.NewNodeStatisticsReference(m.db, nodeID)
		m.nodeStatRefs[nodeID] = sr
		return sr
	}
}

func (m *Model) nodeWasSeen(nodeID protocol.NodeID) {
	m.nodeStatRef(nodeID).WasSeen()
}

func sendIndexes(conn protocol.Connection, repo string, fs *files.Set, ignores ignore.Patterns) {
	nodeID := conn.ID()
	name := conn.Name()
	var err error

	if debug {
		l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo)
	}

	defer func() {
		if debug {
			l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err)
		}
	}()

	minLocalVer, err := sendIndexTo(true, 0, conn, repo, fs, ignores)

	for err == nil {
		time.Sleep(5 * time.Second)
		if fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer {
			continue
		}

		minLocalVer, err = sendIndexTo(false, minLocalVer, conn, repo, fs, ignores)
	}
}

func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, repo string, fs *files.Set, ignores ignore.Patterns) (uint64, error) {
	nodeID := conn.ID()
	name := conn.Name()
	batch := make([]protocol.FileInfo, 0, indexBatchSize)
	currentBatchSize := 0
	maxLocalVer := uint64(0)
	var err error

	fs.WithHave(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
		f := fi.(protocol.FileInfo)
		if f.LocalVersion <= minLocalVer {
			return true
		}

		if f.LocalVersion > maxLocalVer {
			maxLocalVer = f.LocalVersion
		}

		if ignores.Match(f.Name) {
			return true
		}

		if len(batch) == indexBatchSize || currentBatchSize > indexTargetSize {
			if initial {
				if err = conn.Index(repo, batch); err != nil {
					return false
				}
				if debug {
					l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (initial index)", nodeID, name, repo, len(batch), currentBatchSize)
				}
				initial = false
			} else {
				if err = conn.IndexUpdate(repo, batch); err != nil {
					return false
				}
				if debug {
					l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (batched update)", nodeID, name, repo, len(batch), currentBatchSize)
				}
			}

			batch = make([]protocol.FileInfo, 0, indexBatchSize)
			currentBatchSize = 0
		}

		batch = append(batch, f)
		currentBatchSize += indexPerFileSize + len(f.Blocks)*IndexPerBlockSize
		return true
	})

	if initial && err == nil {
		err = conn.Index(repo, batch)
		if debug && err == nil {
			l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch))
		}
	} else if len(batch) > 0 && err == nil {
		err = conn.IndexUpdate(repo, batch)
		if debug && err == nil {
			l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch))
		}
	}

	return maxLocalVer, err
}
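
// Flow sketch (descriptive of the two functions above, no new behaviour):
// sendIndexes sends one full Index, then wakes every 5 seconds and, once the
// local version counter has moved past minLocalVer, sends only the newer
// entries as an IndexUpdate. Inside sendIndexTo a batch is flushed as soon as
// it reaches indexBatchSize files or its estimated wire size exceeds
// indexTargetSize, so a large changed index goes out as several bounded
// messages rather than one oversized one.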
func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
	f.LocalVersion = 0
	m.rmut.RLock()
	m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f})
	m.rmut.RUnlock()
	events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
		"repo":     repo,
		"name":     f.Name,
		"modified": time.Unix(f.Modified, 0),
		"flags":    fmt.Sprintf("0%o", f.Flags),
		"size":     f.Size(),
	})
}

func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
	m.pmut.RLock()
	nc, ok := m.protoConn[nodeID]
	m.pmut.RUnlock()

	if !ok {
		return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
	}

	if debug {
		l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
	}

	return nc.Request(repo, name, offset, size)
}

func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
	if m.started {
		panic("cannot add repo to started model")
	}
	if len(cfg.ID) == 0 {
		panic("cannot add empty repo id")
	}

	m.rmut.Lock()
	m.repoCfgs[cfg.ID] = cfg
	m.repoFiles[cfg.ID] = files.NewSet(cfg.ID, m.db)

	m.repoNodes[cfg.ID] = make([]protocol.NodeID, len(cfg.Nodes))
	for i, node := range cfg.Nodes {
		m.repoNodes[cfg.ID][i] = node.NodeID
		m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
	}

	m.addedRepo = true
	m.rmut.Unlock()
}

func (m *Model) ScanRepos() {
	m.rmut.RLock()
	var repos = make([]string, 0, len(m.repoCfgs))
	for repo := range m.repoCfgs {
		repos = append(repos, repo)
	}
	m.rmut.RUnlock()

	var wg sync.WaitGroup
	wg.Add(len(repos))
	for _, repo := range repos {
		repo := repo
		go func() {
			err := m.ScanRepo(repo)
			if err != nil {
				invalidateRepo(m.cfg, repo, err)
			}
			wg.Done()
		}()
	}
	wg.Wait()
}

func (m *Model) CleanRepos() {
	m.rmut.RLock()
	var dirs = make([]string, 0, len(m.repoCfgs))
	for _, cfg := range m.repoCfgs {
		dirs = append(dirs, cfg.Directory)
	}
	m.rmut.RUnlock()

	var wg sync.WaitGroup
	wg.Add(len(dirs))
	for _, dir := range dirs {
		w := &scanner.Walker{
			Dir:       dir,
			TempNamer: defTempNamer,
		}
		go func() {
			w.CleanTempFiles()
			wg.Done()
		}()
	}
	wg.Wait()
}

func (m *Model) ScanRepo(repo string) error {
	return m.ScanRepoSub(repo, "")
}

func (m *Model) ScanRepoSub(repo, sub string) error {
	if p := filepath.Clean(filepath.Join(repo, sub)); !strings.HasPrefix(p, repo) {
		return errors.New("invalid subpath")
	}

	m.rmut.RLock()
	fs, ok := m.repoFiles[repo]
	dir := m.repoCfgs[repo].Directory

	ignores, _ := ignore.Load(filepath.Join(dir, ".stignore"))
	m.repoIgnores[repo] = ignores

	w := &scanner.Walker{
		Dir:          dir,
		Sub:          sub,
		Ignores:      ignores,
		BlockSize:    scanner.StandardBlockSize,
		TempNamer:    defTempNamer,
		CurrentFiler: cFiler{m, repo},
		IgnorePerms:  m.repoCfgs[repo].IgnorePerms,
	}
	m.rmut.RUnlock()

	if !ok {
		return errors.New("no such repo")
	}

	m.setState(repo, RepoScanning)

	fchan, err := w.Walk()
	if err != nil {
		return err
	}

	batchSize := 100
	batch := make([]protocol.FileInfo, 0, batchSize)
	for f := range fchan {
		events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
			"repo":     repo,
			"name":     f.Name,
			"modified": time.Unix(f.Modified, 0),
			"flags":    fmt.Sprintf("0%o", f.Flags),
			"size":     f.Size(),
		})
		if len(batch) == batchSize {
			fs.Update(protocol.LocalNodeID, batch)
			batch = batch[:0]
		}
		batch = append(batch, f)
	}
	if len(batch) > 0 {
		fs.Update(protocol.LocalNodeID, batch)
	}

	batch = batch[:0]
	// TODO: We should limit the Have scanning to start at sub
	seenPrefix := false
	fs.WithHaveTruncated(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
		f := fi.(protocol.FileInfoTruncated)
		if !strings.HasPrefix(f.Name, sub) {
			// Return true so that we keep iterating, until we get to the part
			// of the tree we are interested in. Then return false so we stop
			// iterating when we've passed the end of the subtree.
			return !seenPrefix
		}

		seenPrefix = true
		if !protocol.IsDeleted(f.Flags) {
			if f.IsInvalid() {
				return true
			}

			if len(batch) == batchSize {
				fs.Update(protocol.LocalNodeID, batch)
				batch = batch[:0]
			}

			if ignores.Match(f.Name) {
				// File has been ignored. Set invalid bit.
				nf := protocol.FileInfo{
					Name:     f.Name,
					Flags:    f.Flags | protocol.FlagInvalid,
					Modified: f.Modified,
					Version:  f.Version, // The file is still the same, so don't bump version
				}
				events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
					"repo":     repo,
					"name":     f.Name,
					"modified": time.Unix(f.Modified, 0),
					"flags":    fmt.Sprintf("0%o", f.Flags),
					"size":     f.Size(),
				})
				batch = append(batch, nf)
			} else if _, err := os.Stat(filepath.Join(dir, f.Name)); err != nil && os.IsNotExist(err) {
				// File has been deleted
				nf := protocol.FileInfo{
					Name:     f.Name,
					Flags:    f.Flags | protocol.FlagDeleted,
					Modified: f.Modified,
					Version:  lamport.Default.Tick(f.Version),
				}
				events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
					"repo":     repo,
					"name":     f.Name,
					"modified": time.Unix(f.Modified, 0),
					"flags":    fmt.Sprintf("0%o", f.Flags),
					"size":     f.Size(),
				})
				batch = append(batch, nf)
			}
		}
		return true
	})
	if len(batch) > 0 {
		fs.Update(protocol.LocalNodeID, batch)
	}

	m.setState(repo, RepoIdle)
	return nil
}
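
// Illustrative only: rescanning a single subdirectory of a hypothetical
// repository "default" would look like
//
//	if err := m.ScanRepoSub("default", "photos"); err != nil {
//		l.Warnf("rescan failed: %v", err)
//	}
//
// An empty sub (as used by ScanRepo) walks the whole repository. Walked files
// are flushed to the index in batches of 100, and previously known files that
// have since vanished from disk are re-announced with the deleted flag and a
// bumped version.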
// clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage {
	cm := protocol.ClusterConfigMessage{
		ClientName:    m.clientName,
		ClientVersion: m.clientVersion,
		Options: []protocol.Option{
			{
				Key:   "name",
				Value: m.nodeName,
			},
		},
	}

	m.rmut.RLock()
	for _, repo := range m.nodeRepos[node] {
		cr := protocol.Repository{
			ID: repo,
		}
		for _, node := range m.repoNodes[repo] {
			// NodeID is a value type, but with an underlying array. Copy it
			// so we don't grab aliases to the same array later on in node[:]
			node := node
			// TODO: Set read only bit when relevant
			cr.Nodes = append(cr.Nodes, protocol.Node{
				ID:    node[:],
				Flags: protocol.FlagShareTrusted,
			})
		}
		cm.Repositories = append(cm.Repositories, cr)
	}
	m.rmut.RUnlock()

	return cm
}
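
// Shape sketch (illustrative, field values hypothetical): for a node that
// shares a single repository "default", the resulting message is roughly
//
//	protocol.ClusterConfigMessage{
//		ClientName:    "syncthing",
//		ClientVersion: "v0.x",
//		Options:       []protocol.Option{{Key: "name", Value: "mynode"}},
//		Repositories: []protocol.Repository{{
//			ID:    "default",
//			Nodes: []protocol.Node{ /* one entry per sharing node, FlagShareTrusted */ },
//		}},
//	}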
func (m *Model) setState(repo string, state repoState) {
	m.smut.Lock()
	oldState := m.repoState[repo]
	changed, ok := m.repoStateChanged[repo]
	if state != oldState {
		m.repoState[repo] = state
		m.repoStateChanged[repo] = time.Now()
		eventData := map[string]interface{}{
			"repo": repo,
			"to":   state.String(),
		}
		if ok {
			eventData["duration"] = time.Since(changed).Seconds()
			eventData["from"] = oldState.String()
		}
		events.Default.Log(events.StateChanged, eventData)
	}
	m.smut.Unlock()
}

func (m *Model) State(repo string) (string, time.Time) {
	m.smut.RLock()
	state := m.repoState[repo]
	changed := m.repoStateChanged[repo]
	m.smut.RUnlock()
	return state.String(), changed
}

func (m *Model) Override(repo string) {
	m.rmut.RLock()
	fs := m.repoFiles[repo]
	m.rmut.RUnlock()

	m.setState(repo, RepoScanning)
	batch := make([]protocol.FileInfo, 0, indexBatchSize)
	fs.WithNeed(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
		need := fi.(protocol.FileInfo)
		if len(batch) == indexBatchSize {
			fs.Update(protocol.LocalNodeID, batch)
			batch = batch[:0]
		}

		have := fs.Get(protocol.LocalNodeID, need.Name)
		if have.Name != need.Name {
			// We are missing the file
			need.Flags |= protocol.FlagDeleted
			need.Blocks = nil
		} else {
			// We have the file, replace with our version
			need = have
		}
		need.Version = lamport.Default.Tick(need.Version)
		need.LocalVersion = 0
		batch = append(batch, need)
		return true
	})
	if len(batch) > 0 {
		fs.Update(protocol.LocalNodeID, batch)
	}
	m.setState(repo, RepoIdle)
}

// LocalVersion returns the change version for the given repository. This is
// guaranteed to increment if the contents of the local or global repository
// have changed.
func (m *Model) LocalVersion(repo string) uint64 {
	m.rmut.Lock()
	defer m.rmut.Unlock()

	fs, ok := m.repoFiles[repo]
	if !ok {
		panic("bug: LocalVersion called for nonexistent repo " + repo)
	}

	ver := fs.LocalVersion(protocol.LocalNodeID)
	for _, n := range m.repoNodes[repo] {
		ver += fs.LocalVersion(n)
	}

	return ver
}
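
// Worked example (figures hypothetical): with the local counter at 40 and two
// sharing peers at 12 and 7, LocalVersion("default") returns 40 + 12 + 7 = 59.
// A local rescan or an incoming index update bumps one of the per-node
// counters, so callers can cheaply poll this sum to detect that something in
// the repository changed.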