model.go 27 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043
  1. // Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
  2. // All rights reserved. Use of this source code is governed by an MIT-style
  3. // license that can be found in the LICENSE file.
  4. package model
  5. import (
  6. "crypto/tls"
  7. "errors"
  8. "fmt"
  9. "io"
  10. "net"
  11. "os"
  12. "path/filepath"
  13. "strconv"
  14. "strings"
  15. "sync"
  16. "time"
  17. "github.com/syncthing/syncthing/config"
  18. "github.com/syncthing/syncthing/events"
  19. "github.com/syncthing/syncthing/files"
  20. "github.com/syncthing/syncthing/ignore"
  21. "github.com/syncthing/syncthing/lamport"
  22. "github.com/syncthing/syncthing/protocol"
  23. "github.com/syncthing/syncthing/scanner"
  24. "github.com/syncthing/syncthing/stats"
  25. "github.com/syndtr/goleveldb/leveldb"
  26. )
  27. type repoState int
  28. const (
  29. RepoIdle repoState = iota
  30. RepoScanning
  31. RepoSyncing
  32. RepoCleaning
  33. )
  34. func (s repoState) String() string {
  35. switch s {
  36. case RepoIdle:
  37. return "idle"
  38. case RepoScanning:
  39. return "scanning"
  40. case RepoCleaning:
  41. return "cleaning"
  42. case RepoSyncing:
  43. return "syncing"
  44. default:
  45. return "unknown"
  46. }
  47. }
  48. // How many files to send in each Index/IndexUpdate message.
  49. const (
  50. indexTargetSize = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
  51. indexPerFileSize = 250 // Each FileInfo is approximately this big, in bytes, excluding BlockInfos
  52. IndexPerBlockSize = 40 // Each BlockInfo is approximately this big
  53. indexBatchSize = 1000 // Either way, don't include more files than this
  54. )
  55. type Model struct {
  56. indexDir string
  57. cfg *config.Configuration
  58. db *leveldb.DB
  59. nodeName string
  60. clientName string
  61. clientVersion string
  62. repoCfgs map[string]config.RepositoryConfiguration // repo -> cfg
  63. repoFiles map[string]*files.Set // repo -> files
  64. repoNodes map[string][]protocol.NodeID // repo -> nodeIDs
  65. nodeRepos map[protocol.NodeID][]string // nodeID -> repos
  66. nodeStatRefs map[protocol.NodeID]*stats.NodeStatisticsReference // nodeID -> statsRef
  67. repoIgnores map[string]ignore.Patterns // repo -> list of ignore patterns
  68. rmut sync.RWMutex // protects the above
  69. repoState map[string]repoState // repo -> state
  70. repoStateChanged map[string]time.Time // repo -> time when state changed
  71. smut sync.RWMutex
  72. protoConn map[protocol.NodeID]protocol.Connection
  73. rawConn map[protocol.NodeID]io.Closer
  74. nodeVer map[protocol.NodeID]string
  75. pmut sync.RWMutex // protects protoConn and rawConn
  76. addedRepo bool
  77. started bool
  78. }
  79. var (
  80. ErrNoSuchFile = errors.New("no such file")
  81. ErrInvalid = errors.New("file is invalid")
  82. )
  83. // NewModel creates and starts a new model. The model starts in read-only mode,
  84. // where it sends index information to connected peers and responds to requests
  85. // for file data without altering the local repository in any way.
  86. func NewModel(indexDir string, cfg *config.Configuration, nodeName, clientName, clientVersion string, db *leveldb.DB) *Model {
  87. m := &Model{
  88. indexDir: indexDir,
  89. cfg: cfg,
  90. db: db,
  91. nodeName: nodeName,
  92. clientName: clientName,
  93. clientVersion: clientVersion,
  94. repoCfgs: make(map[string]config.RepositoryConfiguration),
  95. repoFiles: make(map[string]*files.Set),
  96. repoNodes: make(map[string][]protocol.NodeID),
  97. nodeRepos: make(map[protocol.NodeID][]string),
  98. nodeStatRefs: make(map[protocol.NodeID]*stats.NodeStatisticsReference),
  99. repoIgnores: make(map[string]ignore.Patterns),
  100. repoState: make(map[string]repoState),
  101. repoStateChanged: make(map[string]time.Time),
  102. protoConn: make(map[protocol.NodeID]protocol.Connection),
  103. rawConn: make(map[protocol.NodeID]io.Closer),
  104. nodeVer: make(map[protocol.NodeID]string),
  105. }
  106. for _, node := range cfg.Nodes {
  107. m.nodeStatRefs[node.NodeID] = stats.NewNodeStatisticsReference(db, node.NodeID)
  108. }
  109. var timeout = 20 * 60 // seconds
  110. if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
  111. it, err := strconv.Atoi(t)
  112. if err == nil {
  113. timeout = it
  114. }
  115. }
  116. deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
  117. deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
  118. deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)
  119. return m
  120. }
  121. // StartRW starts read/write processing on the current model. When in
  122. // read/write mode the model will attempt to keep in sync with the cluster by
  123. // pulling needed files from peer nodes.
  124. func (m *Model) StartRepoRW(repo string, threads int) {
  125. m.rmut.RLock()
  126. defer m.rmut.RUnlock()
  127. if cfg, ok := m.repoCfgs[repo]; !ok {
  128. panic("cannot start without repo")
  129. } else {
  130. newPuller(cfg, m, threads, m.cfg)
  131. }
  132. }
  133. // StartRO starts read only processing on the current model. When in
  134. // read only mode the model will announce files to the cluster but not
  135. // pull in any external changes.
  136. func (m *Model) StartRepoRO(repo string) {
  137. m.StartRepoRW(repo, 0) // zero threads => read only
  138. }
  139. type ConnectionInfo struct {
  140. protocol.Statistics
  141. Address string
  142. ClientVersion string
  143. }
  144. // ConnectionStats returns a map with connection statistics for each connected node.
  145. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  146. type remoteAddrer interface {
  147. RemoteAddr() net.Addr
  148. }
  149. m.pmut.RLock()
  150. m.rmut.RLock()
  151. var res = make(map[string]ConnectionInfo)
  152. for node, conn := range m.protoConn {
  153. ci := ConnectionInfo{
  154. Statistics: conn.Statistics(),
  155. ClientVersion: m.nodeVer[node],
  156. }
  157. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  158. ci.Address = nc.RemoteAddr().String()
  159. }
  160. res[node.String()] = ci
  161. }
  162. m.rmut.RUnlock()
  163. m.pmut.RUnlock()
  164. in, out := protocol.TotalInOut()
  165. res["total"] = ConnectionInfo{
  166. Statistics: protocol.Statistics{
  167. At: time.Now(),
  168. InBytesTotal: in,
  169. OutBytesTotal: out,
  170. },
  171. }
  172. return res
  173. }
  174. // Returns statistics about each node
  175. func (m *Model) NodeStatistics() map[string]stats.NodeStatistics {
  176. var res = make(map[string]stats.NodeStatistics)
  177. m.rmut.RLock()
  178. for _, node := range m.cfg.Nodes {
  179. res[node.NodeID.String()] = m.nodeStatRefs[node.NodeID].GetStatistics()
  180. }
  181. m.rmut.RUnlock()
  182. return res
  183. }
  184. // Returns the completion status, in percent, for the given node and repo.
  185. func (m *Model) Completion(node protocol.NodeID, repo string) float64 {
  186. var tot int64
  187. m.rmut.RLock()
  188. rf, ok := m.repoFiles[repo]
  189. m.rmut.RUnlock()
  190. if !ok {
  191. return 0 // Repo doesn't exist, so we hardly have any of it
  192. }
  193. rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
  194. if !f.IsDeleted() {
  195. tot += f.Size()
  196. }
  197. return true
  198. })
  199. if tot == 0 {
  200. return 100 // Repo is empty, so we have all of it
  201. }
  202. var need int64
  203. rf.WithNeedTruncated(node, func(f protocol.FileIntf) bool {
  204. if !f.IsDeleted() {
  205. need += f.Size()
  206. }
  207. return true
  208. })
  209. res := 100 * (1 - float64(need)/float64(tot))
  210. if debug {
  211. l.Debugf("Completion(%s, %q): %f (%d / %d)", node, repo, res, need, tot)
  212. }
  213. return res
  214. }
  215. func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) {
  216. for _, f := range fs {
  217. fs, de, by := sizeOfFile(f)
  218. files += fs
  219. deleted += de
  220. bytes += by
  221. }
  222. return
  223. }
  224. func sizeOfFile(f protocol.FileIntf) (files, deleted int, bytes int64) {
  225. if !f.IsDeleted() {
  226. files++
  227. } else {
  228. deleted++
  229. }
  230. bytes += f.Size()
  231. return
  232. }
  233. // GlobalSize returns the number of files, deleted files and total bytes for all
  234. // files in the global model.
  235. func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
  236. m.rmut.RLock()
  237. defer m.rmut.RUnlock()
  238. if rf, ok := m.repoFiles[repo]; ok {
  239. rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
  240. fs, de, by := sizeOfFile(f)
  241. files += fs
  242. deleted += de
  243. bytes += by
  244. return true
  245. })
  246. }
  247. return
  248. }
  249. // LocalSize returns the number of files, deleted files and total bytes for all
  250. // files in the local repository.
  251. func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
  252. m.rmut.RLock()
  253. defer m.rmut.RUnlock()
  254. if rf, ok := m.repoFiles[repo]; ok {
  255. rf.WithHaveTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
  256. if f.IsInvalid() {
  257. return true
  258. }
  259. fs, de, by := sizeOfFile(f)
  260. files += fs
  261. deleted += de
  262. bytes += by
  263. return true
  264. })
  265. }
  266. return
  267. }
  268. // NeedSize returns the number and total size of currently needed files.
  269. func (m *Model) NeedSize(repo string) (files int, bytes int64) {
  270. m.rmut.RLock()
  271. defer m.rmut.RUnlock()
  272. if rf, ok := m.repoFiles[repo]; ok {
  273. rf.WithNeedTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
  274. fs, de, by := sizeOfFile(f)
  275. files += fs + de
  276. bytes += by
  277. return true
  278. })
  279. }
  280. if debug {
  281. l.Debugf("NeedSize(%q): %d %d", repo, files, bytes)
  282. }
  283. return
  284. }
  285. // NeedFiles returns the list of currently needed files
  286. func (m *Model) NeedFilesRepo(repo string) []protocol.FileInfo {
  287. m.rmut.RLock()
  288. defer m.rmut.RUnlock()
  289. if rf, ok := m.repoFiles[repo]; ok {
  290. fs := make([]protocol.FileInfo, 0, indexBatchSize)
  291. rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
  292. fs = append(fs, f.(protocol.FileInfo))
  293. return len(fs) < indexBatchSize
  294. })
  295. return fs
  296. }
  297. return nil
  298. }
  299. // Index is called when a new node is connected and we receive their full index.
  300. // Implements the protocol.Model interface.
  301. func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  302. if debug {
  303. l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
  304. }
  305. if !m.repoSharedWith(repo, nodeID) {
  306. events.Default.Log(events.RepoRejected, map[string]string{
  307. "repo": repo,
  308. "node": nodeID.String(),
  309. })
  310. l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  311. return
  312. }
  313. m.rmut.RLock()
  314. files, ok := m.repoFiles[repo]
  315. ignores, _ := m.repoIgnores[repo]
  316. m.rmut.RUnlock()
  317. if !ok {
  318. l.Fatalf("Index for nonexistant repo %q", repo)
  319. }
  320. for i := 0; i < len(fs); {
  321. lamport.Default.Tick(fs[i].Version)
  322. if ignores.Match(fs[i].Name) {
  323. fs[i] = fs[len(fs)-1]
  324. fs = fs[:len(fs)-1]
  325. } else {
  326. i++
  327. }
  328. }
  329. files.Replace(nodeID, fs)
  330. events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
  331. "node": nodeID.String(),
  332. "repo": repo,
  333. "items": len(fs),
  334. "version": files.LocalVersion(nodeID),
  335. })
  336. }
  337. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  338. // Implements the protocol.Model interface.
  339. func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  340. if debug {
  341. l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
  342. }
  343. if !m.repoSharedWith(repo, nodeID) {
  344. l.Infof("Update for unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  345. return
  346. }
  347. m.rmut.RLock()
  348. files, ok := m.repoFiles[repo]
  349. ignores, _ := m.repoIgnores[repo]
  350. m.rmut.RUnlock()
  351. if !ok {
  352. l.Fatalf("IndexUpdate for nonexistant repo %q", repo)
  353. }
  354. for i := 0; i < len(fs); {
  355. lamport.Default.Tick(fs[i].Version)
  356. if ignores.Match(fs[i].Name) {
  357. fs[i] = fs[len(fs)-1]
  358. fs = fs[:len(fs)-1]
  359. } else {
  360. i++
  361. }
  362. }
  363. files.Update(nodeID, fs)
  364. events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
  365. "node": nodeID.String(),
  366. "repo": repo,
  367. "items": len(fs),
  368. "version": files.LocalVersion(nodeID),
  369. })
  370. }
  371. func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool {
  372. m.rmut.RLock()
  373. defer m.rmut.RUnlock()
  374. for _, nrepo := range m.nodeRepos[nodeID] {
  375. if nrepo == repo {
  376. return true
  377. }
  378. }
  379. return false
  380. }
  381. func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterConfigMessage) {
  382. m.pmut.Lock()
  383. if config.ClientName == "syncthing" {
  384. m.nodeVer[nodeID] = config.ClientVersion
  385. } else {
  386. m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
  387. }
  388. m.pmut.Unlock()
  389. l.Infof(`Node %s client is "%s %s"`, nodeID, config.ClientName, config.ClientVersion)
  390. if name := config.GetOption("name"); name != "" {
  391. l.Infof("Node %s hostname is %q", nodeID, name)
  392. node := m.cfg.GetNodeConfiguration(nodeID)
  393. if node != nil && node.Name == "" {
  394. node.Name = name
  395. m.cfg.Save()
  396. }
  397. }
  398. }
  399. // Close removes the peer from the model and closes the underlying connection if possible.
  400. // Implements the protocol.Model interface.
  401. func (m *Model) Close(node protocol.NodeID, err error) {
  402. l.Infof("Connection to %s closed: %v", node, err)
  403. events.Default.Log(events.NodeDisconnected, map[string]string{
  404. "id": node.String(),
  405. "error": err.Error(),
  406. })
  407. m.pmut.Lock()
  408. m.rmut.RLock()
  409. for _, repo := range m.nodeRepos[node] {
  410. m.repoFiles[repo].Replace(node, nil)
  411. }
  412. m.rmut.RUnlock()
  413. conn, ok := m.rawConn[node]
  414. if ok {
  415. if conn, ok := conn.(*tls.Conn); ok {
  416. // If the underlying connection is a *tls.Conn, Close() does more
  417. // than it says on the tin. Specifically, it sends a TLS alert
  418. // message, which might block forever if the connection is dead
  419. // and we don't have a deadline site.
  420. conn.SetWriteDeadline(time.Now().Add(250 * time.Millisecond))
  421. }
  422. conn.Close()
  423. }
  424. delete(m.protoConn, node)
  425. delete(m.rawConn, node)
  426. delete(m.nodeVer, node)
  427. m.pmut.Unlock()
  428. }
  429. // Request returns the specified data segment by reading it from local disk.
  430. // Implements the protocol.Model interface.
  431. func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64, size int) ([]byte, error) {
  432. // Verify that the requested file exists in the local model.
  433. m.rmut.RLock()
  434. r, ok := m.repoFiles[repo]
  435. m.rmut.RUnlock()
  436. if !ok {
  437. l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
  438. return nil, ErrNoSuchFile
  439. }
  440. lf := r.Get(protocol.LocalNodeID, name)
  441. if protocol.IsInvalid(lf.Flags) || protocol.IsDeleted(lf.Flags) {
  442. if debug {
  443. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
  444. }
  445. return nil, ErrInvalid
  446. }
  447. if offset > lf.Size() {
  448. if debug {
  449. l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
  450. }
  451. return nil, ErrNoSuchFile
  452. }
  453. if debug && nodeID != protocol.LocalNodeID {
  454. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
  455. }
  456. m.rmut.RLock()
  457. fn := filepath.Join(m.repoCfgs[repo].Directory, name)
  458. m.rmut.RUnlock()
  459. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  460. if err != nil {
  461. return nil, err
  462. }
  463. defer fd.Close()
  464. buf := make([]byte, size)
  465. _, err = fd.ReadAt(buf, offset)
  466. if err != nil {
  467. return nil, err
  468. }
  469. return buf, nil
  470. }
  471. // ReplaceLocal replaces the local repository index with the given list of files.
  472. func (m *Model) ReplaceLocal(repo string, fs []protocol.FileInfo) {
  473. m.rmut.RLock()
  474. m.repoFiles[repo].ReplaceWithDelete(protocol.LocalNodeID, fs)
  475. m.rmut.RUnlock()
  476. }
  477. func (m *Model) CurrentRepoFile(repo string, file string) protocol.FileInfo {
  478. m.rmut.RLock()
  479. f := m.repoFiles[repo].Get(protocol.LocalNodeID, file)
  480. m.rmut.RUnlock()
  481. return f
  482. }
  483. func (m *Model) CurrentGlobalFile(repo string, file string) protocol.FileInfo {
  484. m.rmut.RLock()
  485. f := m.repoFiles[repo].GetGlobal(file)
  486. m.rmut.RUnlock()
  487. return f
  488. }
  489. type cFiler struct {
  490. m *Model
  491. r string
  492. }
  493. // Implements scanner.CurrentFiler
  494. func (cf cFiler) CurrentFile(file string) protocol.FileInfo {
  495. return cf.m.CurrentRepoFile(cf.r, file)
  496. }
  497. // ConnectedTo returns true if we are connected to the named node.
  498. func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool {
  499. m.pmut.RLock()
  500. _, ok := m.protoConn[nodeID]
  501. if ok {
  502. if statRef, ok := m.nodeStatRefs[nodeID]; ok {
  503. statRef.WasSeen()
  504. }
  505. }
  506. m.pmut.RUnlock()
  507. return ok
  508. }
  509. // AddConnection adds a new peer connection to the model. An initial index will
  510. // be sent to the connected peer, thereafter index updates whenever the local
  511. // repository changes.
  512. func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
  513. nodeID := protoConn.ID()
  514. m.pmut.Lock()
  515. if _, ok := m.protoConn[nodeID]; ok {
  516. panic("add existing node")
  517. }
  518. m.protoConn[nodeID] = protoConn
  519. if _, ok := m.rawConn[nodeID]; ok {
  520. panic("add existing node")
  521. }
  522. m.rawConn[nodeID] = rawConn
  523. cm := m.clusterConfig(nodeID)
  524. protoConn.ClusterConfig(cm)
  525. m.rmut.RLock()
  526. for _, repo := range m.nodeRepos[nodeID] {
  527. fs := m.repoFiles[repo]
  528. go sendIndexes(protoConn, repo, fs, m.repoIgnores[repo])
  529. }
  530. if statRef, ok := m.nodeStatRefs[nodeID]; ok {
  531. statRef.WasSeen()
  532. } else {
  533. l.Warnf("AddConnection for unconfigured node %v?", nodeID)
  534. }
  535. m.rmut.RUnlock()
  536. m.pmut.Unlock()
  537. }
  538. func sendIndexes(conn protocol.Connection, repo string, fs *files.Set, ignores ignore.Patterns) {
  539. nodeID := conn.ID()
  540. name := conn.Name()
  541. var err error
  542. if debug {
  543. l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo)
  544. }
  545. defer func() {
  546. if debug {
  547. l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err)
  548. }
  549. }()
  550. minLocalVer, err := sendIndexTo(true, 0, conn, repo, fs, ignores)
  551. for err == nil {
  552. time.Sleep(5 * time.Second)
  553. if fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer {
  554. continue
  555. }
  556. minLocalVer, err = sendIndexTo(false, minLocalVer, conn, repo, fs, ignores)
  557. }
  558. }
  559. func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, repo string, fs *files.Set, ignores ignore.Patterns) (uint64, error) {
  560. nodeID := conn.ID()
  561. name := conn.Name()
  562. batch := make([]protocol.FileInfo, 0, indexBatchSize)
  563. currentBatchSize := 0
  564. maxLocalVer := uint64(0)
  565. var err error
  566. fs.WithHave(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
  567. f := fi.(protocol.FileInfo)
  568. if f.LocalVersion <= minLocalVer {
  569. return true
  570. }
  571. if f.LocalVersion > maxLocalVer {
  572. maxLocalVer = f.LocalVersion
  573. }
  574. if ignores.Match(f.Name) {
  575. return true
  576. }
  577. if len(batch) == indexBatchSize || currentBatchSize > indexTargetSize {
  578. if initial {
  579. if err = conn.Index(repo, batch); err != nil {
  580. return false
  581. }
  582. if debug {
  583. l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (initial index)", nodeID, name, repo, len(batch), currentBatchSize)
  584. }
  585. initial = false
  586. } else {
  587. if err = conn.IndexUpdate(repo, batch); err != nil {
  588. return false
  589. }
  590. if debug {
  591. l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (batched update)", nodeID, name, repo, len(batch), currentBatchSize)
  592. }
  593. }
  594. batch = make([]protocol.FileInfo, 0, indexBatchSize)
  595. currentBatchSize = 0
  596. }
  597. batch = append(batch, f)
  598. currentBatchSize += indexPerFileSize + len(f.Blocks)*IndexPerBlockSize
  599. return true
  600. })
  601. if initial && err == nil {
  602. err = conn.Index(repo, batch)
  603. if debug && err == nil {
  604. l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch))
  605. }
  606. } else if len(batch) > 0 && err == nil {
  607. err = conn.IndexUpdate(repo, batch)
  608. if debug && err == nil {
  609. l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch))
  610. }
  611. }
  612. return maxLocalVer, err
  613. }
  614. func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
  615. f.LocalVersion = 0
  616. m.rmut.RLock()
  617. m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f})
  618. m.rmut.RUnlock()
  619. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  620. "repo": repo,
  621. "name": f.Name,
  622. "modified": time.Unix(f.Modified, 0),
  623. "flags": fmt.Sprintf("0%o", f.Flags),
  624. "size": f.Size(),
  625. })
  626. }
  627. func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
  628. m.pmut.RLock()
  629. nc, ok := m.protoConn[nodeID]
  630. m.pmut.RUnlock()
  631. if !ok {
  632. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  633. }
  634. if debug {
  635. l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
  636. }
  637. return nc.Request(repo, name, offset, size)
  638. }
  639. func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
  640. if m.started {
  641. panic("cannot add repo to started model")
  642. }
  643. if len(cfg.ID) == 0 {
  644. panic("cannot add empty repo id")
  645. }
  646. m.rmut.Lock()
  647. m.repoCfgs[cfg.ID] = cfg
  648. m.repoFiles[cfg.ID] = files.NewSet(cfg.ID, m.db)
  649. m.repoNodes[cfg.ID] = make([]protocol.NodeID, len(cfg.Nodes))
  650. for i, node := range cfg.Nodes {
  651. m.repoNodes[cfg.ID][i] = node.NodeID
  652. m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
  653. }
  654. m.addedRepo = true
  655. m.rmut.Unlock()
  656. }
  657. func (m *Model) ScanRepos() {
  658. m.rmut.RLock()
  659. var repos = make([]string, 0, len(m.repoCfgs))
  660. for repo := range m.repoCfgs {
  661. repos = append(repos, repo)
  662. }
  663. m.rmut.RUnlock()
  664. var wg sync.WaitGroup
  665. wg.Add(len(repos))
  666. for _, repo := range repos {
  667. repo := repo
  668. go func() {
  669. err := m.ScanRepo(repo)
  670. if err != nil {
  671. invalidateRepo(m.cfg, repo, err)
  672. }
  673. wg.Done()
  674. }()
  675. }
  676. wg.Wait()
  677. }
  678. func (m *Model) CleanRepos() {
  679. m.rmut.RLock()
  680. var dirs = make([]string, 0, len(m.repoCfgs))
  681. for _, cfg := range m.repoCfgs {
  682. dirs = append(dirs, cfg.Directory)
  683. }
  684. m.rmut.RUnlock()
  685. var wg sync.WaitGroup
  686. wg.Add(len(dirs))
  687. for _, dir := range dirs {
  688. w := &scanner.Walker{
  689. Dir: dir,
  690. TempNamer: defTempNamer,
  691. }
  692. go func() {
  693. w.CleanTempFiles()
  694. wg.Done()
  695. }()
  696. }
  697. wg.Wait()
  698. }
  699. func (m *Model) ScanRepo(repo string) error {
  700. return m.ScanRepoSub(repo, "")
  701. }
  702. func (m *Model) ScanRepoSub(repo, sub string) error {
  703. if p := filepath.Clean(filepath.Join(repo, sub)); !strings.HasPrefix(p, repo) {
  704. return errors.New("invalid subpath")
  705. }
  706. m.rmut.RLock()
  707. fs, ok := m.repoFiles[repo]
  708. dir := m.repoCfgs[repo].Directory
  709. ignores, _ := ignore.Load(filepath.Join(dir, ".stignore"))
  710. m.repoIgnores[repo] = ignores
  711. w := &scanner.Walker{
  712. Dir: dir,
  713. Sub: sub,
  714. Ignores: ignores,
  715. BlockSize: scanner.StandardBlockSize,
  716. TempNamer: defTempNamer,
  717. CurrentFiler: cFiler{m, repo},
  718. IgnorePerms: m.repoCfgs[repo].IgnorePerms,
  719. }
  720. m.rmut.RUnlock()
  721. if !ok {
  722. return errors.New("no such repo")
  723. }
  724. m.setState(repo, RepoScanning)
  725. fchan, err := w.Walk()
  726. if err != nil {
  727. return err
  728. }
  729. batchSize := 100
  730. batch := make([]protocol.FileInfo, 0, 00)
  731. for f := range fchan {
  732. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  733. "repo": repo,
  734. "name": f.Name,
  735. "modified": time.Unix(f.Modified, 0),
  736. "flags": fmt.Sprintf("0%o", f.Flags),
  737. "size": f.Size(),
  738. })
  739. if len(batch) == batchSize {
  740. fs.Update(protocol.LocalNodeID, batch)
  741. batch = batch[:0]
  742. }
  743. batch = append(batch, f)
  744. }
  745. if len(batch) > 0 {
  746. fs.Update(protocol.LocalNodeID, batch)
  747. }
  748. batch = batch[:0]
  749. // TODO: We should limit the Have scanning to start at sub
  750. seenPrefix := false
  751. fs.WithHaveTruncated(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
  752. f := fi.(protocol.FileInfoTruncated)
  753. if !strings.HasPrefix(f.Name, sub) {
  754. // Return true so that we keep iterating, until we get to the part
  755. // of the tree we are interested in. Then return false so we stop
  756. // iterating when we've passed the end of the subtree.
  757. return !seenPrefix
  758. }
  759. seenPrefix = true
  760. if !protocol.IsDeleted(f.Flags) {
  761. if f.IsInvalid() {
  762. return true
  763. }
  764. if len(batch) == batchSize {
  765. fs.Update(protocol.LocalNodeID, batch)
  766. batch = batch[:0]
  767. }
  768. if ignores.Match(f.Name) {
  769. // File has been ignored. Set invalid bit.
  770. nf := protocol.FileInfo{
  771. Name: f.Name,
  772. Flags: f.Flags | protocol.FlagInvalid,
  773. Modified: f.Modified,
  774. Version: f.Version, // The file is still the same, so don't bump version
  775. }
  776. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  777. "repo": repo,
  778. "name": f.Name,
  779. "modified": time.Unix(f.Modified, 0),
  780. "flags": fmt.Sprintf("0%o", f.Flags),
  781. "size": f.Size(),
  782. })
  783. batch = append(batch, nf)
  784. } else if _, err := os.Stat(filepath.Join(dir, f.Name)); err != nil && os.IsNotExist(err) {
  785. // File has been deleted
  786. nf := protocol.FileInfo{
  787. Name: f.Name,
  788. Flags: f.Flags | protocol.FlagDeleted,
  789. Modified: f.Modified,
  790. Version: lamport.Default.Tick(f.Version),
  791. }
  792. events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
  793. "repo": repo,
  794. "name": f.Name,
  795. "modified": time.Unix(f.Modified, 0),
  796. "flags": fmt.Sprintf("0%o", f.Flags),
  797. "size": f.Size(),
  798. })
  799. batch = append(batch, nf)
  800. }
  801. }
  802. return true
  803. })
  804. if len(batch) > 0 {
  805. fs.Update(protocol.LocalNodeID, batch)
  806. }
  807. m.setState(repo, RepoIdle)
  808. return nil
  809. }
  810. // clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
  811. func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage {
  812. cm := protocol.ClusterConfigMessage{
  813. ClientName: m.clientName,
  814. ClientVersion: m.clientVersion,
  815. Options: []protocol.Option{
  816. {
  817. Key: "name",
  818. Value: m.nodeName,
  819. },
  820. },
  821. }
  822. m.rmut.RLock()
  823. for _, repo := range m.nodeRepos[node] {
  824. cr := protocol.Repository{
  825. ID: repo,
  826. }
  827. for _, node := range m.repoNodes[repo] {
  828. // TODO: Set read only bit when relevant
  829. cr.Nodes = append(cr.Nodes, protocol.Node{
  830. ID: node[:],
  831. Flags: protocol.FlagShareTrusted,
  832. })
  833. }
  834. cm.Repositories = append(cm.Repositories, cr)
  835. }
  836. m.rmut.RUnlock()
  837. return cm
  838. }
  839. func (m *Model) setState(repo string, state repoState) {
  840. m.smut.Lock()
  841. oldState := m.repoState[repo]
  842. changed, ok := m.repoStateChanged[repo]
  843. if state != oldState {
  844. m.repoState[repo] = state
  845. m.repoStateChanged[repo] = time.Now()
  846. eventData := map[string]interface{}{
  847. "repo": repo,
  848. "to": state.String(),
  849. }
  850. if ok {
  851. eventData["duration"] = time.Since(changed).Seconds()
  852. eventData["from"] = oldState.String()
  853. }
  854. events.Default.Log(events.StateChanged, eventData)
  855. }
  856. m.smut.Unlock()
  857. }
  858. func (m *Model) State(repo string) (string, time.Time) {
  859. m.smut.RLock()
  860. state := m.repoState[repo]
  861. changed := m.repoStateChanged[repo]
  862. m.smut.RUnlock()
  863. return state.String(), changed
  864. }
  865. func (m *Model) Override(repo string) {
  866. m.rmut.RLock()
  867. fs := m.repoFiles[repo]
  868. m.rmut.RUnlock()
  869. m.setState(repo, RepoScanning)
  870. batch := make([]protocol.FileInfo, 0, indexBatchSize)
  871. fs.WithNeed(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
  872. need := fi.(protocol.FileInfo)
  873. if len(batch) == indexBatchSize {
  874. fs.Update(protocol.LocalNodeID, batch)
  875. batch = batch[:0]
  876. }
  877. have := fs.Get(protocol.LocalNodeID, need.Name)
  878. if have.Name != need.Name {
  879. // We are missing the file
  880. need.Flags |= protocol.FlagDeleted
  881. need.Blocks = nil
  882. } else {
  883. // We have the file, replace with our version
  884. need = have
  885. }
  886. need.Version = lamport.Default.Tick(need.Version)
  887. need.LocalVersion = 0
  888. batch = append(batch, need)
  889. return true
  890. })
  891. if len(batch) > 0 {
  892. fs.Update(protocol.LocalNodeID, batch)
  893. }
  894. m.setState(repo, RepoIdle)
  895. }
  896. // Version returns the change version for the given repository. This is
  897. // guaranteed to increment if the contents of the local or global repository
  898. // has changed.
  899. func (m *Model) LocalVersion(repo string) uint64 {
  900. m.rmut.Lock()
  901. defer m.rmut.Unlock()
  902. fs, ok := m.repoFiles[repo]
  903. if !ok {
  904. return 0
  905. }
  906. ver := fs.LocalVersion(protocol.LocalNodeID)
  907. for _, n := range m.repoNodes[repo] {
  908. ver += fs.LocalVersion(n)
  909. }
  910. return ver
  911. }