model.go

// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

package model

import (
	"bufio"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/syncthing/syncthing/internal/config"
	"github.com/syncthing/syncthing/internal/events"
	"github.com/syncthing/syncthing/internal/files"
	"github.com/syncthing/syncthing/internal/ignore"
	"github.com/syncthing/syncthing/internal/lamport"
	"github.com/syncthing/syncthing/internal/osutil"
	"github.com/syncthing/syncthing/internal/protocol"
	"github.com/syncthing/syncthing/internal/scanner"
	"github.com/syncthing/syncthing/internal/stats"
	"github.com/syndtr/goleveldb/leveldb"
)

type repoState int

const (
	RepoIdle repoState = iota
	RepoScanning
	RepoSyncing
	RepoCleaning
)

func (s repoState) String() string {
	switch s {
	case RepoIdle:
		return "idle"
	case RepoScanning:
		return "scanning"
	case RepoCleaning:
		return "cleaning"
	case RepoSyncing:
		return "syncing"
	default:
		return "unknown"
	}
}

// How many files to send in each Index/IndexUpdate message.
const (
	indexTargetSize   = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
	indexPerFileSize  = 250        // Each FileInfo is approximately this big, in bytes, excluding BlockInfos
	IndexPerBlockSize = 40         // Each BlockInfo is approximately this big
	indexBatchSize    = 1000       // Either way, don't include more files than this
)
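
// Illustrative arithmetic for the constants above (a rough sketch, not a
// guarantee): a file with 8 blocks is estimated at
// indexPerFileSize + 8*IndexPerBlockSize = 250 + 8*40 = 570 bytes, so one
// message near the 250 KiB indexTargetSize holds roughly 450 such files,
// comfortably below the hard indexBatchSize cap of 1000 files per message.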

type Model struct {
	indexDir string
	cfg      *config.Configuration
	db       *leveldb.DB

	nodeName      string
	clientName    string
	clientVersion string

	repoCfgs     map[string]config.RepositoryConfiguration          // repo -> cfg
	repoFiles    map[string]*files.Set                              // repo -> files
	repoNodes    map[string][]protocol.NodeID                       // repo -> nodeIDs
	nodeRepos    map[protocol.NodeID][]string                       // nodeID -> repos
	nodeStatRefs map[protocol.NodeID]*stats.NodeStatisticsReference // nodeID -> statsRef
	repoIgnores  map[string]ignore.Patterns                         // repo -> list of ignore patterns
	rmut         sync.RWMutex                                       // protects the above

	repoState        map[string]repoState // repo -> state
	repoStateChanged map[string]time.Time // repo -> time when state changed
	smut             sync.RWMutex

	protoConn map[protocol.NodeID]protocol.Connection
	rawConn   map[protocol.NodeID]io.Closer
	nodeVer   map[protocol.NodeID]string
	pmut      sync.RWMutex // protects protoConn and rawConn

	addedRepo bool
	started   bool
}

var (
	ErrNoSuchFile = errors.New("no such file")
	ErrInvalid    = errors.New("file is invalid")
)

// NewModel creates and starts a new model. The model starts in read-only mode,
// where it sends index information to connected peers and responds to requests
// for file data without altering the local repository in any way.
func NewModel(indexDir string, cfg *config.Configuration, nodeName, clientName, clientVersion string, db *leveldb.DB) *Model {
	m := &Model{
		indexDir:         indexDir,
		cfg:              cfg,
		db:               db,
		nodeName:         nodeName,
		clientName:       clientName,
		clientVersion:    clientVersion,
		repoCfgs:         make(map[string]config.RepositoryConfiguration),
		repoFiles:        make(map[string]*files.Set),
		repoNodes:        make(map[string][]protocol.NodeID),
		nodeRepos:        make(map[protocol.NodeID][]string),
		nodeStatRefs:     make(map[protocol.NodeID]*stats.NodeStatisticsReference),
		repoIgnores:      make(map[string]ignore.Patterns),
		repoState:        make(map[string]repoState),
		repoStateChanged: make(map[string]time.Time),
		protoConn:        make(map[protocol.NodeID]protocol.Connection),
		rawConn:          make(map[protocol.NodeID]io.Closer),
		nodeVer:          make(map[protocol.NodeID]string),
	}

	var timeout = 20 * 60 // seconds
	if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
		it, err := strconv.Atoi(t)
		if err == nil {
			timeout = it
		}
	}
	deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
	deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
	deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)

	return m
}
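
// A minimal, hypothetical usage sketch; the database path, node and version
// strings, and startup order below are illustrative assumptions, not taken
// from this file:
//
//	db, _ := leveldb.OpenFile("/path/to/index.db", nil)
//	m := NewModel("/path/to/config", cfg, "mynode", "syncthing", "v0.9.x", db)
//	for _, repoCfg := range cfg.Repositories {
//		m.AddRepo(repoCfg)
//	}
//	m.ScanRepos()
//	m.StartRepoRO("default") // or StartRepoRW(repo, threads) for read/write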

// StartRepoRW starts read/write processing on the current model. When in
// read/write mode the model will attempt to keep in sync with the cluster by
// pulling needed files from peer nodes.
func (m *Model) StartRepoRW(repo string, threads int) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()

	if cfg, ok := m.repoCfgs[repo]; !ok {
		panic("cannot start without repo")
	} else {
		newPuller(cfg, m, threads, m.cfg)
	}
}

// StartRepoRO starts read-only processing on the current model. When in
// read-only mode the model will announce files to the cluster but not
// pull in any external changes.
func (m *Model) StartRepoRO(repo string) {
	m.StartRepoRW(repo, 0) // zero threads => read only
}

type ConnectionInfo struct {
	protocol.Statistics
	Address       string
	ClientVersion string
}

// ConnectionStats returns a map with connection statistics for each connected node.
func (m *Model) ConnectionStats() map[string]ConnectionInfo {
	type remoteAddrer interface {
		RemoteAddr() net.Addr
	}

	m.pmut.RLock()
	m.rmut.RLock()

	var res = make(map[string]ConnectionInfo)
	for node, conn := range m.protoConn {
		ci := ConnectionInfo{
			Statistics:    conn.Statistics(),
			ClientVersion: m.nodeVer[node],
		}
		if nc, ok := m.rawConn[node].(remoteAddrer); ok {
			ci.Address = nc.RemoteAddr().String()
		}

		res[node.String()] = ci
	}

	m.rmut.RUnlock()
	m.pmut.RUnlock()

	in, out := protocol.TotalInOut()
	res["total"] = ConnectionInfo{
		Statistics: protocol.Statistics{
			At:            time.Now(),
			InBytesTotal:  in,
			OutBytesTotal: out,
		},
	}

	return res
}

// NodeStatistics returns statistics about each node.
func (m *Model) NodeStatistics() map[string]stats.NodeStatistics {
	var res = make(map[string]stats.NodeStatistics)
	for _, node := range m.cfg.Nodes {
		res[node.NodeID.String()] = m.nodeStatRef(node.NodeID).GetStatistics()
	}
	return res
}

// Completion returns the completion status, in percent, for the given node and repo.
func (m *Model) Completion(node protocol.NodeID, repo string) float64 {
	var tot int64

	m.rmut.RLock()
	rf, ok := m.repoFiles[repo]
	m.rmut.RUnlock()
	if !ok {
		return 0 // Repo doesn't exist, so we hardly have any of it
	}

	rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
		if !f.IsDeleted() {
			tot += f.Size()
		}
		return true
	})

	if tot == 0 {
		return 100 // Repo is empty, so we have all of it
	}

	var need int64
	rf.WithNeedTruncated(node, func(f protocol.FileIntf) bool {
		if !f.IsDeleted() {
			need += f.Size()
		}
		return true
	})

	res := 100 * (1 - float64(need)/float64(tot))
	if debug {
		l.Debugf("Completion(%s, %q): %f (%d / %d)", node, repo, res, need, tot)
	}

	return res
}
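
// Stated as a formula, the value computed above is
//
//	completion% = 100 * (1 - needBytes/globalBytes)
//
// where both byte counts skip deleted files, with 0 returned for an unknown
// repo and 100 for an empty global repo.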

func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) {
	for _, f := range fs {
		fs, de, by := sizeOfFile(f)
		files += fs
		deleted += de
		bytes += by
	}
	return
}

func sizeOfFile(f protocol.FileIntf) (files, deleted int, bytes int64) {
	if !f.IsDeleted() {
		files++
	} else {
		deleted++
	}
	bytes += f.Size()
	return
}

// GlobalSize returns the number of files, deleted files and total bytes for all
// files in the global model.
func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	if rf, ok := m.repoFiles[repo]; ok {
		rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
			fs, de, by := sizeOfFile(f)
			files += fs
			deleted += de
			bytes += by
			return true
		})
	}
	return
}

// LocalSize returns the number of files, deleted files and total bytes for all
// files in the local repository.
func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	if rf, ok := m.repoFiles[repo]; ok {
		rf.WithHaveTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
			if f.IsInvalid() {
				return true
			}
			fs, de, by := sizeOfFile(f)
			files += fs
			deleted += de
			bytes += by
			return true
		})
	}
	return
}

// NeedSize returns the number and total size of currently needed files.
func (m *Model) NeedSize(repo string) (files int, bytes int64) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	if rf, ok := m.repoFiles[repo]; ok {
		rf.WithNeedTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
			fs, de, by := sizeOfFile(f)
			files += fs + de
			bytes += by
			return true
		})
	}
	if debug {
		l.Debugf("NeedSize(%q): %d %d", repo, files, bytes)
	}
	return
}

// NeedFilesRepoLimited returns the list of currently needed files, stopping at
// maxFiles files or maxBlocks blocks. Limits <= 0 are ignored.
func (m *Model) NeedFilesRepoLimited(repo string, maxFiles, maxBlocks int) []protocol.FileInfo {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	nblocks := 0
	if rf, ok := m.repoFiles[repo]; ok {
		fs := make([]protocol.FileInfo, 0, maxFiles)
		rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
			fi := f.(protocol.FileInfo)
			fs = append(fs, fi)
			nblocks += len(fi.Blocks)
			return (maxFiles <= 0 || len(fs) < maxFiles) && (maxBlocks <= 0 || nblocks < maxBlocks)
		})
		return fs
	}
	return nil
}

// Index is called when a new node is connected and we receive their full index.
// Implements the protocol.Model interface.
func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
	if debug {
		l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
	}

	if !m.repoSharedWith(repo, nodeID) {
		events.Default.Log(events.RepoRejected, map[string]string{
			"repo": repo,
			"node": nodeID.String(),
		})
		l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
		return
	}

	m.rmut.RLock()
	files, ok := m.repoFiles[repo]
	ignores, _ := m.repoIgnores[repo]
	m.rmut.RUnlock()

	if !ok {
		l.Fatalf("Index for nonexistent repo %q", repo)
	}

	for i := 0; i < len(fs); {
		lamport.Default.Tick(fs[i].Version)
		if ignores.Match(fs[i].Name) {
			fs[i] = fs[len(fs)-1]
			fs = fs[:len(fs)-1]
		} else {
			i++
		}
	}

	files.Replace(nodeID, fs)

	events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
		"node":    nodeID.String(),
		"repo":    repo,
		"items":   len(fs),
		"version": files.LocalVersion(nodeID),
	})
}

// IndexUpdate is called for incremental updates to connected nodes' indexes.
// Implements the protocol.Model interface.
func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
	if debug {
		l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
	}

	if !m.repoSharedWith(repo, nodeID) {
		l.Infof("Update for unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
		return
	}

	m.rmut.RLock()
	files, ok := m.repoFiles[repo]
	ignores, _ := m.repoIgnores[repo]
	m.rmut.RUnlock()

	if !ok {
		l.Fatalf("IndexUpdate for nonexistent repo %q", repo)
	}

	for i := 0; i < len(fs); {
		lamport.Default.Tick(fs[i].Version)
		if ignores.Match(fs[i].Name) {
			fs[i] = fs[len(fs)-1]
			fs = fs[:len(fs)-1]
		} else {
			i++
		}
	}

	files.Update(nodeID, fs)

	events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
		"node":    nodeID.String(),
		"repo":    repo,
		"items":   len(fs),
		"version": files.LocalVersion(nodeID),
	})
}

func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	for _, nrepo := range m.nodeRepos[nodeID] {
		if nrepo == repo {
			return true
		}
	}
	return false
}

func (m *Model) ClusterConfig(nodeID protocol.NodeID, cm protocol.ClusterConfigMessage) {
	m.pmut.Lock()
	if cm.ClientName == "syncthing" {
		m.nodeVer[nodeID] = cm.ClientVersion
	} else {
		m.nodeVer[nodeID] = cm.ClientName + " " + cm.ClientVersion
	}
	m.pmut.Unlock()

	l.Infof(`Node %s client is "%s %s"`, nodeID, cm.ClientName, cm.ClientVersion)

	if name := cm.GetOption("name"); name != "" {
		l.Infof("Node %s name is %q", nodeID, name)
		node := m.cfg.GetNodeConfiguration(nodeID)
		if node != nil && node.Name == "" {
			node.Name = name
			m.cfg.Save()
		}
	}

	if m.cfg.GetNodeConfiguration(nodeID).Introducer {
		// This node is an introducer. Go through the announced lists of repos
		// and nodes and add what we are missing.
		var changed bool

		for _, repo := range cm.Repositories {
			// If we don't have this repository yet, skip it. Ideally, we'd
			// offer up something in the GUI to create the repo, but for the
			// moment we only handle repos that we already have.
			if _, ok := m.repoNodes[repo.ID]; !ok {
				continue
			}

		nextNode:
			for _, node := range repo.Nodes {
				var id protocol.NodeID
				copy(id[:], node.ID)

				if m.cfg.GetNodeConfiguration(id) == nil {
					// The node is currently unknown. Add it to the config.
					l.Infof("Adding node %v to config (vouched for by introducer %v)", id, nodeID)
					newNodeCfg := config.NodeConfiguration{
						NodeID: id,
					}

					// The introducers' introducers are also our introducers.
					if node.Flags&protocol.FlagIntroducer != 0 {
						l.Infof("Node %v is now also an introducer", id)
						newNodeCfg.Introducer = true
					}

					m.cfg.Nodes = append(m.cfg.Nodes, newNodeCfg)
					changed = true
				}

				for _, er := range m.nodeRepos[id] {
					if er == repo.ID {
						// We already share the repo with this node, so
						// nothing to do.
						continue nextNode
					}
				}

				// We don't yet share this repo with this node. Add the node
				// to the sharing list of the repo.
				l.Infof("Adding node %v to share %q (vouched for by introducer %v)", id, repo.ID, nodeID)

				m.nodeRepos[id] = append(m.nodeRepos[id], repo.ID)
				m.repoNodes[repo.ID] = append(m.repoNodes[repo.ID], id)

				repoCfg := m.cfg.GetRepoConfiguration(repo.ID)
				repoCfg.Nodes = append(repoCfg.Nodes, config.RepositoryNodeConfiguration{
					NodeID: id,
				})

				changed = true
			}
		}

		if changed {
			m.cfg.Save()
		}
	}
}

// Close removes the peer from the model and closes the underlying connection if possible.
// Implements the protocol.Model interface.
func (m *Model) Close(node protocol.NodeID, err error) {
	l.Infof("Connection to %s closed: %v", node, err)
	events.Default.Log(events.NodeDisconnected, map[string]string{
		"id":    node.String(),
		"error": err.Error(),
	})

	m.pmut.Lock()
	m.rmut.RLock()
	for _, repo := range m.nodeRepos[node] {
		m.repoFiles[repo].Replace(node, nil)
	}
	m.rmut.RUnlock()

	conn, ok := m.rawConn[node]
	if ok {
		if conn, ok := conn.(*tls.Conn); ok {
			// If the underlying connection is a *tls.Conn, Close() does more
			// than it says on the tin. Specifically, it sends a TLS alert
			// message, which might block forever if the connection is dead
			// and we don't have a deadline set.
			conn.SetWriteDeadline(time.Now().Add(250 * time.Millisecond))
		}
		conn.Close()
	}

	delete(m.protoConn, node)
	delete(m.rawConn, node)
	delete(m.nodeVer, node)
	m.pmut.Unlock()
}

// Request returns the specified data segment by reading it from local disk.
// Implements the protocol.Model interface.
func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64, size int) ([]byte, error) {
	// Verify that the requested file exists in the local model.
	m.rmut.RLock()
	r, ok := m.repoFiles[repo]
	m.rmut.RUnlock()

	if !ok {
		l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
		return nil, ErrNoSuchFile
	}

	lf := r.Get(protocol.LocalNodeID, name)
	if protocol.IsInvalid(lf.Flags) || protocol.IsDeleted(lf.Flags) {
		if debug {
			l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
		}
		return nil, ErrInvalid
	}

	if offset > lf.Size() {
		if debug {
			l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
		}
		return nil, ErrNoSuchFile
	}

	if debug && nodeID != protocol.LocalNodeID {
		l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
	}

	m.rmut.RLock()
	fn := filepath.Join(m.repoCfgs[repo].Directory, name)
	m.rmut.RUnlock()

	fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
	if err != nil {
		return nil, err
	}
	defer fd.Close()

	buf := make([]byte, size)
	_, err = fd.ReadAt(buf, offset)
	if err != nil {
		return nil, err
	}

	return buf, nil
}

// ReplaceLocal replaces the local repository index with the given list of files.
func (m *Model) ReplaceLocal(repo string, fs []protocol.FileInfo) {
	m.rmut.RLock()
	m.repoFiles[repo].ReplaceWithDelete(protocol.LocalNodeID, fs)
	m.rmut.RUnlock()
}

func (m *Model) CurrentRepoFile(repo string, file string) protocol.FileInfo {
	m.rmut.RLock()
	f := m.repoFiles[repo].Get(protocol.LocalNodeID, file)
	m.rmut.RUnlock()
	return f
}

func (m *Model) CurrentGlobalFile(repo string, file string) protocol.FileInfo {
	m.rmut.RLock()
	f := m.repoFiles[repo].GetGlobal(file)
	m.rmut.RUnlock()
	return f
}

type cFiler struct {
	m *Model
	r string
}

// Implements scanner.CurrentFiler
func (cf cFiler) CurrentFile(file string) protocol.FileInfo {
	return cf.m.CurrentRepoFile(cf.r, file)
}

// ConnectedTo returns true if we are connected to the named node.
func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool {
	m.pmut.RLock()
	_, ok := m.protoConn[nodeID]
	m.pmut.RUnlock()
	if ok {
		m.nodeWasSeen(nodeID)
	}
	return ok
}

func (m *Model) GetIgnores(repo string) ([]string, error) {
	var lines []string

	cfg, ok := m.repoCfgs[repo]
	if !ok {
		return lines, fmt.Errorf("Repo %s does not exist", repo)
	}

	m.rmut.Lock()
	defer m.rmut.Unlock()

	fd, err := os.Open(filepath.Join(cfg.Directory, ".stignore"))
	if err != nil {
		if os.IsNotExist(err) {
			return lines, nil
		}
		l.Warnln("Loading .stignore:", err)
		return lines, err
	}
	defer fd.Close()

	scanner := bufio.NewScanner(fd)
	for scanner.Scan() {
		lines = append(lines, strings.TrimSpace(scanner.Text()))
	}

	return lines, nil
}

func (m *Model) SetIgnores(repo string, content []string) error {
	cfg, ok := m.repoCfgs[repo]
	if !ok {
		return fmt.Errorf("Repo %s does not exist", repo)
	}

	fd, err := ioutil.TempFile(cfg.Directory, ".syncthing.stignore-"+repo)
	if err != nil {
		l.Warnln("Saving .stignore:", err)
		return err
	}
	defer os.Remove(fd.Name())

	for _, line := range content {
		_, err = fmt.Fprintln(fd, line)
		if err != nil {
			l.Warnln("Saving .stignore:", err)
			return err
		}
	}

	err = fd.Close()
	if err != nil {
		l.Warnln("Saving .stignore:", err)
		return err
	}

	file := filepath.Join(cfg.Directory, ".stignore")
	err = osutil.Rename(fd.Name(), file)
	if err != nil {
		l.Warnln("Saving .stignore:", err)
		return err
	}

	return m.ScanRepo(repo)
}

// AddConnection adds a new peer connection to the model. An initial index will
// be sent to the connected peer, thereafter index updates whenever the local
// repository changes.
func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
	nodeID := protoConn.ID()
	m.pmut.Lock()
	if _, ok := m.protoConn[nodeID]; ok {
		panic("add existing node")
	}
	m.protoConn[nodeID] = protoConn
	if _, ok := m.rawConn[nodeID]; ok {
		panic("add existing node")
	}
	m.rawConn[nodeID] = rawConn

	cm := m.clusterConfig(nodeID)
	protoConn.ClusterConfig(cm)

	m.rmut.RLock()
	for _, repo := range m.nodeRepos[nodeID] {
		fs := m.repoFiles[repo]
		go sendIndexes(protoConn, repo, fs, m.repoIgnores[repo])
	}
	m.rmut.RUnlock()
	m.pmut.Unlock()

	m.nodeWasSeen(nodeID)
}

func (m *Model) nodeStatRef(nodeID protocol.NodeID) *stats.NodeStatisticsReference {
	m.rmut.Lock()
	defer m.rmut.Unlock()

	if sr, ok := m.nodeStatRefs[nodeID]; ok {
		return sr
	} else {
		sr = stats.NewNodeStatisticsReference(m.db, nodeID)
		m.nodeStatRefs[nodeID] = sr
		return sr
	}
}

func (m *Model) nodeWasSeen(nodeID protocol.NodeID) {
	m.nodeStatRef(nodeID).WasSeen()
}

func sendIndexes(conn protocol.Connection, repo string, fs *files.Set, ignores ignore.Patterns) {
	nodeID := conn.ID()
	name := conn.Name()
	var err error

	if debug {
		l.Debugf("sendIndexes for %s-%s@/%q starting", nodeID, name, repo)
	}

	defer func() {
		if debug {
			l.Debugf("sendIndexes for %s-%s@/%q exiting: %v", nodeID, name, repo, err)
		}
	}()

	minLocalVer, err := sendIndexTo(true, 0, conn, repo, fs, ignores)

	for err == nil {
		time.Sleep(5 * time.Second)
		if fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer {
			continue
		}

		minLocalVer, err = sendIndexTo(false, minLocalVer, conn, repo, fs, ignores)
	}
}

func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, repo string, fs *files.Set, ignores ignore.Patterns) (uint64, error) {
	nodeID := conn.ID()
	name := conn.Name()
	batch := make([]protocol.FileInfo, 0, indexBatchSize)
	currentBatchSize := 0
	maxLocalVer := uint64(0)
	var err error

	fs.WithHave(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
		f := fi.(protocol.FileInfo)
		if f.LocalVersion <= minLocalVer {
			return true
		}

		if f.LocalVersion > maxLocalVer {
			maxLocalVer = f.LocalVersion
		}

		if ignores.Match(f.Name) {
			return true
		}

		if len(batch) == indexBatchSize || currentBatchSize > indexTargetSize {
			if initial {
				if err = conn.Index(repo, batch); err != nil {
					return false
				}
				if debug {
					l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (initial index)", nodeID, name, repo, len(batch), currentBatchSize)
				}
				initial = false
			} else {
				if err = conn.IndexUpdate(repo, batch); err != nil {
					return false
				}
				if debug {
					l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (batched update)", nodeID, name, repo, len(batch), currentBatchSize)
				}
			}

			batch = make([]protocol.FileInfo, 0, indexBatchSize)
			currentBatchSize = 0
		}

		batch = append(batch, f)
		currentBatchSize += indexPerFileSize + len(f.Blocks)*IndexPerBlockSize
		return true
	})

	if initial && err == nil {
		err = conn.Index(repo, batch)
		if debug && err == nil {
			l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch))
		}
	} else if len(batch) > 0 && err == nil {
		err = conn.IndexUpdate(repo, batch)
		if debug && err == nil {
			l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch))
		}
	}

	return maxLocalVer, err
}

func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
	f.LocalVersion = 0
	m.rmut.RLock()
	m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f})
	m.rmut.RUnlock()
	events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
		"repo":     repo,
		"name":     f.Name,
		"modified": time.Unix(f.Modified, 0),
		"flags":    fmt.Sprintf("0%o", f.Flags),
		"size":     f.Size(),
	})
}

func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
	m.pmut.RLock()
	nc, ok := m.protoConn[nodeID]
	m.pmut.RUnlock()

	if !ok {
		return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
	}

	if debug {
		l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
	}

	return nc.Request(repo, name, offset, size)
}

func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
	if m.started {
		panic("cannot add repo to started model")
	}
	if len(cfg.ID) == 0 {
		panic("cannot add empty repo id")
	}

	m.rmut.Lock()
	m.repoCfgs[cfg.ID] = cfg
	m.repoFiles[cfg.ID] = files.NewSet(cfg.ID, m.db)

	m.repoNodes[cfg.ID] = make([]protocol.NodeID, len(cfg.Nodes))
	for i, node := range cfg.Nodes {
		m.repoNodes[cfg.ID][i] = node.NodeID
		m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
	}

	m.addedRepo = true
	m.rmut.Unlock()
}

func (m *Model) ScanRepos() {
	m.rmut.RLock()
	var repos = make([]string, 0, len(m.repoCfgs))
	for repo := range m.repoCfgs {
		repos = append(repos, repo)
	}
	m.rmut.RUnlock()

	var wg sync.WaitGroup
	wg.Add(len(repos))
	for _, repo := range repos {
		repo := repo
		go func() {
			err := m.ScanRepo(repo)
			if err != nil {
				invalidateRepo(m.cfg, repo, err)
			}
			wg.Done()
		}()
	}
	wg.Wait()
}

func (m *Model) CleanRepos() {
	m.rmut.RLock()
	var dirs = make([]string, 0, len(m.repoCfgs))
	for _, cfg := range m.repoCfgs {
		dirs = append(dirs, cfg.Directory)
	}
	m.rmut.RUnlock()

	var wg sync.WaitGroup
	wg.Add(len(dirs))
	for _, dir := range dirs {
		w := &scanner.Walker{
			Dir:       dir,
			TempNamer: defTempNamer,
		}
		go func() {
			w.CleanTempFiles()
			wg.Done()
		}()
	}
	wg.Wait()
}

func (m *Model) ScanRepo(repo string) error {
	return m.ScanRepoSub(repo, "")
}

func (m *Model) ScanRepoSub(repo, sub string) error {
	if p := filepath.Clean(filepath.Join(repo, sub)); !strings.HasPrefix(p, repo) {
		return errors.New("invalid subpath")
	}

	m.rmut.RLock()
	fs, ok := m.repoFiles[repo]
	dir := m.repoCfgs[repo].Directory

	ignores, _ := ignore.Load(filepath.Join(dir, ".stignore"))
	m.repoIgnores[repo] = ignores

	w := &scanner.Walker{
		Dir:          dir,
		Sub:          sub,
		Ignores:      ignores,
		BlockSize:    scanner.StandardBlockSize,
		TempNamer:    defTempNamer,
		CurrentFiler: cFiler{m, repo},
		IgnorePerms:  m.repoCfgs[repo].IgnorePerms,
	}
	m.rmut.RUnlock()

	if !ok {
		return errors.New("no such repo")
	}

	m.setState(repo, RepoScanning)

	fchan, err := w.Walk()
	if err != nil {
		return err
	}

	batchSize := 100
	batch := make([]protocol.FileInfo, 0, batchSize)
	for f := range fchan {
		events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
			"repo":     repo,
			"name":     f.Name,
			"modified": time.Unix(f.Modified, 0),
			"flags":    fmt.Sprintf("0%o", f.Flags),
			"size":     f.Size(),
		})
		if len(batch) == batchSize {
			fs.Update(protocol.LocalNodeID, batch)
			batch = batch[:0]
		}
		batch = append(batch, f)
	}
	if len(batch) > 0 {
		fs.Update(protocol.LocalNodeID, batch)
	}

	batch = batch[:0]
	// TODO: We should limit the Have scanning to start at sub
	seenPrefix := false
	fs.WithHaveTruncated(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
		f := fi.(protocol.FileInfoTruncated)
		if !strings.HasPrefix(f.Name, sub) {
			// Return true so that we keep iterating, until we get to the part
			// of the tree we are interested in. Then return false so we stop
			// iterating when we've passed the end of the subtree.
			return !seenPrefix
		}

		seenPrefix = true
		if !protocol.IsDeleted(f.Flags) {
			if f.IsInvalid() {
				return true
			}

			if len(batch) == batchSize {
				fs.Update(protocol.LocalNodeID, batch)
				batch = batch[:0]
			}

			if ignores.Match(f.Name) {
				// File has been ignored. Set invalid bit.
				nf := protocol.FileInfo{
					Name:     f.Name,
					Flags:    f.Flags | protocol.FlagInvalid,
					Modified: f.Modified,
					Version:  f.Version, // The file is still the same, so don't bump version
				}
				events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
					"repo":     repo,
					"name":     f.Name,
					"modified": time.Unix(f.Modified, 0),
					"flags":    fmt.Sprintf("0%o", f.Flags),
					"size":     f.Size(),
				})
				batch = append(batch, nf)
			} else if _, err := os.Stat(filepath.Join(dir, f.Name)); err != nil && os.IsNotExist(err) {
				// File has been deleted
				nf := protocol.FileInfo{
					Name:     f.Name,
					Flags:    f.Flags | protocol.FlagDeleted,
					Modified: f.Modified,
					Version:  lamport.Default.Tick(f.Version),
				}
				events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
					"repo":     repo,
					"name":     f.Name,
					"modified": time.Unix(f.Modified, 0),
					"flags":    fmt.Sprintf("0%o", f.Flags),
					"size":     f.Size(),
				})
				batch = append(batch, nf)
			}
		}
		return true
	})
	if len(batch) > 0 {
		fs.Update(protocol.LocalNodeID, batch)
	}

	m.setState(repo, RepoIdle)
	return nil
}

// clusterConfig returns a ClusterConfigMessage that is correct for the given peer node.
func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage {
	cm := protocol.ClusterConfigMessage{
		ClientName:    m.clientName,
		ClientVersion: m.clientVersion,
		Options: []protocol.Option{
			{
				Key:   "name",
				Value: m.nodeName,
			},
		},
	}

	m.rmut.RLock()
	for _, repo := range m.nodeRepos[node] {
		cr := protocol.Repository{
			ID: repo,
		}

		for _, node := range m.repoNodes[repo] {
			// NodeID is a value type, but with an underlying array. Copy it
			// so we don't grab aliases to the same array later on in node[:]
			node := node
			// TODO: Set read only bit when relevant
			cn := protocol.Node{
				ID:    node[:],
				Flags: protocol.FlagShareTrusted,
			}
			if nodeCfg := m.cfg.GetNodeConfiguration(node); nodeCfg.Introducer {
				cn.Flags |= protocol.FlagIntroducer
			}
			cr.Nodes = append(cr.Nodes, cn)
		}

		cm.Repositories = append(cm.Repositories, cr)
	}
	m.rmut.RUnlock()

	return cm
}

func (m *Model) setState(repo string, state repoState) {
	m.smut.Lock()
	oldState := m.repoState[repo]
	changed, ok := m.repoStateChanged[repo]
	if state != oldState {
		m.repoState[repo] = state
		m.repoStateChanged[repo] = time.Now()
		eventData := map[string]interface{}{
			"repo": repo,
			"to":   state.String(),
		}
		if ok {
			eventData["duration"] = time.Since(changed).Seconds()
			eventData["from"] = oldState.String()
		}
		events.Default.Log(events.StateChanged, eventData)
	}
	m.smut.Unlock()
}

func (m *Model) State(repo string) (string, time.Time) {
	m.smut.RLock()
	state := m.repoState[repo]
	changed := m.repoStateChanged[repo]
	m.smut.RUnlock()
	return state.String(), changed
}

func (m *Model) Override(repo string) {
	m.rmut.RLock()
	fs := m.repoFiles[repo]
	m.rmut.RUnlock()

	m.setState(repo, RepoScanning)
	batch := make([]protocol.FileInfo, 0, indexBatchSize)
	fs.WithNeed(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
		need := fi.(protocol.FileInfo)
		if len(batch) == indexBatchSize {
			fs.Update(protocol.LocalNodeID, batch)
			batch = batch[:0]
		}

		have := fs.Get(protocol.LocalNodeID, need.Name)
		if have.Name != need.Name {
			// We are missing the file
			need.Flags |= protocol.FlagDeleted
			need.Blocks = nil
		} else {
			// We have the file, replace with our version
			need = have
		}
		need.Version = lamport.Default.Tick(need.Version)
		need.LocalVersion = 0
		batch = append(batch, need)
		return true
	})
	if len(batch) > 0 {
		fs.Update(protocol.LocalNodeID, batch)
	}
	m.setState(repo, RepoIdle)
}

// LocalVersion returns the change version for the given repository. This is
// guaranteed to increment if the contents of the local or global repository
// have changed.
func (m *Model) LocalVersion(repo string) uint64 {
	m.rmut.Lock()
	defer m.rmut.Unlock()

	fs, ok := m.repoFiles[repo]
	if !ok {
		panic("bug: LocalVersion called for nonexistent repo " + repo)
	}

	ver := fs.LocalVersion(protocol.LocalNodeID)
	for _, n := range m.repoNodes[repo] {
		ver += fs.LocalVersion(n)
	}

	return ver
}