model.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960
  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package model
  5. import (
  6. "errors"
  7. "fmt"
  8. "io"
  9. "net"
  10. "os"
  11. "path/filepath"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/syncthing/syncthing/config"
  17. "github.com/syncthing/syncthing/events"
  18. "github.com/syncthing/syncthing/files"
  19. "github.com/syncthing/syncthing/lamport"
  20. "github.com/syncthing/syncthing/protocol"
  21. "github.com/syncthing/syncthing/scanner"
  22. "github.com/syndtr/goleveldb/leveldb"
  23. )
  24. type repoState int
  25. const (
  26. RepoIdle repoState = iota
  27. RepoScanning
  28. RepoSyncing
  29. RepoCleaning
  30. )
  31. func (s repoState) String() string {
  32. switch s {
  33. case RepoIdle:
  34. return "idle"
  35. case RepoScanning:
  36. return "scanning"
  37. case RepoCleaning:
  38. return "cleaning"
  39. case RepoSyncing:
  40. return "syncing"
  41. default:
  42. return "unknown"
  43. }
  44. }
  45. // Somewhat arbitrary amount of bytes that we choose to let represent the size
  46. // of an unsynchronized directory entry or a deleted file. We need it to be
  47. // larger than zero so that it's visible that there is some amount of bytes to
  48. // transfer to bring the systems into synchronization.
  49. const zeroEntrySize = 128
  50. // How many files to send in each Index/IndexUpdate message.
  51. const (
  52. indexTargetSize = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
  53. indexPerFileSize = 250 // Each FileInfo is approximately this big, in bytes, excluding BlockInfos
  54. IndexPerBlockSize = 40 // Each BlockInfo is approximately this big
  55. indexBatchSize = 1000 // Either way, don't include more files than this
  56. )
// Model holds the state of the local node: the configured repositories with
// their file sets, the current activity state of every repository and the
// connections to peer nodes. Each group of fields is guarded by its own
// mutex.
type Model struct {
	indexDir string
	cfg      *config.Configuration
	db       *leveldb.DB // passed to files.NewSet for every repository added

	// Identity of the local node; sent to peers by clusterConfig.
	nodeName      string
	clientName    string
	clientVersion string

	repoCfgs  map[string]config.RepositoryConfiguration // repo -> cfg
	repoFiles map[string]*files.Set                     // repo -> files
	repoNodes map[string][]protocol.NodeID              // repo -> nodeIDs
	nodeRepos map[protocol.NodeID][]string              // nodeID -> repos
	rmut      sync.RWMutex                              // protects the four repo/node maps above

	repoState        map[string]repoState // repo -> state
	repoStateChanged map[string]time.Time // repo -> time when state changed
	smut             sync.RWMutex         // protects repoState and repoStateChanged

	protoConn map[protocol.NodeID]protocol.Connection
	rawConn   map[protocol.NodeID]io.Closer
	nodeVer   map[protocol.NodeID]string // nodeID -> client version string reported in ClusterConfig
	pmut      sync.RWMutex               // protects protoConn, rawConn and nodeVer

	// NOTE(review): sentLocalVer and slMut are not referenced by any code
	// visible in this part of the file; confirm they are still needed.
	sentLocalVer map[protocol.NodeID]map[string]uint64
	slMut        sync.Mutex

	addedRepo bool // set to true by AddRepo
	started   bool // AddRepo panics when this is true
}
  81. var (
  82. ErrNoSuchFile = errors.New("no such file")
  83. ErrInvalid = errors.New("file is invalid")
  84. )
  85. // NewModel creates and starts a new model. The model starts in read-only mode,
  86. // where it sends index information to connected peers and responds to requests
  87. // for file data without altering the local repository in any way.
  88. func NewModel(indexDir string, cfg *config.Configuration, nodeName, clientName, clientVersion string, db *leveldb.DB) *Model {
  89. m := &Model{
  90. indexDir: indexDir,
  91. cfg: cfg,
  92. db: db,
  93. nodeName: nodeName,
  94. clientName: clientName,
  95. clientVersion: clientVersion,
  96. repoCfgs: make(map[string]config.RepositoryConfiguration),
  97. repoFiles: make(map[string]*files.Set),
  98. repoNodes: make(map[string][]protocol.NodeID),
  99. nodeRepos: make(map[protocol.NodeID][]string),
  100. repoState: make(map[string]repoState),
  101. repoStateChanged: make(map[string]time.Time),
  102. protoConn: make(map[protocol.NodeID]protocol.Connection),
  103. rawConn: make(map[protocol.NodeID]io.Closer),
  104. nodeVer: make(map[protocol.NodeID]string),
  105. sentLocalVer: make(map[protocol.NodeID]map[string]uint64),
  106. }
  107. var timeout = 20 * 60 // seconds
  108. if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
  109. it, err := strconv.Atoi(t)
  110. if err == nil {
  111. timeout = it
  112. }
  113. }
  114. deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
  115. deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
  116. deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)
  117. return m
  118. }
  119. // StartRW starts read/write processing on the current model. When in
  120. // read/write mode the model will attempt to keep in sync with the cluster by
  121. // pulling needed files from peer nodes.
  122. func (m *Model) StartRepoRW(repo string, threads int) {
  123. m.rmut.RLock()
  124. defer m.rmut.RUnlock()
  125. if cfg, ok := m.repoCfgs[repo]; !ok {
  126. panic("cannot start without repo")
  127. } else {
  128. newPuller(cfg, m, threads, m.cfg)
  129. }
  130. }
  131. // StartRO starts read only processing on the current model. When in
  132. // read only mode the model will announce files to the cluster but not
  133. // pull in any external changes.
  134. func (m *Model) StartRepoRO(repo string) {
  135. m.StartRepoRW(repo, 0) // zero threads => read only
  136. }
  137. type ConnectionInfo struct {
  138. protocol.Statistics
  139. Address string
  140. ClientVersion string
  141. }
  142. // ConnectionStats returns a map with connection statistics for each connected node.
  143. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  144. type remoteAddrer interface {
  145. RemoteAddr() net.Addr
  146. }
  147. m.pmut.RLock()
  148. m.rmut.RLock()
  149. var res = make(map[string]ConnectionInfo)
  150. for node, conn := range m.protoConn {
  151. ci := ConnectionInfo{
  152. Statistics: conn.Statistics(),
  153. ClientVersion: m.nodeVer[node],
  154. }
  155. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  156. ci.Address = nc.RemoteAddr().String()
  157. }
  158. res[node.String()] = ci
  159. }
  160. m.rmut.RUnlock()
  161. m.pmut.RUnlock()
  162. in, out := protocol.TotalInOut()
  163. res["total"] = ConnectionInfo{
  164. Statistics: protocol.Statistics{
  165. At: time.Now(),
  166. InBytesTotal: in,
  167. OutBytesTotal: out,
  168. },
  169. }
  170. return res
  171. }
  172. // Returns the completion status, in percent, for the given node and repo.
  173. func (m *Model) Completion(node protocol.NodeID, repo string) float64 {
  174. var tot int64
  175. m.rmut.RLock()
  176. rf, ok := m.repoFiles[repo]
  177. m.rmut.RUnlock()
  178. if !ok {
  179. return 0 // Repo doesn't exist, so we hardly have any of it
  180. }
  181. rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
  182. if !f.IsDeleted() {
  183. tot += f.Size()
  184. }
  185. return true
  186. })
  187. if tot == 0 {
  188. return 100 // Repo is empty, so we have all of it
  189. }
  190. var need int64
  191. rf.WithNeedTruncated(node, func(f protocol.FileIntf) bool {
  192. if !f.IsDeleted() {
  193. need += f.Size()
  194. }
  195. return true
  196. })
  197. res := 100 * (1 - float64(need)/float64(tot))
  198. if debug {
  199. l.Debugf("Completion(%s, %q): %f (%d / %d)", node, repo, res, need, tot)
  200. }
  201. return res
  202. }
  203. func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) {
  204. for _, f := range fs {
  205. fs, de, by := sizeOfFile(f)
  206. files += fs
  207. deleted += de
  208. bytes += by
  209. }
  210. return
  211. }
  212. func sizeOfFile(f protocol.FileIntf) (files, deleted int, bytes int64) {
  213. if !f.IsDeleted() {
  214. files++
  215. } else {
  216. deleted++
  217. }
  218. bytes += f.Size()
  219. return
  220. }
  221. // GlobalSize returns the number of files, deleted files and total bytes for all
  222. // files in the global model.
  223. func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
  224. m.rmut.RLock()
  225. defer m.rmut.RUnlock()
  226. if rf, ok := m.repoFiles[repo]; ok {
  227. rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
  228. fs, de, by := sizeOfFile(f)
  229. files += fs
  230. deleted += de
  231. bytes += by
  232. return true
  233. })
  234. }
  235. return
  236. }
  237. // LocalSize returns the number of files, deleted files and total bytes for all
  238. // files in the local repository.
  239. func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
  240. m.rmut.RLock()
  241. defer m.rmut.RUnlock()
  242. if rf, ok := m.repoFiles[repo]; ok {
  243. rf.WithHaveTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
  244. fs, de, by := sizeOfFile(f)
  245. files += fs
  246. deleted += de
  247. bytes += by
  248. return true
  249. })
  250. }
  251. return
  252. }
  253. // NeedSize returns the number and total size of currently needed files.
  254. func (m *Model) NeedSize(repo string) (files int, bytes int64) {
  255. m.rmut.RLock()
  256. defer m.rmut.RUnlock()
  257. if rf, ok := m.repoFiles[repo]; ok {
  258. rf.WithNeedTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
  259. fs, de, by := sizeOfFile(f)
  260. files += fs + de
  261. bytes += by
  262. return true
  263. })
  264. }
  265. if debug {
  266. l.Debugf("NeedSize(%q): %d %d", repo, files, bytes)
  267. }
  268. return
  269. }
  270. // NeedFiles returns the list of currently needed files
  271. func (m *Model) NeedFilesRepo(repo string) []protocol.FileInfo {
  272. m.rmut.RLock()
  273. defer m.rmut.RUnlock()
  274. if rf, ok := m.repoFiles[repo]; ok {
  275. fs := make([]protocol.FileInfo, 0, indexBatchSize)
  276. rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
  277. fs = append(fs, f.(protocol.FileInfo))
  278. return len(fs) < indexBatchSize
  279. })
  280. return fs
  281. }
  282. return nil
  283. }
  284. // Index is called when a new node is connected and we receive their full index.
  285. // Implements the protocol.Model interface.
  286. func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  287. if debug {
  288. l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
  289. }
  290. if !m.repoSharedWith(repo, nodeID) {
  291. events.Default.Log(events.RepoRejected, map[string]string{
  292. "repo": repo,
  293. "node": nodeID.String(),
  294. })
  295. l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  296. return
  297. }
  298. for i := range fs {
  299. lamport.Default.Tick(fs[i].Version)
  300. }
  301. m.rmut.RLock()
  302. r, ok := m.repoFiles[repo]
  303. m.rmut.RUnlock()
  304. if ok {
  305. r.Replace(nodeID, fs)
  306. } else {
  307. l.Fatalf("Index for nonexistant repo %q", repo)
  308. }
  309. events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
  310. "node": nodeID.String(),
  311. "repo": repo,
  312. "items": len(fs),
  313. "version": r.LocalVersion(nodeID),
  314. })
  315. }
  316. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  317. // Implements the protocol.Model interface.
  318. func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  319. if debug {
  320. l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
  321. }
  322. if !m.repoSharedWith(repo, nodeID) {
  323. l.Infof("Update for unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  324. return
  325. }
  326. for i := range fs {
  327. lamport.Default.Tick(fs[i].Version)
  328. }
  329. m.rmut.RLock()
  330. r, ok := m.repoFiles[repo]
  331. m.rmut.RUnlock()
  332. if ok {
  333. r.Update(nodeID, fs)
  334. } else {
  335. l.Fatalf("IndexUpdate for nonexistant repo %q", repo)
  336. }
  337. events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
  338. "node": nodeID.String(),
  339. "repo": repo,
  340. "items": len(fs),
  341. "version": r.LocalVersion(nodeID),
  342. })
  343. }
  344. func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool {
  345. m.rmut.RLock()
  346. defer m.rmut.RUnlock()
  347. for _, nrepo := range m.nodeRepos[nodeID] {
  348. if nrepo == repo {
  349. return true
  350. }
  351. }
  352. return false
  353. }
  354. func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterConfigMessage) {
  355. m.pmut.Lock()
  356. if config.ClientName == "syncthing" {
  357. m.nodeVer[nodeID] = config.ClientVersion
  358. } else {
  359. m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
  360. }
  361. m.pmut.Unlock()
  362. l.Infof(`Node %s client is "%s %s"`, nodeID, config.ClientName, config.ClientVersion)
  363. if name := config.GetOption("name"); name != "" {
  364. l.Infof("Node %s hostname is %q", nodeID, name)
  365. node := m.cfg.GetNodeConfiguration(nodeID)
  366. if node != nil && node.Name == "" {
  367. node.Name = name
  368. }
  369. }
  370. }
  371. // Close removes the peer from the model and closes the underlying connection if possible.
  372. // Implements the protocol.Model interface.
  373. func (m *Model) Close(node protocol.NodeID, err error) {
  374. l.Infof("Connection to %s closed: %v", node, err)
  375. events.Default.Log(events.NodeDisconnected, map[string]string{
  376. "id": node.String(),
  377. "error": err.Error(),
  378. })
  379. m.pmut.Lock()
  380. m.rmut.RLock()
  381. for _, repo := range m.nodeRepos[node] {
  382. m.repoFiles[repo].Replace(node, nil)
  383. }
  384. m.rmut.RUnlock()
  385. conn, ok := m.rawConn[node]
  386. if ok {
  387. conn.Close()
  388. }
  389. delete(m.protoConn, node)
  390. delete(m.rawConn, node)
  391. delete(m.nodeVer, node)
  392. m.pmut.Unlock()
  393. }
  394. // Request returns the specified data segment by reading it from local disk.
  395. // Implements the protocol.Model interface.
  396. func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64, size int) ([]byte, error) {
  397. // Verify that the requested file exists in the local model.
  398. m.rmut.RLock()
  399. r, ok := m.repoFiles[repo]
  400. m.rmut.RUnlock()
  401. if !ok {
  402. l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
  403. return nil, ErrNoSuchFile
  404. }
  405. lf := r.Get(protocol.LocalNodeID, name)
  406. if protocol.IsInvalid(lf.Flags) || protocol.IsDeleted(lf.Flags) {
  407. if debug {
  408. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
  409. }
  410. return nil, ErrInvalid
  411. }
  412. if offset > lf.Size() {
  413. if debug {
  414. l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
  415. }
  416. return nil, ErrNoSuchFile
  417. }
  418. if debug && nodeID != protocol.LocalNodeID {
  419. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
  420. }
  421. m.rmut.RLock()
  422. fn := filepath.Join(m.repoCfgs[repo].Directory, name)
  423. m.rmut.RUnlock()
  424. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  425. if err != nil {
  426. return nil, err
  427. }
  428. defer fd.Close()
  429. buf := make([]byte, size)
  430. _, err = fd.ReadAt(buf, offset)
  431. if err != nil {
  432. return nil, err
  433. }
  434. return buf, nil
  435. }
  436. // ReplaceLocal replaces the local repository index with the given list of files.
  437. func (m *Model) ReplaceLocal(repo string, fs []protocol.FileInfo) {
  438. m.rmut.RLock()
  439. m.repoFiles[repo].ReplaceWithDelete(protocol.LocalNodeID, fs)
  440. m.rmut.RUnlock()
  441. }
  442. func (m *Model) CurrentRepoFile(repo string, file string) protocol.FileInfo {
  443. m.rmut.RLock()
  444. f := m.repoFiles[repo].Get(protocol.LocalNodeID, file)
  445. m.rmut.RUnlock()
  446. return f
  447. }
  448. func (m *Model) CurrentGlobalFile(repo string, file string) protocol.FileInfo {
  449. m.rmut.RLock()
  450. f := m.repoFiles[repo].GetGlobal(file)
  451. m.rmut.RUnlock()
  452. return f
  453. }
  454. type cFiler struct {
  455. m *Model
  456. r string
  457. }
  458. // Implements scanner.CurrentFiler
  459. func (cf cFiler) CurrentFile(file string) protocol.FileInfo {
  460. return cf.m.CurrentRepoFile(cf.r, file)
  461. }
  462. // ConnectedTo returns true if we are connected to the named node.
  463. func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool {
  464. m.pmut.RLock()
  465. _, ok := m.protoConn[nodeID]
  466. m.pmut.RUnlock()
  467. return ok
  468. }
  469. // AddConnection adds a new peer connection to the model. An initial index will
  470. // be sent to the connected peer, thereafter index updates whenever the local
  471. // repository changes.
  472. func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
  473. nodeID := protoConn.ID()
  474. m.pmut.Lock()
  475. if _, ok := m.protoConn[nodeID]; ok {
  476. panic("add existing node")
  477. }
  478. m.protoConn[nodeID] = protoConn
  479. if _, ok := m.rawConn[nodeID]; ok {
  480. panic("add existing node")
  481. }
  482. m.rawConn[nodeID] = rawConn
  483. cm := m.clusterConfig(nodeID)
  484. protoConn.ClusterConfig(cm)
  485. m.rmut.RLock()
  486. for _, repo := range m.nodeRepos[nodeID] {
  487. fs := m.repoFiles[repo]
  488. go sendIndexes(protoConn, repo, fs)
  489. }
  490. m.rmut.RUnlock()
  491. m.pmut.Unlock()
  492. }
  493. func sendIndexes(conn protocol.Connection, repo string, fs *files.Set) {
  494. nodeID := conn.ID()
  495. name := conn.Name()
  496. var err error
  497. if debug {
  498. l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo)
  499. }
  500. defer func() {
  501. if debug {
  502. l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err)
  503. }
  504. }()
  505. minLocalVer, err := sendIndexTo(true, 0, conn, repo, fs)
  506. for err == nil {
  507. time.Sleep(5 * time.Second)
  508. if fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer {
  509. continue
  510. }
  511. minLocalVer, err = sendIndexTo(false, minLocalVer, conn, repo, fs)
  512. }
  513. }
  514. func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, repo string, fs *files.Set) (uint64, error) {
  515. nodeID := conn.ID()
  516. name := conn.Name()
  517. batch := make([]protocol.FileInfo, 0, indexBatchSize)
  518. currentBatchSize := 0
  519. maxLocalVer := uint64(0)
  520. var err error
  521. fs.WithHave(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
  522. f := fi.(protocol.FileInfo)
  523. if f.LocalVersion <= minLocalVer {
  524. return true
  525. }
  526. if f.LocalVersion > maxLocalVer {
  527. maxLocalVer = f.LocalVersion
  528. }
  529. if len(batch) == indexBatchSize || currentBatchSize > indexTargetSize {
  530. if initial {
  531. if err = conn.Index(repo, batch); err != nil {
  532. return false
  533. }
  534. if debug {
  535. l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (initial index)", nodeID, name, repo, len(batch), currentBatchSize)
  536. }
  537. initial = false
  538. } else {
  539. if err = conn.IndexUpdate(repo, batch); err != nil {
  540. return false
  541. }
  542. if debug {
  543. l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (batched update)", nodeID, name, repo, len(batch), currentBatchSize)
  544. }
  545. }
  546. batch = make([]protocol.FileInfo, 0, indexBatchSize)
  547. currentBatchSize = 0
  548. }
  549. batch = append(batch, f)
  550. currentBatchSize += indexPerFileSize + len(f.Blocks)*IndexPerBlockSize
  551. return true
  552. })
  553. if initial && err == nil {
  554. err = conn.Index(repo, batch)
  555. if debug && err == nil {
  556. l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch))
  557. }
  558. } else if len(batch) > 0 && err == nil {
  559. err = conn.IndexUpdate(repo, batch)
  560. if debug && err == nil {
  561. l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch))
  562. }
  563. }
  564. return maxLocalVer, err
  565. }
  566. func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
  567. f.LocalVersion = 0
  568. m.rmut.RLock()
  569. m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f})
  570. m.rmut.RUnlock()
  571. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  572. "repo": repo,
  573. "name": f.Name,
  574. "modified": time.Unix(f.Modified, 0),
  575. "flags": fmt.Sprintf("0%o", f.Flags),
  576. "size": f.Size(),
  577. })
  578. }
  579. func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
  580. m.pmut.RLock()
  581. nc, ok := m.protoConn[nodeID]
  582. m.pmut.RUnlock()
  583. if !ok {
  584. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  585. }
  586. if debug {
  587. l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
  588. }
  589. return nc.Request(repo, name, offset, size)
  590. }
  591. func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
  592. if m.started {
  593. panic("cannot add repo to started model")
  594. }
  595. if len(cfg.ID) == 0 {
  596. panic("cannot add empty repo id")
  597. }
  598. m.rmut.Lock()
  599. m.repoCfgs[cfg.ID] = cfg
  600. m.repoFiles[cfg.ID] = files.NewSet(cfg.ID, m.db)
  601. m.repoNodes[cfg.ID] = make([]protocol.NodeID, len(cfg.Nodes))
  602. for i, node := range cfg.Nodes {
  603. m.repoNodes[cfg.ID][i] = node.NodeID
  604. m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
  605. }
  606. m.addedRepo = true
  607. m.rmut.Unlock()
  608. }
  609. func (m *Model) ScanRepos() {
  610. m.rmut.RLock()
  611. var repos = make([]string, 0, len(m.repoCfgs))
  612. for repo := range m.repoCfgs {
  613. repos = append(repos, repo)
  614. }
  615. m.rmut.RUnlock()
  616. var wg sync.WaitGroup
  617. wg.Add(len(repos))
  618. for _, repo := range repos {
  619. repo := repo
  620. go func() {
  621. err := m.ScanRepo(repo)
  622. if err != nil {
  623. invalidateRepo(m.cfg, repo, err)
  624. }
  625. wg.Done()
  626. }()
  627. }
  628. wg.Wait()
  629. }
  630. func (m *Model) CleanRepos() {
  631. m.rmut.RLock()
  632. var dirs = make([]string, 0, len(m.repoCfgs))
  633. for _, cfg := range m.repoCfgs {
  634. dirs = append(dirs, cfg.Directory)
  635. }
  636. m.rmut.RUnlock()
  637. var wg sync.WaitGroup
  638. wg.Add(len(dirs))
  639. for _, dir := range dirs {
  640. w := &scanner.Walker{
  641. Dir: dir,
  642. TempNamer: defTempNamer,
  643. }
  644. go func() {
  645. w.CleanTempFiles()
  646. wg.Done()
  647. }()
  648. }
  649. wg.Wait()
  650. }
  651. func (m *Model) ScanRepo(repo string) error {
  652. return m.ScanRepoSub(repo, "")
  653. }
  654. func (m *Model) ScanRepoSub(repo, sub string) error {
  655. if p := filepath.Clean(filepath.Join(repo, sub)); !strings.HasPrefix(p, repo) {
  656. return errors.New("invalid subpath")
  657. }
  658. m.rmut.RLock()
  659. fs, ok := m.repoFiles[repo]
  660. dir := m.repoCfgs[repo].Directory
  661. w := &scanner.Walker{
  662. Dir: dir,
  663. Sub: sub,
  664. IgnoreFile: ".stignore",
  665. BlockSize: scanner.StandardBlockSize,
  666. TempNamer: defTempNamer,
  667. CurrentFiler: cFiler{m, repo},
  668. IgnorePerms: m.repoCfgs[repo].IgnorePerms,
  669. }
  670. m.rmut.RUnlock()
  671. if !ok {
  672. return errors.New("no such repo")
  673. }
  674. m.setState(repo, RepoScanning)
  675. fchan, _, err := w.Walk()
  676. if err != nil {
  677. return err
  678. }
  679. batchSize := 100
  680. batch := make([]protocol.FileInfo, 0, 00)
  681. for f := range fchan {
  682. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  683. "repo": repo,
  684. "name": f.Name,
  685. "modified": time.Unix(f.Modified, 0),
  686. "flags": fmt.Sprintf("0%o", f.Flags),
  687. "size": f.Size(),
  688. })
  689. if len(batch) == batchSize {
  690. fs.Update(protocol.LocalNodeID, batch)
  691. batch = batch[:0]
  692. }
  693. batch = append(batch, f)
  694. }
  695. if len(batch) > 0 {
  696. fs.Update(protocol.LocalNodeID, batch)
  697. }
  698. batch = batch[:0]
  699. // TODO: We should limit the Have scanning to start at sub
  700. seenPrefix := false
  701. fs.WithHaveTruncated(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
  702. f := fi.(protocol.FileInfoTruncated)
  703. if !strings.HasPrefix(f.Name, sub) {
  704. return !seenPrefix
  705. }
  706. seenPrefix = true
  707. if !protocol.IsDeleted(f.Flags) {
  708. if len(batch) == batchSize {
  709. fs.Update(protocol.LocalNodeID, batch)
  710. batch = batch[:0]
  711. }
  712. if _, err := os.Stat(filepath.Join(dir, f.Name)); err != nil && os.IsNotExist(err) {
  713. // File has been deleted
  714. nf := protocol.FileInfo{
  715. Name: f.Name,
  716. Flags: f.Flags | protocol.FlagDeleted,
  717. Modified: f.Modified,
  718. Version: lamport.Default.Tick(f.Version),
  719. }
  720. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  721. "repo": repo,
  722. "name": f.Name,
  723. "modified": time.Unix(f.Modified, 0),
  724. "flags": fmt.Sprintf("0%o", f.Flags),
  725. "size": f.Size(),
  726. })
  727. batch = append(batch, nf)
  728. }
  729. }
  730. return true
  731. })
  732. if len(batch) > 0 {
  733. fs.Update(protocol.LocalNodeID, batch)
  734. }
  735. m.setState(repo, RepoIdle)
  736. return nil
  737. }
  738. // clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
  739. func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage {
  740. cm := protocol.ClusterConfigMessage{
  741. ClientName: m.clientName,
  742. ClientVersion: m.clientVersion,
  743. Options: []protocol.Option{
  744. {
  745. Key: "name",
  746. Value: m.nodeName,
  747. },
  748. },
  749. }
  750. m.rmut.RLock()
  751. for _, repo := range m.nodeRepos[node] {
  752. cr := protocol.Repository{
  753. ID: repo,
  754. }
  755. for _, node := range m.repoNodes[repo] {
  756. // TODO: Set read only bit when relevant
  757. cr.Nodes = append(cr.Nodes, protocol.Node{
  758. ID: node[:],
  759. Flags: protocol.FlagShareTrusted,
  760. })
  761. }
  762. cm.Repositories = append(cm.Repositories, cr)
  763. }
  764. m.rmut.RUnlock()
  765. return cm
  766. }
  767. func (m *Model) setState(repo string, state repoState) {
  768. m.smut.Lock()
  769. oldState := m.repoState[repo]
  770. changed, ok := m.repoStateChanged[repo]
  771. if state != oldState {
  772. m.repoState[repo] = state
  773. m.repoStateChanged[repo] = time.Now()
  774. eventData := map[string]interface{}{
  775. "repo": repo,
  776. "to": state.String(),
  777. }
  778. if ok {
  779. eventData["duration"] = time.Since(changed).Seconds()
  780. eventData["from"] = oldState.String()
  781. }
  782. events.Default.Log(events.StateChanged, eventData)
  783. }
  784. m.smut.Unlock()
  785. }
  786. func (m *Model) State(repo string) (string, time.Time) {
  787. m.smut.RLock()
  788. state := m.repoState[repo]
  789. changed := m.repoStateChanged[repo]
  790. m.smut.RUnlock()
  791. return state.String(), changed
  792. }
  793. func (m *Model) Override(repo string) {
  794. m.rmut.RLock()
  795. fs := m.repoFiles[repo]
  796. m.rmut.RUnlock()
  797. batch := make([]protocol.FileInfo, 0, indexBatchSize)
  798. fs.WithNeed(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
  799. need := fi.(protocol.FileInfo)
  800. if len(batch) == indexBatchSize {
  801. fs.Update(protocol.LocalNodeID, batch)
  802. batch = batch[:0]
  803. }
  804. have := fs.Get(protocol.LocalNodeID, need.Name)
  805. if have.Name != need.Name {
  806. // We are missing the file
  807. need.Flags |= protocol.FlagDeleted
  808. need.Blocks = nil
  809. } else {
  810. // We have the file, replace with our version
  811. need = have
  812. }
  813. need.Version = lamport.Default.Tick(need.Version)
  814. need.LocalVersion = 0
  815. batch = append(batch, need)
  816. return true
  817. })
  818. if len(batch) > 0 {
  819. fs.Update(protocol.LocalNodeID, batch)
  820. }
  821. }
  822. // Version returns the change version for the given repository. This is
  823. // guaranteed to increment if the contents of the local or global repository
  824. // has changed.
  825. func (m *Model) LocalVersion(repo string) uint64 {
  826. m.rmut.Lock()
  827. defer m.rmut.Unlock()
  828. fs, ok := m.repoFiles[repo]
  829. if !ok {
  830. return 0
  831. }
  832. ver := fs.LocalVersion(protocol.LocalNodeID)
  833. for _, n := range m.repoNodes[repo] {
  834. ver += fs.LocalVersion(n)
  835. }
  836. return ver
  837. }