// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

package model

import (
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"net"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/syncthing/syncthing/config"
	"github.com/syncthing/syncthing/events"
	"github.com/syncthing/syncthing/files"
	"github.com/syncthing/syncthing/ignore"
	"github.com/syncthing/syncthing/lamport"
	"github.com/syncthing/syncthing/protocol"
	"github.com/syncthing/syncthing/scanner"
	"github.com/syncthing/syncthing/stats"
	"github.com/syndtr/goleveldb/leveldb"
)

type repoState int

const (
	RepoIdle repoState = iota
	RepoScanning
	RepoSyncing
	RepoCleaning
)

func (s repoState) String() string {
	switch s {
	case RepoIdle:
		return "idle"
	case RepoScanning:
		return "scanning"
	case RepoCleaning:
		return "cleaning"
	case RepoSyncing:
		return "syncing"
	default:
		return "unknown"
	}
}

// How many files to send in each Index/IndexUpdate message.
const (
	indexTargetSize   = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
	indexPerFileSize  = 250        // Each FileInfo is approximately this big, in bytes, excluding BlockInfos
	IndexPerBlockSize = 40         // Each BlockInfo is approximately this big
	indexBatchSize    = 1000       // Either way, don't include more files than this
)

type Model struct {
	indexDir string
	cfg      *config.Configuration
	db       *leveldb.DB

	nodeName      string
	clientName    string
	clientVersion string

	repoCfgs     map[string]config.RepositoryConfiguration          // repo -> cfg
	repoFiles    map[string]*files.Set                              // repo -> files
	repoNodes    map[string][]protocol.NodeID                       // repo -> nodeIDs
	nodeRepos    map[protocol.NodeID][]string                       // nodeID -> repos
	nodeStatRefs map[protocol.NodeID]*stats.NodeStatisticsReference // nodeID -> statsRef
	repoIgnores  map[string]ignore.Patterns                         // repo -> list of ignore patterns
	rmut         sync.RWMutex                                       // protects the above

	repoState        map[string]repoState // repo -> state
	repoStateChanged map[string]time.Time // repo -> time when state changed
	smut             sync.RWMutex

	protoConn map[protocol.NodeID]protocol.Connection
	rawConn   map[protocol.NodeID]io.Closer
	nodeVer   map[protocol.NodeID]string
	pmut      sync.RWMutex // protects protoConn and rawConn

	addedRepo bool
	started   bool
}

var (
	ErrNoSuchFile = errors.New("no such file")
	ErrInvalid    = errors.New("file is invalid")
)

// NewModel creates and starts a new model. The model starts in read-only mode,
// where it sends index information to connected peers and responds to requests
// for file data without altering the local repository in any way.
func NewModel(indexDir string, cfg *config.Configuration, nodeName, clientName, clientVersion string, db *leveldb.DB) *Model {
	m := &Model{
		indexDir:         indexDir,
		cfg:              cfg,
		db:               db,
		nodeName:         nodeName,
		clientName:       clientName,
		clientVersion:    clientVersion,
		repoCfgs:         make(map[string]config.RepositoryConfiguration),
		repoFiles:        make(map[string]*files.Set),
		repoNodes:        make(map[string][]protocol.NodeID),
		nodeRepos:        make(map[protocol.NodeID][]string),
		nodeStatRefs:     make(map[protocol.NodeID]*stats.NodeStatisticsReference),
		repoIgnores:      make(map[string]ignore.Patterns),
		repoState:        make(map[string]repoState),
		repoStateChanged: make(map[string]time.Time),
		protoConn:        make(map[protocol.NodeID]protocol.Connection),
		rawConn:          make(map[protocol.NodeID]io.Closer),
		nodeVer:          make(map[protocol.NodeID]string),
	}
	for _, node := range cfg.Nodes {
		m.nodeStatRefs[node.NodeID] = stats.NewNodeStatisticsReference(db, node.NodeID)
	}

	var timeout = 20 * 60 // seconds
	if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
		it, err := strconv.Atoi(t)
		if err == nil {
			timeout = it
		}
	}
	deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
	deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
	deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)

	return m
}

// StartRepoRW starts read/write processing on the given repository. When in
// read/write mode the model will attempt to keep in sync with the cluster by
// pulling needed files from peer nodes.
func (m *Model) StartRepoRW(repo string, threads int) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()

	if cfg, ok := m.repoCfgs[repo]; !ok {
		panic("cannot start without repo")
	} else {
		newPuller(cfg, m, threads, m.cfg)
	}
}

// StartRepoRO starts read only processing on the given repository. When in
// read only mode the model will announce files to the cluster but not
// pull in any external changes.
func (m *Model) StartRepoRO(repo string) {
	m.StartRepoRW(repo, 0) // zero threads => read only
}

type ConnectionInfo struct {
	protocol.Statistics
	Address       string
	ClientVersion string
}

// ConnectionStats returns a map with connection statistics for each connected node.
func (m *Model) ConnectionStats() map[string]ConnectionInfo {
	type remoteAddrer interface {
		RemoteAddr() net.Addr
	}

	m.pmut.RLock()
	m.rmut.RLock()

	var res = make(map[string]ConnectionInfo)
	for node, conn := range m.protoConn {
		ci := ConnectionInfo{
			Statistics:    conn.Statistics(),
			ClientVersion: m.nodeVer[node],
		}
		if nc, ok := m.rawConn[node].(remoteAddrer); ok {
			ci.Address = nc.RemoteAddr().String()
		}

		res[node.String()] = ci
	}

	m.rmut.RUnlock()
	m.pmut.RUnlock()

	in, out := protocol.TotalInOut()
	res["total"] = ConnectionInfo{
		Statistics: protocol.Statistics{
			At:            time.Now(),
			InBytesTotal:  in,
			OutBytesTotal: out,
		},
	}

	return res
}

// NodeStatistics returns statistics about each node.
func (m *Model) NodeStatistics() map[string]stats.NodeStatistics {
	var res = make(map[string]stats.NodeStatistics)
	m.rmut.RLock()
	for _, node := range m.cfg.Nodes {
		res[node.NodeID.String()] = m.nodeStatRefs[node.NodeID].GetStatistics()
	}
	m.rmut.RUnlock()
	return res
}

// Completion returns the completion status, in percent, for the given node and repo.
func (m *Model) Completion(node protocol.NodeID, repo string) float64 {
	var tot int64

	m.rmut.RLock()
	rf, ok := m.repoFiles[repo]
	m.rmut.RUnlock()
	if !ok {
		return 0 // Repo doesn't exist, so we hardly have any of it
	}

	rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
		if !f.IsDeleted() {
			tot += f.Size()
		}
		return true
	})

	if tot == 0 {
		return 100 // Repo is empty, so we have all of it
	}

	var need int64
	rf.WithNeedTruncated(node, func(f protocol.FileIntf) bool {
		if !f.IsDeleted() {
			need += f.Size()
		}
		return true
	})

	res := 100 * (1 - float64(need)/float64(tot))
	if debug {
		l.Debugf("Completion(%s, %q): %f (%d / %d)", node, repo, res, need, tot)
	}

	return res
}

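// sizeOf returns the number of present files, deleted files and total bytes
// represented by a slice of FileInfos.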
func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) {
	for _, f := range fs {
		fs, de, by := sizeOfFile(f)
		files += fs
		deleted += de
		bytes += by
	}
	return
}

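// sizeOfFile counts a single file as present or deleted and returns its size.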
func sizeOfFile(f protocol.FileIntf) (files, deleted int, bytes int64) {
	if !f.IsDeleted() {
		files++
	} else {
		deleted++
	}
	bytes += f.Size()
	return
}

// GlobalSize returns the number of files, deleted files and total bytes for all
// files in the global model.
func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	if rf, ok := m.repoFiles[repo]; ok {
		rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
			fs, de, by := sizeOfFile(f)
			files += fs
			deleted += de
			bytes += by
			return true
		})
	}
	return
}

// LocalSize returns the number of files, deleted files and total bytes for all
// files in the local repository.
func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	if rf, ok := m.repoFiles[repo]; ok {
		rf.WithHaveTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
			if f.IsInvalid() {
				return true
			}
			fs, de, by := sizeOfFile(f)
			files += fs
			deleted += de
			bytes += by
			return true
		})
	}
	return
}

// NeedSize returns the number and total size of currently needed files.
func (m *Model) NeedSize(repo string) (files int, bytes int64) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	if rf, ok := m.repoFiles[repo]; ok {
		rf.WithNeedTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
			fs, de, by := sizeOfFile(f)
			files += fs + de
			bytes += by
			return true
		})
	}
	if debug {
		l.Debugf("NeedSize(%q): %d %d", repo, files, bytes)
	}
	return
}

// NeedFilesRepoLimited returns the list of currently needed files, stopping at
// maxFiles files or maxBlocks blocks. Limits <= 0 are ignored.
func (m *Model) NeedFilesRepoLimited(repo string, maxFiles, maxBlocks int) []protocol.FileInfo {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	nblocks := 0
	if rf, ok := m.repoFiles[repo]; ok {
		fs := make([]protocol.FileInfo, 0, maxFiles)
		rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
			fi := f.(protocol.FileInfo)
			fs = append(fs, fi)
			nblocks += len(fi.Blocks)
			return (maxFiles <= 0 || len(fs) < maxFiles) && (maxBlocks <= 0 || nblocks < maxBlocks)
		})
		return fs
	}
	return nil
}

// Index is called when a new node is connected and we receive their full index.
// Implements the protocol.Model interface.
func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
	if debug {
		l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
	}

	if !m.repoSharedWith(repo, nodeID) {
		events.Default.Log(events.RepoRejected, map[string]string{
			"repo": repo,
			"node": nodeID.String(),
		})
		l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
		return
	}

	m.rmut.RLock()
	files, ok := m.repoFiles[repo]
	ignores, _ := m.repoIgnores[repo]
	m.rmut.RUnlock()

	if !ok {
		l.Fatalf("Index for nonexistent repo %q", repo)
	}

	for i := 0; i < len(fs); {
		lamport.Default.Tick(fs[i].Version)
		if ignores.Match(fs[i].Name) {
			fs[i] = fs[len(fs)-1]
			fs = fs[:len(fs)-1]
		} else {
			i++
		}
	}

	files.Replace(nodeID, fs)

	events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
		"node":    nodeID.String(),
		"repo":    repo,
		"items":   len(fs),
		"version": files.LocalVersion(nodeID),
	})
}

// IndexUpdate is called for incremental updates to connected nodes' indexes.
// Implements the protocol.Model interface.
func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
	if debug {
		l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
	}

	if !m.repoSharedWith(repo, nodeID) {
		l.Infof("Update for unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
		return
	}

	m.rmut.RLock()
	files, ok := m.repoFiles[repo]
	ignores, _ := m.repoIgnores[repo]
	m.rmut.RUnlock()

	if !ok {
		l.Fatalf("IndexUpdate for nonexistent repo %q", repo)
	}

	for i := 0; i < len(fs); {
		lamport.Default.Tick(fs[i].Version)
		if ignores.Match(fs[i].Name) {
			fs[i] = fs[len(fs)-1]
			fs = fs[:len(fs)-1]
		} else {
			i++
		}
	}

	files.Update(nodeID, fs)

	events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
		"node":    nodeID.String(),
		"repo":    repo,
		"items":   len(fs),
		"version": files.LocalVersion(nodeID),
	})
}

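// repoSharedWith returns true if the given repository is shared with the
// given node.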
func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	for _, nrepo := range m.nodeRepos[nodeID] {
		if nrepo == repo {
			return true
		}
	}
	return false
}

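// ClusterConfig is called when a ClusterConfigMessage is received from a
// connected node. It records the peer's client name and version and, if the
// peer announces its own name and we have none configured for it, stores that
// name in the configuration.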
func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterConfigMessage) {
	m.pmut.Lock()
	if config.ClientName == "syncthing" {
		m.nodeVer[nodeID] = config.ClientVersion
	} else {
		m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
	}
	m.pmut.Unlock()

	l.Infof(`Node %s client is "%s %s"`, nodeID, config.ClientName, config.ClientVersion)

	if name := config.GetOption("name"); name != "" {
		l.Infof("Node %s hostname is %q", nodeID, name)
		node := m.cfg.GetNodeConfiguration(nodeID)
		if node != nil && node.Name == "" {
			node.Name = name
			m.cfg.Save()
		}
	}
}

// Close removes the peer from the model and closes the underlying connection if possible.
// Implements the protocol.Model interface.
func (m *Model) Close(node protocol.NodeID, err error) {
	l.Infof("Connection to %s closed: %v", node, err)
	events.Default.Log(events.NodeDisconnected, map[string]string{
		"id":    node.String(),
		"error": err.Error(),
	})

	m.pmut.Lock()
	m.rmut.RLock()
	for _, repo := range m.nodeRepos[node] {
		m.repoFiles[repo].Replace(node, nil)
	}
	m.rmut.RUnlock()

	conn, ok := m.rawConn[node]
	if ok {
		if conn, ok := conn.(*tls.Conn); ok {
			// If the underlying connection is a *tls.Conn, Close() does more
			// than it says on the tin. Specifically, it sends a TLS alert
			// message, which might block forever if the connection is dead
			// and we don't have a deadline set.
			conn.SetWriteDeadline(time.Now().Add(250 * time.Millisecond))
		}
		conn.Close()
	}
	delete(m.protoConn, node)
	delete(m.rawConn, node)
	delete(m.nodeVer, node)
	m.pmut.Unlock()
}

// Request returns the specified data segment by reading it from local disk.
// Implements the protocol.Model interface.
func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64, size int) ([]byte, error) {
	// Verify that the requested file exists in the local model.
	m.rmut.RLock()
	r, ok := m.repoFiles[repo]
	m.rmut.RUnlock()

	if !ok {
		l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
		return nil, ErrNoSuchFile
	}

	lf := r.Get(protocol.LocalNodeID, name)
	if protocol.IsInvalid(lf.Flags) || protocol.IsDeleted(lf.Flags) {
		if debug {
			l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
		}
		return nil, ErrInvalid
	}

	if offset > lf.Size() {
		if debug {
			l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
		}
		return nil, ErrNoSuchFile
	}

	if debug && nodeID != protocol.LocalNodeID {
		l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
	}
	m.rmut.RLock()
	fn := filepath.Join(m.repoCfgs[repo].Directory, name)
	m.rmut.RUnlock()

	fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
	if err != nil {
		return nil, err
	}
	defer fd.Close()

	buf := make([]byte, size)
	_, err = fd.ReadAt(buf, offset)
	if err != nil {
		return nil, err
	}

	return buf, nil
}

// ReplaceLocal replaces the local repository index with the given list of files.
func (m *Model) ReplaceLocal(repo string, fs []protocol.FileInfo) {
	m.rmut.RLock()
	m.repoFiles[repo].ReplaceWithDelete(protocol.LocalNodeID, fs)
	m.rmut.RUnlock()
}

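// CurrentRepoFile returns the file as seen by the local index for the given
// repository.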
func (m *Model) CurrentRepoFile(repo string, file string) protocol.FileInfo {
	m.rmut.RLock()
	f := m.repoFiles[repo].Get(protocol.LocalNodeID, file)
	m.rmut.RUnlock()
	return f
}

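// CurrentGlobalFile returns the file as seen by the global (cluster-wide)
// index for the given repository.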
func (m *Model) CurrentGlobalFile(repo string, file string) protocol.FileInfo {
	m.rmut.RLock()
	f := m.repoFiles[repo].GetGlobal(file)
	m.rmut.RUnlock()
	return f
}

type cFiler struct {
	m *Model
	r string
}

// Implements scanner.CurrentFiler
func (cf cFiler) CurrentFile(file string) protocol.FileInfo {
	return cf.m.CurrentRepoFile(cf.r, file)
}

// ConnectedTo returns true if we are connected to the named node.
func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool {
	m.pmut.RLock()
	_, ok := m.protoConn[nodeID]
	if ok {
		if statRef, ok := m.nodeStatRefs[nodeID]; ok {
			statRef.WasSeen()
		}
	}
	m.pmut.RUnlock()
	return ok
}

// AddConnection adds a new peer connection to the model. An initial index will
// be sent to the connected peer, thereafter index updates whenever the local
// repository changes.
func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
	nodeID := protoConn.ID()
	m.pmut.Lock()
	if _, ok := m.protoConn[nodeID]; ok {
		panic("add existing node")
	}
	m.protoConn[nodeID] = protoConn
	if _, ok := m.rawConn[nodeID]; ok {
		panic("add existing node")
	}
	m.rawConn[nodeID] = rawConn

	cm := m.clusterConfig(nodeID)
	protoConn.ClusterConfig(cm)

	m.rmut.RLock()
	for _, repo := range m.nodeRepos[nodeID] {
		fs := m.repoFiles[repo]
		go sendIndexes(protoConn, repo, fs, m.repoIgnores[repo])
	}
	if statRef, ok := m.nodeStatRefs[nodeID]; ok {
		statRef.WasSeen()
	} else {
		l.Warnf("AddConnection for unconfigured node %v?", nodeID)
	}
	m.rmut.RUnlock()
	m.pmut.Unlock()
}

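// sendIndexes sends the full local index for the given repo to the connected
// node, then periodically sends incremental updates whenever the local
// version has advanced. It runs until sending fails, typically because the
// connection was closed.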
func sendIndexes(conn protocol.Connection, repo string, fs *files.Set, ignores ignore.Patterns) {
	nodeID := conn.ID()
	name := conn.Name()
	var err error

	if debug {
		l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo)
	}

	defer func() {
		if debug {
			l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err)
		}
	}()

	minLocalVer, err := sendIndexTo(true, 0, conn, repo, fs, ignores)

	for err == nil {
		time.Sleep(5 * time.Second)
		if fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer {
			continue
		}

		minLocalVer, err = sendIndexTo(false, minLocalVer, conn, repo, fs, ignores)
	}
}

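// sendIndexTo sends one round of index data to the connection: the full index
// on the initial call, otherwise only files with a local version higher than
// minLocalVer. Files are sent in batches bounded by indexBatchSize and
// indexTargetSize. It returns the highest local version seen, to be used as
// minLocalVer in the next call.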
func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, repo string, fs *files.Set, ignores ignore.Patterns) (uint64, error) {
	nodeID := conn.ID()
	name := conn.Name()
	batch := make([]protocol.FileInfo, 0, indexBatchSize)
	currentBatchSize := 0
	maxLocalVer := uint64(0)
	var err error

	fs.WithHave(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
		f := fi.(protocol.FileInfo)
		if f.LocalVersion <= minLocalVer {
			return true
		}

		if f.LocalVersion > maxLocalVer {
			maxLocalVer = f.LocalVersion
		}

		if ignores.Match(f.Name) {
			return true
		}

		if len(batch) == indexBatchSize || currentBatchSize > indexTargetSize {
			if initial {
				if err = conn.Index(repo, batch); err != nil {
					return false
				}
				if debug {
					l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (initial index)", nodeID, name, repo, len(batch), currentBatchSize)
				}
				initial = false
			} else {
				if err = conn.IndexUpdate(repo, batch); err != nil {
					return false
				}
				if debug {
					l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (batched update)", nodeID, name, repo, len(batch), currentBatchSize)
				}
			}

			batch = make([]protocol.FileInfo, 0, indexBatchSize)
			currentBatchSize = 0
		}

		batch = append(batch, f)
		currentBatchSize += indexPerFileSize + len(f.Blocks)*IndexPerBlockSize
		return true
	})

	if initial && err == nil {
		err = conn.Index(repo, batch)
		if debug && err == nil {
			l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch))
		}
	} else if len(batch) > 0 && err == nil {
		err = conn.IndexUpdate(repo, batch)
		if debug && err == nil {
			l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch))
		}
	}

	return maxLocalVer, err
}

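// updateLocal updates a single file in the local index and emits a
// LocalIndexUpdated event.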
func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
	f.LocalVersion = 0
	m.rmut.RLock()
	m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f})
	m.rmut.RUnlock()
	events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
		"repo":     repo,
		"name":     f.Name,
		"modified": time.Unix(f.Modified, 0),
		"flags":    fmt.Sprintf("0%o", f.Flags),
		"size":     f.Size(),
	})
}

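// requestGlobal requests the given data segment from the named node over its
// established protocol connection.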
func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
	m.pmut.RLock()
	nc, ok := m.protoConn[nodeID]
	m.pmut.RUnlock()

	if !ok {
		return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
	}

	if debug {
		l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
	}

	return nc.Request(repo, name, offset, size)
}

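// AddRepo registers a repository configuration with the model. It must be
// called before the model is started.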
func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
	if m.started {
		panic("cannot add repo to started model")
	}
	if len(cfg.ID) == 0 {
		panic("cannot add empty repo id")
	}

	m.rmut.Lock()
	m.repoCfgs[cfg.ID] = cfg
	m.repoFiles[cfg.ID] = files.NewSet(cfg.ID, m.db)

	m.repoNodes[cfg.ID] = make([]protocol.NodeID, len(cfg.Nodes))
	for i, node := range cfg.Nodes {
		m.repoNodes[cfg.ID][i] = node.NodeID
		m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
	}

	m.addedRepo = true
	m.rmut.Unlock()
}

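// ScanRepos scans all configured repositories concurrently and waits for the
// scans to finish. Repositories that fail to scan are invalidated in the
// configuration.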
func (m *Model) ScanRepos() {
	m.rmut.RLock()
	var repos = make([]string, 0, len(m.repoCfgs))
	for repo := range m.repoCfgs {
		repos = append(repos, repo)
	}
	m.rmut.RUnlock()

	var wg sync.WaitGroup
	wg.Add(len(repos))
	for _, repo := range repos {
		repo := repo
		go func() {
			err := m.ScanRepo(repo)
			if err != nil {
				invalidateRepo(m.cfg, repo, err)
			}
			wg.Done()
		}()
	}
	wg.Wait()
}

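// CleanRepos removes stale temporary files from all repository directories,
// running the cleanups concurrently and waiting for them to finish.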
func (m *Model) CleanRepos() {
	m.rmut.RLock()
	var dirs = make([]string, 0, len(m.repoCfgs))
	for _, cfg := range m.repoCfgs {
		dirs = append(dirs, cfg.Directory)
	}
	m.rmut.RUnlock()

	var wg sync.WaitGroup
	wg.Add(len(dirs))
	for _, dir := range dirs {
		w := &scanner.Walker{
			Dir:       dir,
			TempNamer: defTempNamer,
		}
		go func() {
			w.CleanTempFiles()
			wg.Done()
		}()
	}
	wg.Wait()
}

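// ScanRepo rescans the entire repository.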
func (m *Model) ScanRepo(repo string) error {
	return m.ScanRepoSub(repo, "")
}

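// ScanRepoSub rescans the given subdirectory of the repository, updating the
// local index with new, changed, ignored and deleted files. An empty sub
// rescans the whole repository.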
func (m *Model) ScanRepoSub(repo, sub string) error {
	if p := filepath.Clean(filepath.Join(repo, sub)); !strings.HasPrefix(p, repo) {
		return errors.New("invalid subpath")
	}

	m.rmut.RLock()
	fs, ok := m.repoFiles[repo]
	dir := m.repoCfgs[repo].Directory

	ignores, _ := ignore.Load(filepath.Join(dir, ".stignore"))
	m.repoIgnores[repo] = ignores

	w := &scanner.Walker{
		Dir:          dir,
		Sub:          sub,
		Ignores:      ignores,
		BlockSize:    scanner.StandardBlockSize,
		TempNamer:    defTempNamer,
		CurrentFiler: cFiler{m, repo},
		IgnorePerms:  m.repoCfgs[repo].IgnorePerms,
	}
	m.rmut.RUnlock()
	if !ok {
		return errors.New("no such repo")
	}

	m.setState(repo, RepoScanning)
	fchan, err := w.Walk()

	if err != nil {
		return err
	}
	batchSize := 100
	batch := make([]protocol.FileInfo, 0, batchSize)
	for f := range fchan {
		events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
			"repo":     repo,
			"name":     f.Name,
			"modified": time.Unix(f.Modified, 0),
			"flags":    fmt.Sprintf("0%o", f.Flags),
			"size":     f.Size(),
		})
		if len(batch) == batchSize {
			fs.Update(protocol.LocalNodeID, batch)
			batch = batch[:0]
		}
		batch = append(batch, f)
	}
	if len(batch) > 0 {
		fs.Update(protocol.LocalNodeID, batch)
	}

	batch = batch[:0]
	// TODO: We should limit the Have scanning to start at sub
	seenPrefix := false
	fs.WithHaveTruncated(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
		f := fi.(protocol.FileInfoTruncated)
		if !strings.HasPrefix(f.Name, sub) {
			// Return true so that we keep iterating, until we get to the part
			// of the tree we are interested in. Then return false so we stop
			// iterating when we've passed the end of the subtree.
			return !seenPrefix
		}

		seenPrefix = true
		if !protocol.IsDeleted(f.Flags) {
			if f.IsInvalid() {
				return true
			}

			if len(batch) == batchSize {
				fs.Update(protocol.LocalNodeID, batch)
				batch = batch[:0]
			}

			if ignores.Match(f.Name) {
				// File has been ignored. Set invalid bit.
				nf := protocol.FileInfo{
					Name:     f.Name,
					Flags:    f.Flags | protocol.FlagInvalid,
					Modified: f.Modified,
					Version:  f.Version, // The file is still the same, so don't bump version
				}
				events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
					"repo":     repo,
					"name":     f.Name,
					"modified": time.Unix(f.Modified, 0),
					"flags":    fmt.Sprintf("0%o", f.Flags),
					"size":     f.Size(),
				})
				batch = append(batch, nf)
			} else if _, err := os.Stat(filepath.Join(dir, f.Name)); err != nil && os.IsNotExist(err) {
				// File has been deleted
				nf := protocol.FileInfo{
					Name:     f.Name,
					Flags:    f.Flags | protocol.FlagDeleted,
					Modified: f.Modified,
					Version:  lamport.Default.Tick(f.Version),
				}
				events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
					"repo":     repo,
					"name":     f.Name,
					"modified": time.Unix(f.Modified, 0),
					"flags":    fmt.Sprintf("0%o", f.Flags),
					"size":     f.Size(),
				})
				batch = append(batch, nf)
			}
		}
		return true
	})
	if len(batch) > 0 {
		fs.Update(protocol.LocalNodeID, batch)
	}

	m.setState(repo, RepoIdle)
	return nil
}

// clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage {
	cm := protocol.ClusterConfigMessage{
		ClientName:    m.clientName,
		ClientVersion: m.clientVersion,
		Options: []protocol.Option{
			{
				Key:   "name",
				Value: m.nodeName,
			},
		},
	}

	m.rmut.RLock()
	for _, repo := range m.nodeRepos[node] {
		cr := protocol.Repository{
			ID: repo,
		}
		for _, node := range m.repoNodes[repo] {
			// NodeID is a value type, but with an underlying array. Copy it
			// so we don't grab aliases to the same array later on in node[:]
			node := node
			// TODO: Set read only bit when relevant
			cr.Nodes = append(cr.Nodes, protocol.Node{
				ID:    node[:],
				Flags: protocol.FlagShareTrusted,
			})
		}
		cm.Repositories = append(cm.Repositories, cr)
	}
	m.rmut.RUnlock()

	return cm
}

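// setState moves the repository to the given state and, when the state
// actually changes, emits a StateChanged event including how long the
// previous state lasted.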
func (m *Model) setState(repo string, state repoState) {
	m.smut.Lock()
	oldState := m.repoState[repo]
	changed, ok := m.repoStateChanged[repo]
	if state != oldState {
		m.repoState[repo] = state
		m.repoStateChanged[repo] = time.Now()
		eventData := map[string]interface{}{
			"repo": repo,
			"to":   state.String(),
		}
		if ok {
			eventData["duration"] = time.Since(changed).Seconds()
			eventData["from"] = oldState.String()
		}
		events.Default.Log(events.StateChanged, eventData)
	}
	m.smut.Unlock()
}

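// State returns the current state of the given repository and the time at
// which that state was entered.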
func (m *Model) State(repo string) (string, time.Time) {
	m.smut.RLock()
	state := m.repoState[repo]
	changed := m.repoStateChanged[repo]
	m.smut.RUnlock()
	return state.String(), changed
}

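// Override makes the local view of the repository authoritative: every file
// we currently need is replaced by our local version, or marked deleted if we
// do not have it, with a bumped version so the change takes precedence in the
// cluster.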
func (m *Model) Override(repo string) {
	m.rmut.RLock()
	fs := m.repoFiles[repo]
	m.rmut.RUnlock()

	m.setState(repo, RepoScanning)
	batch := make([]protocol.FileInfo, 0, indexBatchSize)
	fs.WithNeed(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
		need := fi.(protocol.FileInfo)
		if len(batch) == indexBatchSize {
			fs.Update(protocol.LocalNodeID, batch)
			batch = batch[:0]
		}

		have := fs.Get(protocol.LocalNodeID, need.Name)
		if have.Name != need.Name {
			// We are missing the file
			need.Flags |= protocol.FlagDeleted
			need.Blocks = nil
		} else {
			// We have the file, replace with our version
			need = have
		}
		need.Version = lamport.Default.Tick(need.Version)
		need.LocalVersion = 0
		batch = append(batch, need)
		return true
	})
	if len(batch) > 0 {
		fs.Update(protocol.LocalNodeID, batch)
	}
	m.setState(repo, RepoIdle)
}

// LocalVersion returns the change version for the given repository. This is
// guaranteed to increment if the contents of the local or global repository
// have changed.
func (m *Model) LocalVersion(repo string) uint64 {
	m.rmut.Lock()
	defer m.rmut.Unlock()

	fs, ok := m.repoFiles[repo]
	if !ok {
		return 0
	}

	ver := fs.LocalVersion(protocol.LocalNodeID)
	for _, n := range m.repoNodes[repo] {
		ver += fs.LocalVersion(n)
	}

	return ver
}