model.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904
  1. // Copyright (C) 2014 Jakob Borg and other contributors. All rights reserved.
  2. // Use of this source code is governed by an MIT-style license that can be
  3. // found in the LICENSE file.
  4. package model
  5. import (
  6. "compress/gzip"
  7. "crypto/sha1"
  8. "errors"
  9. "fmt"
  10. "io"
  11. "net"
  12. "os"
  13. "path/filepath"
  14. "strconv"
  15. "sync"
  16. "time"
  17. "github.com/calmh/syncthing/config"
  18. "github.com/calmh/syncthing/files"
  19. "github.com/calmh/syncthing/lamport"
  20. "github.com/calmh/syncthing/osutil"
  21. "github.com/calmh/syncthing/protocol"
  22. "github.com/calmh/syncthing/scanner"
  23. "github.com/syndtr/goleveldb/leveldb"
  24. )
  25. type repoState int
  26. const (
  27. RepoIdle repoState = iota
  28. RepoScanning
  29. RepoSyncing
  30. RepoCleaning
  31. )
  32. // Somewhat arbitrary amount of bytes that we choose to let represent the size
  33. // of an unsynchronized directory entry or a deleted file. We need it to be
  34. // larger than zero so that it's visible that there is some amount of bytes to
  35. // transfer to bring the systems into synchronization.
  36. const zeroEntrySize = 128
  37. type Model struct {
  38. indexDir string
  39. cfg *config.Configuration
  40. db *leveldb.DB
  41. clientName string
  42. clientVersion string
  43. repoCfgs map[string]config.RepositoryConfiguration // repo -> cfg
  44. repoFiles map[string]*files.Set // repo -> files
  45. repoNodes map[string][]protocol.NodeID // repo -> nodeIDs
  46. nodeRepos map[protocol.NodeID][]string // nodeID -> repos
  47. suppressor map[string]*suppressor // repo -> suppressor
  48. rmut sync.RWMutex // protects the above
  49. repoState map[string]repoState // repo -> state
  50. smut sync.RWMutex
  51. protoConn map[protocol.NodeID]protocol.Connection
  52. rawConn map[protocol.NodeID]io.Closer
  53. nodeVer map[protocol.NodeID]string
  54. pmut sync.RWMutex // protects protoConn and rawConn
  55. sup suppressor
  56. addedRepo bool
  57. started bool
  58. }
  59. var (
  60. ErrNoSuchFile = errors.New("no such file")
  61. ErrInvalid = errors.New("file is invalid")
  62. )
  63. // NewModel creates and starts a new model. The model starts in read-only mode,
  64. // where it sends index information to connected peers and responds to requests
  65. // for file data without altering the local repository in any way.
  66. func NewModel(indexDir string, cfg *config.Configuration, clientName, clientVersion string, db *leveldb.DB) *Model {
  67. m := &Model{
  68. indexDir: indexDir,
  69. cfg: cfg,
  70. db: db,
  71. clientName: clientName,
  72. clientVersion: clientVersion,
  73. repoCfgs: make(map[string]config.RepositoryConfiguration),
  74. repoFiles: make(map[string]*files.Set),
  75. repoNodes: make(map[string][]protocol.NodeID),
  76. nodeRepos: make(map[protocol.NodeID][]string),
  77. repoState: make(map[string]repoState),
  78. suppressor: make(map[string]*suppressor),
  79. protoConn: make(map[protocol.NodeID]protocol.Connection),
  80. rawConn: make(map[protocol.NodeID]io.Closer),
  81. nodeVer: make(map[protocol.NodeID]string),
  82. sup: suppressor{threshold: int64(cfg.Options.MaxChangeKbps)},
  83. }
  84. var timeout = 20 * 60 // seconds
  85. if t := os.Getenv("STDEADLOCKTIMEOUT"); len(t) > 0 {
  86. it, err := strconv.Atoi(t)
  87. if err == nil {
  88. timeout = it
  89. }
  90. }
  91. deadlockDetect(&m.rmut, time.Duration(timeout)*time.Second)
  92. deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
  93. deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)
  94. go m.broadcastIndexLoop()
  95. return m
  96. }
  97. // StartRW starts read/write processing on the current model. When in
  98. // read/write mode the model will attempt to keep in sync with the cluster by
  99. // pulling needed files from peer nodes.
  100. func (m *Model) StartRepoRW(repo string, threads int) {
  101. m.rmut.RLock()
  102. defer m.rmut.RUnlock()
  103. if cfg, ok := m.repoCfgs[repo]; !ok {
  104. panic("cannot start without repo")
  105. } else {
  106. newPuller(cfg, m, threads, m.cfg)
  107. }
  108. }
  109. // StartRO starts read only processing on the current model. When in
  110. // read only mode the model will announce files to the cluster but not
  111. // pull in any external changes.
  112. func (m *Model) StartRepoRO(repo string) {
  113. m.StartRepoRW(repo, 0) // zero threads => read only
  114. }
  115. type ConnectionInfo struct {
  116. protocol.Statistics
  117. Address string
  118. ClientVersion string
  119. Completion int
  120. }
  121. // ConnectionStats returns a map with connection statistics for each connected node.
  122. func (m *Model) ConnectionStats() map[string]ConnectionInfo {
  123. type remoteAddrer interface {
  124. RemoteAddr() net.Addr
  125. }
  126. m.pmut.RLock()
  127. m.rmut.RLock()
  128. var res = make(map[string]ConnectionInfo)
  129. for node, conn := range m.protoConn {
  130. ci := ConnectionInfo{
  131. Statistics: conn.Statistics(),
  132. ClientVersion: m.nodeVer[node],
  133. }
  134. if nc, ok := m.rawConn[node].(remoteAddrer); ok {
  135. ci.Address = nc.RemoteAddr().String()
  136. }
  137. var tot int64
  138. var have int64
  139. for _, repo := range m.nodeRepos[node] {
  140. m.repoFiles[repo].WithGlobal(func(f scanner.File) bool {
  141. if !protocol.IsDeleted(f.Flags) {
  142. size := f.Size
  143. if protocol.IsDirectory(f.Flags) {
  144. size = zeroEntrySize
  145. }
  146. tot += size
  147. have += size
  148. }
  149. return true
  150. })
  151. m.repoFiles[repo].WithNeed(node, func(f scanner.File) bool {
  152. if !protocol.IsDeleted(f.Flags) {
  153. size := f.Size
  154. if protocol.IsDirectory(f.Flags) {
  155. size = zeroEntrySize
  156. }
  157. have -= size
  158. }
  159. return true
  160. })
  161. }
  162. ci.Completion = 100
  163. if tot != 0 {
  164. ci.Completion = int(100 * have / tot)
  165. }
  166. res[node.String()] = ci
  167. }
  168. m.rmut.RUnlock()
  169. m.pmut.RUnlock()
  170. in, out := protocol.TotalInOut()
  171. res["total"] = ConnectionInfo{
  172. Statistics: protocol.Statistics{
  173. At: time.Now(),
  174. InBytesTotal: in,
  175. OutBytesTotal: out,
  176. },
  177. }
  178. return res
  179. }
  180. func sizeOf(fs []scanner.File) (files, deleted int, bytes int64) {
  181. for _, f := range fs {
  182. fs, de, by := sizeOfFile(f)
  183. files += fs
  184. deleted += de
  185. bytes += by
  186. }
  187. return
  188. }
  189. func sizeOfFile(f scanner.File) (files, deleted int, bytes int64) {
  190. if !protocol.IsDeleted(f.Flags) {
  191. files++
  192. if !protocol.IsDirectory(f.Flags) {
  193. bytes += f.Size
  194. } else {
  195. bytes += zeroEntrySize
  196. }
  197. } else {
  198. deleted++
  199. bytes += zeroEntrySize
  200. }
  201. return
  202. }
  203. // GlobalSize returns the number of files, deleted files and total bytes for all
  204. // files in the global model.
  205. func (m *Model) GlobalSize(repo string) (files, deleted int, bytes int64) {
  206. m.rmut.RLock()
  207. defer m.rmut.RUnlock()
  208. if rf, ok := m.repoFiles[repo]; ok {
  209. rf.WithGlobal(func(f scanner.File) bool {
  210. fs, de, by := sizeOfFile(f)
  211. files += fs
  212. deleted += de
  213. bytes += by
  214. return true
  215. })
  216. }
  217. return
  218. }
  219. // LocalSize returns the number of files, deleted files and total bytes for all
  220. // files in the local repository.
  221. func (m *Model) LocalSize(repo string) (files, deleted int, bytes int64) {
  222. m.rmut.RLock()
  223. defer m.rmut.RUnlock()
  224. if rf, ok := m.repoFiles[repo]; ok {
  225. rf.WithHave(protocol.LocalNodeID, func(f scanner.File) bool {
  226. fs, de, by := sizeOfFile(f)
  227. files += fs
  228. deleted += de
  229. bytes += by
  230. return true
  231. })
  232. }
  233. return 0, 0, 0
  234. }
  235. // NeedSize returns the number and total size of currently needed files.
  236. func (m *Model) NeedSize(repo string) (files int, bytes int64) {
  237. f, d, b := sizeOf(m.NeedFilesRepo(repo))
  238. return f + d, b
  239. }
  240. // NeedFiles returns the list of currently needed files
  241. func (m *Model) NeedFilesRepo(repo string) []scanner.File {
  242. m.rmut.RLock()
  243. defer m.rmut.RUnlock()
  244. if rf, ok := m.repoFiles[repo]; ok {
  245. var fs []scanner.File
  246. rf.WithNeed(protocol.LocalNodeID, func(f scanner.File) bool {
  247. fs = append(fs, f)
  248. return true
  249. })
  250. if r := m.repoCfgs[repo].FileRanker(); r != nil {
  251. files.SortBy(r).Sort(fs)
  252. }
  253. return fs
  254. }
  255. return nil
  256. }
  257. // Index is called when a new node is connected and we receive their full index.
  258. // Implements the protocol.Model interface.
  259. func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  260. if debug {
  261. l.Debugf("IDX(in): %s %q: %d files", nodeID, repo, len(fs))
  262. }
  263. if !m.repoSharedWith(repo, nodeID) {
  264. l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  265. return
  266. }
  267. var files = make([]scanner.File, len(fs))
  268. for i := range fs {
  269. f := fs[i]
  270. lamport.Default.Tick(f.Version)
  271. if debug {
  272. var flagComment string
  273. if protocol.IsDeleted(f.Flags) {
  274. flagComment = " (deleted)"
  275. }
  276. l.Debugf("IDX(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  277. }
  278. files[i] = fileFromFileInfo(f)
  279. }
  280. m.rmut.RLock()
  281. if r, ok := m.repoFiles[repo]; ok {
  282. r.Replace(nodeID, files)
  283. } else {
  284. l.Fatalf("Index for nonexistant repo %q", repo)
  285. }
  286. m.rmut.RUnlock()
  287. }
  288. // IndexUpdate is called for incremental updates to connected nodes' indexes.
  289. // Implements the protocol.Model interface.
  290. func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.FileInfo) {
  291. if debug {
  292. l.Debugf("IDXUP(in): %s / %q: %d files", nodeID, repo, len(fs))
  293. }
  294. if !m.repoSharedWith(repo, nodeID) {
  295. l.Warnf("Unexpected repository ID %q sent from node %q; ensure that the repository exists and that this node is selected under \"Share With\" in the repository configuration.", repo, nodeID)
  296. return
  297. }
  298. var files = make([]scanner.File, len(fs))
  299. for i := range fs {
  300. f := fs[i]
  301. lamport.Default.Tick(f.Version)
  302. if debug {
  303. var flagComment string
  304. if protocol.IsDeleted(f.Flags) {
  305. flagComment = " (deleted)"
  306. }
  307. l.Debugf("IDXUP(in): %s %q/%q m=%d f=%o%s v=%d (%d blocks)", nodeID, repo, f.Name, f.Modified, f.Flags, flagComment, f.Version, len(f.Blocks))
  308. }
  309. files[i] = fileFromFileInfo(f)
  310. }
  311. m.rmut.RLock()
  312. if r, ok := m.repoFiles[repo]; ok {
  313. r.Update(nodeID, files)
  314. } else {
  315. l.Fatalf("IndexUpdate for nonexistant repo %q", repo)
  316. }
  317. m.rmut.RUnlock()
  318. }
  319. func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool {
  320. m.rmut.RLock()
  321. defer m.rmut.RUnlock()
  322. for _, nrepo := range m.nodeRepos[nodeID] {
  323. if nrepo == repo {
  324. return true
  325. }
  326. }
  327. return false
  328. }
  329. func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterConfigMessage) {
  330. compErr := compareClusterConfig(m.clusterConfig(nodeID), config)
  331. if debug {
  332. l.Debugf("ClusterConfig: %s: %#v", nodeID, config)
  333. l.Debugf(" ... compare: %s: %v", nodeID, compErr)
  334. }
  335. if compErr != nil {
  336. l.Warnf("%s: %v", nodeID, compErr)
  337. m.Close(nodeID, compErr)
  338. }
  339. m.pmut.Lock()
  340. if config.ClientName == "syncthing" {
  341. m.nodeVer[nodeID] = config.ClientVersion
  342. } else {
  343. m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion
  344. }
  345. m.pmut.Unlock()
  346. l.Infof(`Node %s client is "%s %s"`, nodeID, config.ClientName, config.ClientVersion)
  347. }
  348. // Close removes the peer from the model and closes the underlying connection if possible.
  349. // Implements the protocol.Model interface.
  350. func (m *Model) Close(node protocol.NodeID, err error) {
  351. l.Infof("Connection to %s closed: %v", node, err)
  352. m.rmut.RLock()
  353. for _, repo := range m.nodeRepos[node] {
  354. m.repoFiles[repo].Replace(node, nil)
  355. }
  356. m.rmut.RUnlock()
  357. m.pmut.Lock()
  358. conn, ok := m.rawConn[node]
  359. if ok {
  360. conn.Close()
  361. }
  362. delete(m.protoConn, node)
  363. delete(m.rawConn, node)
  364. delete(m.nodeVer, node)
  365. m.pmut.Unlock()
  366. }
  367. // Request returns the specified data segment by reading it from local disk.
  368. // Implements the protocol.Model interface.
  369. func (m *Model) Request(nodeID protocol.NodeID, repo, name string, offset int64, size int) ([]byte, error) {
  370. // Verify that the requested file exists in the local model.
  371. m.rmut.RLock()
  372. r, ok := m.repoFiles[repo]
  373. m.rmut.RUnlock()
  374. if !ok {
  375. l.Warnf("Request from %s for file %s in nonexistent repo %q", nodeID, name, repo)
  376. return nil, ErrNoSuchFile
  377. }
  378. lf := r.Get(protocol.LocalNodeID, name)
  379. if lf.Suppressed || protocol.IsDeleted(lf.Flags) {
  380. if debug {
  381. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d; invalid: %v", nodeID, repo, name, offset, size, lf)
  382. }
  383. return nil, ErrInvalid
  384. }
  385. if offset > lf.Size {
  386. if debug {
  387. l.Debugf("REQ(in; nonexistent): %s: %q o=%d s=%d", nodeID, name, offset, size)
  388. }
  389. return nil, ErrNoSuchFile
  390. }
  391. if debug && nodeID != protocol.LocalNodeID {
  392. l.Debugf("REQ(in): %s: %q / %q o=%d s=%d", nodeID, repo, name, offset, size)
  393. }
  394. m.rmut.RLock()
  395. fn := filepath.Join(m.repoCfgs[repo].Directory, name)
  396. m.rmut.RUnlock()
  397. fd, err := os.Open(fn) // XXX: Inefficient, should cache fd?
  398. if err != nil {
  399. return nil, err
  400. }
  401. defer fd.Close()
  402. buf := make([]byte, size)
  403. _, err = fd.ReadAt(buf, offset)
  404. if err != nil {
  405. return nil, err
  406. }
  407. return buf, nil
  408. }
  409. // ReplaceLocal replaces the local repository index with the given list of files.
  410. func (m *Model) ReplaceLocal(repo string, fs []scanner.File) {
  411. m.rmut.RLock()
  412. m.repoFiles[repo].ReplaceWithDelete(protocol.LocalNodeID, fs)
  413. m.rmut.RUnlock()
  414. }
  415. func (m *Model) CurrentRepoFile(repo string, file string) scanner.File {
  416. m.rmut.RLock()
  417. f := m.repoFiles[repo].Get(protocol.LocalNodeID, file)
  418. m.rmut.RUnlock()
  419. return f
  420. }
  421. func (m *Model) CurrentGlobalFile(repo string, file string) scanner.File {
  422. m.rmut.RLock()
  423. f := m.repoFiles[repo].GetGlobal(file)
  424. m.rmut.RUnlock()
  425. return f
  426. }
  427. type cFiler struct {
  428. m *Model
  429. r string
  430. }
  431. // Implements scanner.CurrentFiler
  432. func (cf cFiler) CurrentFile(file string) scanner.File {
  433. return cf.m.CurrentRepoFile(cf.r, file)
  434. }
  435. // ConnectedTo returns true if we are connected to the named node.
  436. func (m *Model) ConnectedTo(nodeID protocol.NodeID) bool {
  437. m.pmut.RLock()
  438. _, ok := m.protoConn[nodeID]
  439. m.pmut.RUnlock()
  440. return ok
  441. }
  442. // AddConnection adds a new peer connection to the model. An initial index will
  443. // be sent to the connected peer, thereafter index updates whenever the local
  444. // repository changes.
  445. func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) {
  446. nodeID := protoConn.ID()
  447. m.pmut.Lock()
  448. if _, ok := m.protoConn[nodeID]; ok {
  449. panic("add existing node")
  450. }
  451. m.protoConn[nodeID] = protoConn
  452. if _, ok := m.rawConn[nodeID]; ok {
  453. panic("add existing node")
  454. }
  455. m.rawConn[nodeID] = rawConn
  456. m.pmut.Unlock()
  457. cm := m.clusterConfig(nodeID)
  458. protoConn.ClusterConfig(cm)
  459. var idxToSend = make(map[string][]protocol.FileInfo)
  460. m.rmut.RLock()
  461. for _, repo := range m.nodeRepos[nodeID] {
  462. idxToSend[repo] = m.protocolIndex(repo)
  463. }
  464. m.rmut.RUnlock()
  465. go func() {
  466. for repo, idx := range idxToSend {
  467. if debug {
  468. l.Debugf("IDX(out/initial): %s: %q: %d files", nodeID, repo, len(idx))
  469. }
  470. const batchSize = 1000
  471. for i := 0; i < len(idx); i += batchSize {
  472. if len(idx[i:]) < batchSize {
  473. protoConn.Index(repo, idx[i:])
  474. } else {
  475. protoConn.Index(repo, idx[i:i+batchSize])
  476. }
  477. }
  478. }
  479. }()
  480. }
  481. // protocolIndex returns the current local index in protocol data types.
  482. func (m *Model) protocolIndex(repo string) []protocol.FileInfo {
  483. var index []protocol.FileInfo
  484. var fs []scanner.File
  485. m.repoFiles[repo].WithHave(protocol.LocalNodeID, func(f scanner.File) bool {
  486. fs = append(fs, f)
  487. return true
  488. })
  489. for _, f := range fs {
  490. mf := fileInfoFromFile(f)
  491. if debug {
  492. var flagComment string
  493. if protocol.IsDeleted(mf.Flags) {
  494. flagComment = " (deleted)"
  495. }
  496. l.Debugf("IDX(out): %q/%q m=%d f=%o%s v=%d (%d blocks)", repo, mf.Name, mf.Modified, mf.Flags, flagComment, mf.Version, len(mf.Blocks))
  497. }
  498. index = append(index, mf)
  499. }
  500. return index
  501. }
  502. func (m *Model) updateLocal(repo string, f scanner.File) {
  503. m.rmut.RLock()
  504. m.repoFiles[repo].Update(protocol.LocalNodeID, []scanner.File{f})
  505. m.rmut.RUnlock()
  506. }
  507. func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) {
  508. m.pmut.RLock()
  509. nc, ok := m.protoConn[nodeID]
  510. m.pmut.RUnlock()
  511. if !ok {
  512. return nil, fmt.Errorf("requestGlobal: no such node: %s", nodeID)
  513. }
  514. if debug {
  515. l.Debugf("REQ(out): %s: %q / %q o=%d s=%d h=%x", nodeID, repo, name, offset, size, hash)
  516. }
  517. return nc.Request(repo, name, offset, size)
  518. }
  519. func (m *Model) broadcastIndexLoop() {
  520. // TODO: Rewrite to send index in segments
  521. var lastChange = map[string]uint64{}
  522. for {
  523. time.Sleep(5 * time.Second)
  524. m.pmut.RLock()
  525. m.rmut.RLock()
  526. var indexWg sync.WaitGroup
  527. for repo, fs := range m.repoFiles {
  528. repo := repo
  529. c := fs.Changes(protocol.LocalNodeID)
  530. if c == lastChange[repo] {
  531. continue
  532. }
  533. lastChange[repo] = c
  534. idx := m.protocolIndex(repo)
  535. for _, nodeID := range m.repoNodes[repo] {
  536. nodeID := nodeID
  537. if conn, ok := m.protoConn[nodeID]; ok {
  538. indexWg.Add(1)
  539. if debug {
  540. l.Debugf("IDX(out/loop): %s: %d files", nodeID, len(idx))
  541. }
  542. go func() {
  543. conn.Index(repo, idx)
  544. indexWg.Done()
  545. }()
  546. }
  547. }
  548. }
  549. m.rmut.RUnlock()
  550. m.pmut.RUnlock()
  551. indexWg.Wait()
  552. }
  553. }
  554. func (m *Model) AddRepo(cfg config.RepositoryConfiguration) {
  555. if m.started {
  556. panic("cannot add repo to started model")
  557. }
  558. if len(cfg.ID) == 0 {
  559. panic("cannot add empty repo id")
  560. }
  561. m.rmut.Lock()
  562. m.repoCfgs[cfg.ID] = cfg
  563. m.repoFiles[cfg.ID] = files.NewSet(cfg.ID, m.db)
  564. m.suppressor[cfg.ID] = &suppressor{threshold: int64(m.cfg.Options.MaxChangeKbps)}
  565. m.repoNodes[cfg.ID] = make([]protocol.NodeID, len(cfg.Nodes))
  566. for i, node := range cfg.Nodes {
  567. m.repoNodes[cfg.ID][i] = node.NodeID
  568. m.nodeRepos[node.NodeID] = append(m.nodeRepos[node.NodeID], cfg.ID)
  569. }
  570. m.addedRepo = true
  571. m.rmut.Unlock()
  572. }
  573. func (m *Model) ScanRepos() {
  574. m.rmut.RLock()
  575. var repos = make([]string, 0, len(m.repoCfgs))
  576. for repo := range m.repoCfgs {
  577. repos = append(repos, repo)
  578. }
  579. m.rmut.RUnlock()
  580. var wg sync.WaitGroup
  581. wg.Add(len(repos))
  582. for _, repo := range repos {
  583. repo := repo
  584. go func() {
  585. err := m.ScanRepo(repo)
  586. if err != nil {
  587. invalidateRepo(m.cfg, repo, err)
  588. }
  589. wg.Done()
  590. }()
  591. }
  592. wg.Wait()
  593. }
  594. func (m *Model) CleanRepos() {
  595. m.rmut.RLock()
  596. var dirs = make([]string, 0, len(m.repoCfgs))
  597. for _, cfg := range m.repoCfgs {
  598. dirs = append(dirs, cfg.Directory)
  599. }
  600. m.rmut.RUnlock()
  601. var wg sync.WaitGroup
  602. wg.Add(len(dirs))
  603. for _, dir := range dirs {
  604. w := &scanner.Walker{
  605. Dir: dir,
  606. TempNamer: defTempNamer,
  607. }
  608. go func() {
  609. w.CleanTempFiles()
  610. wg.Done()
  611. }()
  612. }
  613. wg.Wait()
  614. }
  615. func (m *Model) ScanRepo(repo string) error {
  616. m.rmut.RLock()
  617. w := &scanner.Walker{
  618. Dir: m.repoCfgs[repo].Directory,
  619. IgnoreFile: ".stignore",
  620. BlockSize: scanner.StandardBlockSize,
  621. TempNamer: defTempNamer,
  622. Suppressor: m.suppressor[repo],
  623. CurrentFiler: cFiler{m, repo},
  624. IgnorePerms: m.repoCfgs[repo].IgnorePerms,
  625. }
  626. m.rmut.RUnlock()
  627. m.setState(repo, RepoScanning)
  628. fs, _, err := w.Walk()
  629. if err != nil {
  630. return err
  631. }
  632. m.ReplaceLocal(repo, fs)
  633. m.setState(repo, RepoIdle)
  634. return nil
  635. }
  636. func (m *Model) LoadIndexes(dir string) {
  637. m.rmut.RLock()
  638. for repo := range m.repoCfgs {
  639. fs := m.loadIndex(repo, dir)
  640. var sfs = make([]scanner.File, len(fs))
  641. for i := 0; i < len(fs); i++ {
  642. lamport.Default.Tick(fs[i].Version)
  643. sfs[i] = fileFromFileInfo(fs[i])
  644. sfs[i].Suppressed = false // we might have saved an index with files that were suppressed; the should not be on startup
  645. }
  646. m.repoFiles[repo].Replace(protocol.LocalNodeID, sfs)
  647. }
  648. m.rmut.RUnlock()
  649. }
  650. func (m *Model) saveIndex(repo string, dir string, fs []protocol.FileInfo) error {
  651. id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory)))
  652. name := id + ".idx.gz"
  653. name = filepath.Join(dir, name)
  654. tmp := fmt.Sprintf("%s.tmp.%d", name, time.Now().UnixNano())
  655. idxf, err := os.OpenFile(tmp, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0644)
  656. if err != nil {
  657. return err
  658. }
  659. defer os.Remove(tmp)
  660. gzw := gzip.NewWriter(idxf)
  661. n, err := protocol.IndexMessage{
  662. Repository: repo,
  663. Files: fs,
  664. }.EncodeXDR(gzw)
  665. if err != nil {
  666. gzw.Close()
  667. idxf.Close()
  668. return err
  669. }
  670. err = gzw.Close()
  671. if err != nil {
  672. return err
  673. }
  674. err = idxf.Close()
  675. if err != nil {
  676. return err
  677. }
  678. if debug {
  679. l.Debugln("wrote index,", n, "bytes uncompressed")
  680. }
  681. return osutil.Rename(tmp, name)
  682. }
  683. func (m *Model) loadIndex(repo string, dir string) []protocol.FileInfo {
  684. id := fmt.Sprintf("%x", sha1.Sum([]byte(m.repoCfgs[repo].Directory)))
  685. name := id + ".idx.gz"
  686. name = filepath.Join(dir, name)
  687. idxf, err := os.Open(name)
  688. if err != nil {
  689. return nil
  690. }
  691. defer idxf.Close()
  692. gzr, err := gzip.NewReader(idxf)
  693. if err != nil {
  694. return nil
  695. }
  696. defer gzr.Close()
  697. var im protocol.IndexMessage
  698. err = im.DecodeXDR(gzr)
  699. if err != nil || im.Repository != repo {
  700. return nil
  701. }
  702. return im.Files
  703. }
  704. // clusterConfig returns a ClusterConfigMessage that is correct for the given peer node
  705. func (m *Model) clusterConfig(node protocol.NodeID) protocol.ClusterConfigMessage {
  706. cm := protocol.ClusterConfigMessage{
  707. ClientName: m.clientName,
  708. ClientVersion: m.clientVersion,
  709. }
  710. m.rmut.RLock()
  711. for _, repo := range m.nodeRepos[node] {
  712. cr := protocol.Repository{
  713. ID: repo,
  714. }
  715. for _, node := range m.repoNodes[repo] {
  716. // TODO: Set read only bit when relevant
  717. cr.Nodes = append(cr.Nodes, protocol.Node{
  718. ID: node[:],
  719. Flags: protocol.FlagShareTrusted,
  720. })
  721. }
  722. cm.Repositories = append(cm.Repositories, cr)
  723. }
  724. m.rmut.RUnlock()
  725. return cm
  726. }
  727. func (m *Model) setState(repo string, state repoState) {
  728. m.smut.Lock()
  729. m.repoState[repo] = state
  730. m.smut.Unlock()
  731. }
  732. func (m *Model) State(repo string) string {
  733. m.smut.RLock()
  734. state := m.repoState[repo]
  735. m.smut.RUnlock()
  736. switch state {
  737. case RepoIdle:
  738. return "idle"
  739. case RepoScanning:
  740. return "scanning"
  741. case RepoCleaning:
  742. return "cleaning"
  743. case RepoSyncing:
  744. return "syncing"
  745. default:
  746. return "unknown"
  747. }
  748. }
  749. func (m *Model) Override(repo string) {
  750. fs := m.NeedFilesRepo(repo)
  751. m.rmut.RLock()
  752. r := m.repoFiles[repo]
  753. m.rmut.RUnlock()
  754. for i := range fs {
  755. f := &fs[i]
  756. h := r.Get(protocol.LocalNodeID, f.Name)
  757. if h.Name != f.Name {
  758. // We are missing the file
  759. f.Flags |= protocol.FlagDeleted
  760. f.Blocks = nil
  761. } else {
  762. // We have the file, replace with our version
  763. *f = h
  764. }
  765. f.Version = lamport.Default.Tick(f.Version)
  766. }
  767. r.Update(protocol.LocalNodeID, fs)
  768. }
  769. // Version returns the change version for the given repository. This is
  770. // guaranteed to increment if the contents of the local or global repository
  771. // has changed.
  772. func (m *Model) Version(repo string) uint64 {
  773. var ver uint64
  774. m.rmut.Lock()
  775. for _, n := range m.repoNodes[repo] {
  776. ver += m.repoFiles[repo].Changes(n)
  777. }
  778. m.rmut.Unlock()
  779. return ver
  780. }