model.go
// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file).
// All rights reserved. Use of this source code is governed by an MIT-style
// license that can be found in the LICENSE file.

package model

import (
	"bufio"
	"crypto/tls"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/syncthing/syncthing/internal/config"
	"github.com/syncthing/syncthing/internal/events"
	"github.com/syncthing/syncthing/internal/files"
	"github.com/syncthing/syncthing/internal/ignore"
	"github.com/syncthing/syncthing/internal/lamport"
	"github.com/syncthing/syncthing/internal/osutil"
	"github.com/syncthing/syncthing/internal/protocol"
	"github.com/syncthing/syncthing/internal/scanner"
	"github.com/syncthing/syncthing/internal/stats"
	"github.com/syncthing/syncthing/internal/versioner"

	"github.com/syndtr/goleveldb/leveldb"
)

type repoState int

const (
	RepoIdle repoState = iota
	RepoScanning
	RepoSyncing
	RepoCleaning
)

func (s repoState) String() string {
	switch s {
	case RepoIdle:
		return "idle"
	case RepoScanning:
		return "scanning"
	case RepoCleaning:
		return "cleaning"
	case RepoSyncing:
		return "syncing"
	default:
		return "unknown"
	}
}

// How many files to send in each Index/IndexUpdate message.
const (
	indexTargetSize   = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
	indexPerFileSize  = 250        // Each FileInfo is approximately this big, in bytes, excluding BlockInfos
	IndexPerBlockSize = 40         // Each BlockInfo is approximately this big
	indexBatchSize    = 1000       // Either way, don't include more files than this
)
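
// With roughly 250 bytes per FileInfo and 40 bytes per BlockInfo, the
// 1000-file cap and the 250 KiB size target roughly coincide
// (250*1024/250 ≈ 1024 files); batches of files with many blocks hit the
// size target first.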

type Model struct {
	indexDir      string
	cfg           *config.Configuration
	db            *leveldb.DB
	nodeName      string
	clientName    string
	clientVersion string

	repoCfgs     map[string]config.RepositoryConfiguration          // repo -> cfg
	repoFiles    map[string]*files.Set                              // repo -> files
	repoNodes    map[string][]protocol.NodeID                       // repo -> nodeIDs
	nodeRepos    map[protocol.NodeID][]string                       // nodeID -> repos
	nodeStatRefs map[protocol.NodeID]*stats.NodeStatisticsReference // nodeID -> statsRef
	repoIgnores  map[string]ignore.Patterns                         // repo -> list of ignore patterns
	rmut         sync.RWMutex                                       // protects the above

	repoState        map[string]repoState // repo -> state
	repoStateChanged map[string]time.Time // repo -> time when state changed
	smut             sync.RWMutex

	protoConn map[protocol.NodeID]protocol.Connection
	rawConn   map[protocol.NodeID]io.Closer
	nodeVer   map[protocol.NodeID]string
	pmut      sync.RWMutex // protects protoConn and rawConn

	addedRepo bool
	started   bool
}

var (
	ErrNoSuchFile = errors.New("no such file")
	ErrInvalid    = errors.New("file is invalid")
)

// NewModel creates and starts a new model. The model starts in read-only mode,
// where it sends index information to connected peers and responds to requests
// for file data without altering the local repository in any way.
func NewModel(indexDir string, cfg *config.Configuration, nodeName, clientName, clientVersion string, db *leveldb.DB) *Model {
	m := &Model{
		indexDir:         indexDir,
		cfg:              cfg,
		db:               db,
		nodeName:         nodeName,
		clientName:       clientName,
		clientVersion:    clientVersion,
		repoCfgs:         make(map[string]config.RepositoryConfiguration),
		repoFiles:        make(map[string]*files.Set),
		repoNodes:        make(map[string][]protocol.NodeID),
		nodeRepos:        make(map[protocol.NodeID][]string),
		nodeStatRefs:     make(map[protocol.NodeID]*stats.NodeStatisticsReference),
		repoIgnores:      make(map[string]ignore.Patterns),
		repoState:        make(map[string]repoState),
		repoStateChanged: make(map[string]time.Time),
		protoConn:        make(map[protocol.NodeID]protocol.Connection),
		rawConn:          make(map[protocol.NodeID]io.Closer),
		nodeVer:          make(map[protocol.NodeID]string),
	}
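	// deadlockDetect watches each of the model's mutexes; the timeout
	// defaults to 20 minutes and can be overridden, in seconds, via the
	// STDEADLOCKTIMEOUT environment variable.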
	var timeout = 20 * 60 // seconds
	if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
		it, err := strconv.Atoi(t)
		if err == nil {
			timeout = it
		}
	}
	deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
	deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
	deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)
	return m
}

// StartRepoRW starts read/write processing on the current model. When in
// read/write mode the model will attempt to keep in sync with the cluster by
// pulling needed files from peer nodes.
func (m *Model) StartRepoRW(repo string) {
	m.rmut.Lock()
	cfg, ok := m.repoCfgs[repo]
	m.rmut.Unlock()
	if !ok {
		panic("cannot start nonexistent repo " + repo)
	}
	p := Puller{
		repo:     repo,
		dir:      cfg.Directory,
		scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
		model:    m,
	}
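	// If versioning is configured for this repo, look up the corresponding
	// factory and attach a versioner to the puller; an unknown versioning
	// type is fatal.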
	if len(cfg.Versioning.Type) > 0 {
		factory, ok := versioner.Factories[cfg.Versioning.Type]
		if !ok {
			l.Fatalf("Requested versioning type %q that does not exist", cfg.Versioning.Type)
		}
		p.versioner = factory(repo, cfg.Directory, cfg.Versioning.Params)
	}
	go p.Serve()
}

// StartRepoRO starts read only processing on the current model. When in
// read only mode the model will announce files to the cluster but not
// pull in any external changes.
func (m *Model) StartRepoRO(repo string) {
	intv := time.Duration(m.repoCfgs[repo].RescanIntervalS) * time.Second
	go func() {
		for {
			time.Sleep(intv)
			if debug {
				l.Debugln(m, "rescan", repo)
			}
			m.setState(repo, RepoScanning)
			if err := m.ScanRepo(repo); err != nil {
				invalidateRepo(m.cfg, repo, err)
				return
			}
			m.setState(repo, RepoIdle)
		}
	}()
}

type ConnectionInfo struct {
	protocol.Statistics
	Address       string
	ClientVersion string
}

// ConnectionStats returns a map with connection statistics for each connected node.
func (m *Model) ConnectionStats() map[string]ConnectionInfo {
	type remoteAddrer interface {
		RemoteAddr() net.Addr
	}
	m.pmut.RLock()
	m.rmut.RLock()
	var res = make(map[string]ConnectionInfo)
	for node, conn := range m.protoConn {
		ci := ConnectionInfo{
			Statistics:    conn.Statistics(),
			ClientVersion: m.nodeVer[node],
		}
		if nc, ok := m.rawConn[node].(remoteAddrer); ok {
			ci.Address = nc.RemoteAddr().String()
		}
		res[node.String()] = ci
	}
	m.rmut.RUnlock()
	m.pmut.RUnlock()
	in, out := protocol.TotalInOut()
	res["total"] = ConnectionInfo{
		Statistics: protocol.Statistics{
			At:            time.Now(),
			InBytesTotal:  in,
			OutBytesTotal: out,
		},
	}
	return res
}

// NodeStatistics returns statistics about each node.
func (m *Model) NodeStatistics() map[string]stats.NodeStatistics {
	var res = make(map[string]stats.NodeStatistics)
	for _, node := range m.cfg.Nodes {
		res[node.NodeID.String()] = m.nodeStatRef(node.NodeID).GetStatistics()
	}
	return res
}

// Completion returns the completion status, in percent, for the given node and repo.
func (m *Model) Completion(node protocol.NodeID, repo string) float64 {
	var tot int64
	m.rmut.RLock()
	rf, ok := m.repoFiles[repo]
	m.rmut.RUnlock()
	if !ok {
		return 0 // Repo doesn't exist, so we hardly have any of it
	}
	rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
		if !f.IsDeleted() {
			tot += f.Size()
		}
		return true
	})
	if tot == 0 {
		return 100 // Repo is empty, so we have all of it
	}
	var need int64
	rf.WithNeedTruncated(node, func(f protocol.FileIntf) bool {
		if !f.IsDeleted() {
			need += f.Size()
		}
		return true
	})
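	// Completion is the share of global, non-deleted bytes that this node is
	// not missing: 100 * (1 - need/tot).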
	res := 100 * (1 - float64(need)/float64(tot))
	if debug {
		l.Debugf("%v Completion(%s, %q): %f (%d / %d)", m, node, repo, res, need, tot)
	}
	return res
}

func sizeOf(fs []protocol.FileInfo) (files, deleted int, bytes int64) {
	for _, f := range fs {
		fs, de, by := sizeOfFile(f)
		files += fs
		deleted += de
		bytes += by
	}
	return
}

func sizeOfFile(f protocol.FileIntf) (files, deleted int, bytes int64) {
	if !f.IsDeleted() {
		files++
	} else {
		deleted++
	}
	bytes += f.Size()
	return
}

// GlobalSize returns the number of files, deleted files and total bytes for all
// files in the global model.
func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	if rf, ok := m.repoFiles[repo]; ok {
		rf.WithGlobalTruncated(func(f protocol.FileIntf) bool {
			fs, de, by := sizeOfFile(f)
			files += fs
			deleted += de
			bytes += by
			return true
		})
	}
	return
}

// LocalSize returns the number of files, deleted files and total bytes for all
// files in the local repository.
func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	if rf, ok := m.repoFiles[repo]; ok {
		rf.WithHaveTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
			if f.IsInvalid() {
				return true
			}
			fs, de, by := sizeOfFile(f)
			files += fs
			deleted += de
			bytes += by
			return true
		})
	}
	return
}

// NeedSize returns the number and total size of currently needed files.
func (m *Model) NeedSize(repo string) (files int, bytes int64) {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	if rf, ok := m.repoFiles[repo]; ok {
		rf.WithNeedTruncated(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
			fs, de, by := sizeOfFile(f)
			files += fs + de
			bytes += by
			return true
		})
	}
	if debug {
		l.Debugf("%v NeedSize(%q): %d %d", m, repo, files, bytes)
	}
	return
}

// NeedFilesRepoLimited returns the list of currently needed files, stopping at
// maxFiles files or maxBlocks blocks. Limits <= 0 are ignored.
func (m *Model) NeedFilesRepoLimited(repo string, maxFiles, maxBlocks int) []protocol.FileInfo {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	nblocks := 0
	if rf, ok := m.repoFiles[repo]; ok {
		fs := make([]protocol.FileInfo, 0, maxFiles)
		rf.WithNeed(protocol.LocalNodeID, func(f protocol.FileIntf) bool {
			fi := f.(protocol.FileInfo)
			fs = append(fs, fi)
			nblocks += len(fi.Blocks)
			return (maxFiles <= 0 || len(fs) < maxFiles) && (maxBlocks <= 0 || nblocks < maxBlocks)
		})
		return fs
	}
	return nil
}

// Index is called when a new node is connected and we receive their full index.
// Implements the protocol.Model interface.
func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
	if debug {
		l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
	}
	if !m.repoSharedWith(repo, nodeID) {
		events.Default.Log(events.RepoRejected, map[string]string{
			"repo": repo,
			"node": nodeID.String(),
		})
		l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
		return
	}
	m.rmut.RLock()
	files, ok := m.repoFiles[repo]
	ignores, _ := m.repoIgnores[repo]
	m.rmut.RUnlock()
	if !ok {
		l.Fatalf("Index for nonexistent repo %q", repo)
	}
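	// Drop ignored files in place: overwrite the ignored entry with the last
	// element and shrink the slice. Every incoming version is still fed to
	// the Lamport clock.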
	for i := 0; i < len(fs); {
		lamport.Default.Tick(fs[i].Version)
		if ignores.Match(fs[i].Name) {
			fs[i] = fs[len(fs)-1]
			fs = fs[:len(fs)-1]
		} else {
			i++
		}
	}
	files.Replace(nodeID, fs)
	events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
		"node":    nodeID.String(),
		"repo":    repo,
		"items":   len(fs),
		"version": files.LocalVersion(nodeID),
	})
}

// IndexUpdate is called for incremental updates to connected nodes' indexes.
// Implements the protocol.Model interface.
func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
	if debug {
		l.Debugf("%v IDXUP(in): %s / %q: %d files", m, nodeID, repo, len(fs))
	}
	if !m.repoSharedWith(repo, nodeID) {
		l.Infof("Update for unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
		return
	}
	m.rmut.RLock()
	files, ok := m.repoFiles[repo]
	ignores, _ := m.repoIgnores[repo]
	m.rmut.RUnlock()
	if !ok {
		l.Fatalf("IndexUpdate for nonexistent repo %q", repo)
	}
	for i := 0; i < len(fs); {
		lamport.Default.Tick(fs[i].Version)
		if ignores.Match(fs[i].Name) {
			fs[i] = fs[len(fs)-1]
			fs = fs[:len(fs)-1]
		} else {
			i++
		}
	}
	files.Update(nodeID, fs)
	events.Default.Log(events.RemoteIndexUpdated, map[string]interface{}{
		"node":    nodeID.String(),
		"repo":    repo,
		"items":   len(fs),
		"version": files.LocalVersion(nodeID),
	})
}

func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool {
	m.rmut.RLock()
	defer m.rmut.RUnlock()
	for _, nrepo := range m.nodeRepos[nodeID] {
		if nrepo == repo {
			return true
		}
	}
	return false
}

func (m *Model) ClusterConfig(nodeID protocol.NodeID, cm protocol.ClusterConfigMessage) {
	m.pmut.Lock()
	if cm.ClientName == "syncthing" {
		m.nodeVer[nodeID] = cm.ClientVersion
	} else {
		m.nodeVer[nodeID] = cm.ClientName + " " + cm.ClientVersion
	}
	m.pmut.Unlock()
	l.Infof(`Node %s client is "%s %s"`, nodeID, cm.ClientName, cm.ClientVersion)
	if name := cm.GetOption("name"); name != "" {
		l.Infof("Node %s name is %q", nodeID, name)
		node := m.cfg.GetNodeConfiguration(nodeID)
		if node != nil && node.Name == "" {
			node.Name = name
			m.cfg.Save()
		}
	}
	if m.cfg.GetNodeConfiguration(nodeID).Introducer {
		// This node is an introducer. Go through the announced lists of repos
		// and nodes and add what we are missing.
		var changed bool
		for _, repo := range cm.Repositories {
			// If we don't have this repository yet, skip it. Ideally, we'd
			// offer up something in the GUI to create the repo, but for the
			// moment we only handle repos that we already have.
			if _, ok := m.repoNodes[repo.ID]; !ok {
				continue
			}
		nextNode:
			for _, node := range repo.Nodes {
				var id protocol.NodeID
				copy(id[:], node.ID)
				if m.cfg.GetNodeConfiguration(id) == nil {
					// The node is currently unknown. Add it to the config.
					l.Infof("Adding node %v to config (vouched for by introducer %v)", id, nodeID)
					newNodeCfg := config.NodeConfiguration{
						NodeID: id,
					}
					// The introducers' introducers are also our introducers.
					if node.Flags&protocol.FlagIntroducer != 0 {
						l.Infof("Node %v is now also an introducer", id)
						newNodeCfg.Introducer = true
					}
					m.cfg.Nodes = append(m.cfg.Nodes, newNodeCfg)
					changed = true
				}
				for _, er := range m.nodeRepos[id] {
					if er == repo.ID {
						// We already share the repo with this node, so
						// nothing to do.
						continue nextNode
					}
				}
				// We don't yet share this repo with this node. Add the node
				// to the sharing list of the repo.
				l.Infof("Adding node %v to share %q (vouched for by introducer %v)", id, repo.ID, nodeID)
				m.nodeRepos[id] = append(m.nodeRepos[id], repo.ID)
				m.repoNodes[repo.ID] = append(m.repoNodes[repo.ID], id)
				repoCfg := m.cfg.GetRepoConfiguration(repo.ID)
				repoCfg.Nodes = append(repoCfg.Nodes, config.RepositoryNodeConfiguration{
					NodeID: id,
				})
				changed = true
			}
		}
		if changed {
			m.cfg.Save()
		}
	}
}

// Close removes the peer from the model and closes the underlying connection if possible.
// Implements the protocol.Model interface.
func (m *Model) Close(node protocol.NodeID, err error) {
	l.Infof("Connection to %s closed: %v", node, err)
	events.Default.Log(events.NodeDisconnected, map[string]string{
		"id":    node.String(),
		"error": err.Error(),
	})
	m.pmut.Lock()
	m.rmut.RLock()
	for _, repo := range m.nodeRepos[node] {
		m.repoFiles[repo].Replace(node, nil)
	}
	m.rmut.RUnlock()
	conn, ok := m.rawConn[node]
	if ok {
		if conn, ok := conn.(*tls.Conn); ok {
			// If the underlying connection is a *tls.Conn, Close() does more
			// than it says on the tin. Specifically, it sends a TLS alert
			// message, which might block forever if the connection is dead
			// and we don't have a deadline set.
			conn.SetWriteDeadline(time.Now().Add(250 * time.Millisecond))
		}
		conn.Close()
	}
	delete(m.protoConn, node)
	delete(m.rawConn, node)
	delete(m.nodeVer, node)
	m.pmut.Unlock()
}

// Request returns the specified data segment by reading it from local disk.
// Implements the protocol.Model interface.
func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64, size int) ([]byte, error) {
	// Verify that the requested file exists in the local model.
	m.rmut.RLock()
	r, ok := m.repoFiles[repo]
	m.rmut.RUnlock()
	if !ok {
		l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
		return nil, ErrNoSuchFile
	}
	lf := r.Get(protocol.LocalNodeID, name)
	if protocol.IsInvalid(lf.Flags) || protocol.IsDeleted(lf.Flags) {
		if debug {
			l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", m, nodeID, repo, name, offset, size, lf)
		}
		return nil, ErrInvalid
	}
	if offset > lf.Size() {
		if debug {
			l.Debugf("%v REQ(in; nonexistent): %s: %q o=%d s=%d", m, nodeID, name, offset, size)
		}
		return nil, ErrNoSuchFile
	}
	if debug && nodeID != protocol.LocalNodeID {
		l.Debugf("%v REQ(in): %s: %q / %q o=%d s=%d", m, nodeID, repo, name, offset, size)
	}
	m.rmut.RLock()
	fn := filepath.Join(m.repoCfgs[repo].Directory, name)
	m.rmut.RUnlock()
	fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
	if err != nil {
		return nil, err
	}
	defer fd.Close()
	buf := make([]byte, size)
	_, err = fd.ReadAt(buf, offset)
	if err != nil {
		return nil, err
	}
	return buf, nil
}

// ReplaceLocal replaces the local repository index with the given list of files.
func (m *Model) ReplaceLocal(repo string, fs []protocol.FileInfo) {
	m.rmut.RLock()
	m.repoFiles[repo].ReplaceWithDelete(protocol.LocalNodeID, fs)
	m.rmut.RUnlock()
}

func (m *Model) CurrentRepoFile(repo string, file string) protocol.FileInfo {
	m.rmut.RLock()
	f := m.repoFiles[repo].Get(protocol.LocalNodeID, file)
	m.rmut.RUnlock()
	return f
}

func (m *Model) CurrentGlobalFile(repo string, file string) protocol.FileInfo {
	m.rmut.RLock()
	f := m.repoFiles[repo].GetGlobal(file)
	m.rmut.RUnlock()
	return f
}

type cFiler struct {
	m *Model
	r string
}

// CurrentFile implements scanner.CurrentFiler.
func (cf cFiler) CurrentFile(file string) protocol.FileInfo {
	return cf.m.CurrentRepoFile(cf.r, file)
}

// ConnectedTo returns true if we are connected to the named node.
func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool {
	m.pmut.RLock()
	_, ok := m.protoConn[nodeID]
	m.pmut.RUnlock()
	if ok {
		m.nodeWasSeen(nodeID)
	}
	return ok
}

func (m *Model) GetIgnores(repo string) ([]string, error) {
	var lines []string
	cfg, ok := m.repoCfgs[repo]
	if !ok {
		return lines, fmt.Errorf("Repo %s does not exist", repo)
	}
	m.rmut.Lock()
	defer m.rmut.Unlock()
	fd, err := os.Open(filepath.Join(cfg.Directory, ".stignore"))
	if err != nil {
		if os.IsNotExist(err) {
			return lines, nil
		}
		l.Warnln("Loading .stignore:", err)
		return lines, err
	}
	defer fd.Close()
	scanner := bufio.NewScanner(fd)
	for scanner.Scan() {
		lines = append(lines, strings.TrimSpace(scanner.Text()))
	}
	return lines, nil
}
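
// SetIgnores replaces the repo's .stignore by writing the new content to a
// temporary file and renaming it into place, then rescans the repo so the new
// patterns take effect.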
func (m *Model) SetIgnores(repo string, content []string) error {
	cfg, ok := m.repoCfgs[repo]
	if !ok {
		return fmt.Errorf("Repo %s does not exist", repo)
	}
	fd, err := ioutil.TempFile(cfg.Directory, ".syncthing.stignore-"+repo)
	if err != nil {
		l.Warnln("Saving .stignore:", err)
		return err
	}
	defer os.Remove(fd.Name())
	for _, line := range content {
		_, err = fmt.Fprintln(fd, line)
		if err != nil {
			l.Warnln("Saving .stignore:", err)
			return err
		}
	}
	err = fd.Close()
	if err != nil {
		l.Warnln("Saving .stignore:", err)
		return err
	}
	file := filepath.Join(cfg.Directory, ".stignore")
	err = osutil.Rename(fd.Name(), file)
	if err != nil {
		l.Warnln("Saving .stignore:", err)
		return err
	}
	return m.ScanRepo(repo)
}

// AddConnection adds a new peer connection to the model. An initial index is
// sent to the connected peer, followed by index updates whenever the local
// repository changes.
func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
	nodeID := protoConn.ID()
	m.pmut.Lock()
	if _, ok := m.protoConn[nodeID]; ok {
		panic("add existing node")
	}
	m.protoConn[nodeID] = protoConn
	if _, ok := m.rawConn[nodeID]; ok {
		panic("add existing node")
	}
	m.rawConn[nodeID] = rawConn
	cm := m.clusterConfig(nodeID)
	protoConn.ClusterConfig(cm)
	m.rmut.RLock()
	for _, repo := range m.nodeRepos[nodeID] {
		fs := m.repoFiles[repo]
		go sendIndexes(protoConn, repo, fs, m.repoIgnores[repo])
	}
	m.rmut.RUnlock()
	m.pmut.Unlock()
	m.nodeWasSeen(nodeID)
}

func (m *Model) nodeStatRef(nodeID protocol.NodeID) *stats.NodeStatisticsReference {
	m.rmut.Lock()
	defer m.rmut.Unlock()
	if sr, ok := m.nodeStatRefs[nodeID]; ok {
		return sr
	} else {
		sr = stats.NewNodeStatisticsReference(m.db, nodeID)
		m.nodeStatRefs[nodeID] = sr
		return sr
	}
}

func (m *Model) nodeWasSeen(nodeID protocol.NodeID) {
	m.nodeStatRef(nodeID).WasSeen()
}

func sendIndexes(conn protocol.Connection, repo string, fs *files.Set, ignores ignore.Patterns) {
	nodeID := conn.ID()
	name := conn.Name()
	var err error
	if debug {
		l.Debugf("sendIndexes for %s-%s/%q starting", nodeID, name, repo)
	}
	minLocalVer, err := sendIndexTo(true, 0, conn, repo, fs, ignores)
	for err == nil {
		time.Sleep(5 * time.Second)
		if fs.LocalVersion(protocol.LocalNodeID) <= minLocalVer {
			continue
		}
		minLocalVer, err = sendIndexTo(false, minLocalVer, conn, repo, fs, ignores)
	}
	if debug {
		l.Debugf("sendIndexes for %s-%s/%q exiting: %v", nodeID, name, repo, err)
	}
}
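
// sendIndexTo sends a full index (when initial is true) or index updates for
// all local files with a LocalVersion above minLocalVer, batched by file count
// and estimated message size. It returns the highest LocalVersion sent, which
// the caller feeds back in on the next round.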
func sendIndexTo(initial bool, minLocalVer uint64, conn protocol.Connection, repo string, fs *files.Set, ignores ignore.Patterns) (uint64, error) {
	nodeID := conn.ID()
	name := conn.Name()
	batch := make([]protocol.FileInfo, 0, indexBatchSize)
	currentBatchSize := 0
	maxLocalVer := uint64(0)
	var err error
	fs.WithHave(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
		f := fi.(protocol.FileInfo)
		if f.LocalVersion <= minLocalVer {
			return true
		}
		if f.LocalVersion > maxLocalVer {
			maxLocalVer = f.LocalVersion
		}
		if ignores.Match(f.Name) {
			return true
		}
		if len(batch) == indexBatchSize || currentBatchSize > indexTargetSize {
			if initial {
				if err = conn.Index(repo, batch); err != nil {
					return false
				}
				if debug {
					l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (initial index)", nodeID, name, repo, len(batch), currentBatchSize)
				}
				initial = false
			} else {
				if err = conn.IndexUpdate(repo, batch); err != nil {
					return false
				}
				if debug {
					l.Debugf("sendIndexes for %s-%s/%q: %d files (<%d bytes) (batched update)", nodeID, name, repo, len(batch), currentBatchSize)
				}
			}
			batch = make([]protocol.FileInfo, 0, indexBatchSize)
			currentBatchSize = 0
		}
		batch = append(batch, f)
		currentBatchSize += indexPerFileSize + len(f.Blocks)*IndexPerBlockSize
		return true
	})
	if initial && err == nil {
		err = conn.Index(repo, batch)
		if debug && err == nil {
			l.Debugf("sendIndexes for %s-%s/%q: %d files (small initial index)", nodeID, name, repo, len(batch))
		}
	} else if len(batch) > 0 && err == nil {
		err = conn.IndexUpdate(repo, batch)
		if debug && err == nil {
			l.Debugf("sendIndexes for %s-%s/%q: %d files (last batch)", nodeID, name, repo, len(batch))
		}
	}
	return maxLocalVer, err
}

func (m *Model) updateLocal(repo string, f protocol.FileInfo) {
	f.LocalVersion = 0
	m.rmut.RLock()
	m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f})
	m.rmut.RUnlock()
	events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
		"repo":     repo,
		"name":     f.Name,
		"modified": time.Unix(f.Modified, 0),
		"flags":    fmt.Sprintf("0%o", f.Flags),
		"size":     f.Size(),
	})
}

func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
	m.pmut.RLock()
	nc, ok := m.protoConn[nodeID]
	m.pmut.RUnlock()
	if !ok {
		return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
	}
	if debug {
		l.Debugf("%v REQ(out): %s: %q / %q o=%d s=%d h=%x", m, nodeID, repo, name, offset, size, hash)
	}
	return nc.Request(repo, name, offset, size)
}

func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
	if m.started {
		panic("cannot add repo to started model")
	}
	if len(cfg.ID) == 0 {
		panic("cannot add empty repo id")
	}
	m.rmut.Lock()
	m.repoCfgs[cfg.ID] = cfg
	m.repoFiles[cfg.ID] = files.NewSet(cfg.ID, m.db)
	m.repoNodes[cfg.ID] = make([]protocol.NodeID, len(cfg.Nodes))
	for i, node := range cfg.Nodes {
		m.repoNodes[cfg.ID][i] = node.NodeID
		m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
	}
	m.addedRepo = true
	m.rmut.Unlock()
}

func (m *Model) ScanRepos() {
	m.rmut.RLock()
	var repos = make([]string, 0, len(m.repoCfgs))
	for repo := range m.repoCfgs {
		repos = append(repos, repo)
	}
	m.rmut.RUnlock()
	var wg sync.WaitGroup
	wg.Add(len(repos))
	for _, repo := range repos {
		repo := repo
		go func() {
			err := m.ScanRepo(repo)
			if err != nil {
				invalidateRepo(m.cfg, repo, err)
			}
			wg.Done()
		}()
	}
	wg.Wait()
}

func (m *Model) CleanRepos() {
	m.rmut.RLock()
	var dirs = make([]string, 0, len(m.repoCfgs))
	for _, cfg := range m.repoCfgs {
		dirs = append(dirs, cfg.Directory)
	}
	m.rmut.RUnlock()
	var wg sync.WaitGroup
	wg.Add(len(dirs))
	for _, dir := range dirs {
		w := &scanner.Walker{
			Dir:       dir,
			TempNamer: defTempNamer,
		}
		go func() {
			w.CleanTempFiles()
			wg.Done()
		}()
	}
	wg.Wait()
}

func (m *Model) ScanRepo(repo string) error {
	return m.ScanRepoSub(repo, "")
}

func (m *Model) ScanRepoSub(repo, sub string) error {
	if p := filepath.Clean(filepath.Join(repo, sub)); !strings.HasPrefix(p, repo) {
		return errors.New("invalid subpath")
	}
	m.rmut.RLock()
	fs, ok := m.repoFiles[repo]
	dir := m.repoCfgs[repo].Directory
	ignores, _ := ignore.Load(filepath.Join(dir, ".stignore"))
	m.repoIgnores[repo] = ignores
	w := &scanner.Walker{
		Dir:          dir,
		Sub:          sub,
		Ignores:      ignores,
		BlockSize:    scanner.StandardBlockSize,
		TempNamer:    defTempNamer,
		CurrentFiler: cFiler{m, repo},
		IgnorePerms:  m.repoCfgs[repo].IgnorePerms,
	}
	m.rmut.RUnlock()
	if !ok {
		return errors.New("no such repo")
	}
	m.setState(repo, RepoScanning)
	fchan, err := w.Walk()
	if err != nil {
		return err
	}
	batchSize := 100
	batch := make([]protocol.FileInfo, 0, batchSize)
	for f := range fchan {
		events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
			"repo":     repo,
			"name":     f.Name,
			"modified": time.Unix(f.Modified, 0),
			"flags":    fmt.Sprintf("0%o", f.Flags),
			"size":     f.Size(),
		})
		if len(batch) == batchSize {
			fs.Update(protocol.LocalNodeID, batch)
			batch = batch[:0]
		}
		batch = append(batch, f)
	}
	if len(batch) > 0 {
		fs.Update(protocol.LocalNodeID, batch)
	}
	batch = batch[:0]
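	// Second pass over files we already have, to catch local changes the
	// walker cannot report: files that now match an ignore pattern are marked
	// invalid, and files that have disappeared from disk are marked deleted
	// with a bumped version.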
	// TODO: We should limit the Have scanning to start at sub
	seenPrefix := false
	fs.WithHaveTruncated(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
		f := fi.(protocol.FileInfoTruncated)
		if !strings.HasPrefix(f.Name, sub) {
			// Return true so that we keep iterating, until we get to the part
			// of the tree we are interested in. Then return false so we stop
			// iterating when we've passed the end of the subtree.
			return !seenPrefix
		}
		seenPrefix = true
		if !protocol.IsDeleted(f.Flags) {
			if f.IsInvalid() {
				return true
			}
			if len(batch) == batchSize {
				fs.Update(protocol.LocalNodeID, batch)
				batch = batch[:0]
			}
			if ignores.Match(f.Name) {
				// File has been ignored. Set invalid bit.
				nf := protocol.FileInfo{
					Name:     f.Name,
					Flags:    f.Flags | protocol.FlagInvalid,
					Modified: f.Modified,
					Version:  f.Version, // The file is still the same, so don't bump version
				}
				events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
					"repo":     repo,
					"name":     f.Name,
					"modified": time.Unix(f.Modified, 0),
					"flags":    fmt.Sprintf("0%o", f.Flags),
					"size":     f.Size(),
				})
				batch = append(batch, nf)
			} else if _, err := os.Stat(filepath.Join(dir, f.Name)); err != nil && os.IsNotExist(err) {
				// File has been deleted
				nf := protocol.FileInfo{
					Name:     f.Name,
					Flags:    f.Flags | protocol.FlagDeleted,
					Modified: f.Modified,
					Version:  lamport.Default.Tick(f.Version),
				}
				events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{
					"repo":     repo,
					"name":     f.Name,
					"modified": time.Unix(f.Modified, 0),
					"flags":    fmt.Sprintf("0%o", f.Flags),
					"size":     f.Size(),
				})
				batch = append(batch, nf)
			}
		}
		return true
	})
	if len(batch) > 0 {
		fs.Update(protocol.LocalNodeID, batch)
	}
	m.setState(repo, RepoIdle)
	return nil
}

// clusterConfig returns a ClusterConfigMessage that is correct for the given peer node.
func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage {
	cm := protocol.ClusterConfigMessage{
		ClientName:    m.clientName,
		ClientVersion: m.clientVersion,
		Options: []protocol.Option{
			{
				Key:   "name",
				Value: m.nodeName,
			},
		},
	}
	m.rmut.RLock()
	for _, repo := range m.nodeRepos[node] {
		cr := protocol.Repository{
			ID: repo,
		}
		for _, node := range m.repoNodes[repo] {
			// NodeID is a value type, but with an underlying array. Copy it
			// so we don't grab aliases to the same array later on in node[:]
			node := node
			// TODO: Set read only bit when relevant
			cn := protocol.Node{
				ID:    node[:],
				Flags: protocol.FlagShareTrusted,
			}
			if nodeCfg := m.cfg.GetNodeConfiguration(node); nodeCfg.Introducer {
				cn.Flags |= protocol.FlagIntroducer
			}
			cr.Nodes = append(cr.Nodes, cn)
		}
		cm.Repositories = append(cm.Repositories, cr)
	}
	m.rmut.RUnlock()
	return cm
}

func (m *Model) setState(repo string, state repoState) {
	m.smut.Lock()
	oldState := m.repoState[repo]
	changed, ok := m.repoStateChanged[repo]
	if state != oldState {
		m.repoState[repo] = state
		m.repoStateChanged[repo] = time.Now()
		eventData := map[string]interface{}{
			"repo": repo,
			"to":   state.String(),
		}
		if ok {
			eventData["duration"] = time.Since(changed).Seconds()
			eventData["from"] = oldState.String()
		}
		events.Default.Log(events.StateChanged, eventData)
	}
	m.smut.Unlock()
}

func (m *Model) State(repo string) (string, time.Time) {
	m.smut.RLock()
	state := m.repoState[repo]
	changed := m.repoStateChanged[repo]
	m.smut.RUnlock()
	return state.String(), changed
}
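
// Override asserts the local version of the repo as authoritative: files we
// need but do not have are recorded as deleted, files we do have are
// re-announced with a bumped version, so that connected nodes revert to our
// copy.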
func (m *Model) Override(repo string) {
	m.rmut.RLock()
	fs := m.repoFiles[repo]
	m.rmut.RUnlock()
	m.setState(repo, RepoScanning)
	batch := make([]protocol.FileInfo, 0, indexBatchSize)
	fs.WithNeed(protocol.LocalNodeID, func(fi protocol.FileIntf) bool {
		need := fi.(protocol.FileInfo)
		if len(batch) == indexBatchSize {
			fs.Update(protocol.LocalNodeID, batch)
			batch = batch[:0]
		}
		have := fs.Get(protocol.LocalNodeID, need.Name)
		if have.Name != need.Name {
			// We are missing the file
			need.Flags |= protocol.FlagDeleted
			need.Blocks = nil
		} else {
			// We have the file, replace with our version
			need = have
		}
		need.Version = lamport.Default.Tick(need.Version)
		need.LocalVersion = 0
		batch = append(batch, need)
		return true
	})
	if len(batch) > 0 {
		fs.Update(protocol.LocalNodeID, batch)
	}
	m.setState(repo, RepoIdle)
}

// CurrentLocalVersion returns the change version for the given repository.
// This is guaranteed to increment if the contents of the local repository
// have changed.
func (m *Model) CurrentLocalVersion(repo string) uint64 {
	m.rmut.Lock()
	defer m.rmut.Unlock()
	fs, ok := m.repoFiles[repo]
	if !ok {
		panic("bug: LocalVersion called for nonexistent repo " + repo)
	}
	return fs.LocalVersion(protocol.LocalNodeID)
}

// RemoteLocalVersion returns the change version for the given repository, as
// sent by remote peers. This is guaranteed to increment if the contents of
// the remote or global repository have changed.
func (m *Model) RemoteLocalVersion(repo string) uint64 {
	m.rmut.Lock()
	defer m.rmut.Unlock()
	fs, ok := m.repoFiles[repo]
	if !ok {
		panic("bug: LocalVersion called for nonexistent repo " + repo)
	}
	var ver uint64
	for _, n := range m.repoNodes[repo] {
		ver += fs.LocalVersion(n)
	}
	return ver
}

func (m *Model) availability(repo string, file string) []protocol.NodeID {
	m.rmut.Lock()
	defer m.rmut.Unlock()
	fs, ok := m.repoFiles[repo]
	if !ok {
		return nil
	}
	return fs.Availability(file)
}

func (m *Model) String() string {
	return fmt.Sprintf("model@%p", m)
}