rwfolder.go 39 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "math/rand"
  12. "os"
  13. "path/filepath"
  14. "time"
  15. "github.com/syncthing/protocol"
  16. "github.com/syncthing/syncthing/internal/config"
  17. "github.com/syncthing/syncthing/internal/db"
  18. "github.com/syncthing/syncthing/internal/events"
  19. "github.com/syncthing/syncthing/internal/ignore"
  20. "github.com/syncthing/syncthing/internal/osutil"
  21. "github.com/syncthing/syncthing/internal/scanner"
  22. "github.com/syncthing/syncthing/internal/symlinks"
  23. "github.com/syncthing/syncthing/internal/sync"
  24. "github.com/syncthing/syncthing/internal/versioner"
  25. )
  26. // TODO: Stop on errors
  27. const (
  28. pauseIntv = 60 * time.Second
  29. nextPullIntv = 10 * time.Second
  30. shortPullIntv = time.Second
  31. )
  32. // A pullBlockState is passed to the puller routine for each block that needs
  33. // to be fetched.
  34. type pullBlockState struct {
  35. *sharedPullerState
  36. block protocol.BlockInfo
  37. }
  38. // A copyBlocksState is passed to copy routine if the file has blocks to be
  39. // copied.
  40. type copyBlocksState struct {
  41. *sharedPullerState
  42. blocks []protocol.BlockInfo
  43. }
  44. var (
  45. activity = newDeviceActivity()
  46. errNoDevice = errors.New("no available source device")
  47. )
  48. const (
  49. dbUpdateHandleDir = iota
  50. dbUpdateDeleteDir
  51. dbUpdateHandleFile
  52. dbUpdateDeleteFile
  53. dbUpdateShortcutFile
  54. )
  55. type dbUpdateJob struct {
  56. file protocol.FileInfo
  57. jobType int
  58. }
  59. type rwFolder struct {
  60. stateTracker
  61. model *Model
  62. progressEmitter *ProgressEmitter
  63. virtualMtimeRepo *db.VirtualMtimeRepo
  64. folder string
  65. dir string
  66. scanIntv time.Duration
  67. versioner versioner.Versioner
  68. ignorePerms bool
  69. copiers int
  70. pullers int
  71. shortID uint64
  72. order config.PullOrder
  73. stop chan struct{}
  74. queue *jobQueue
  75. dbUpdates chan dbUpdateJob
  76. scanTimer *time.Timer
  77. pullTimer *time.Timer
  78. delayScan chan time.Duration
  79. scanNow chan rescanRequest
  80. remoteIndex chan struct{} // An index update was received, we should re-evaluate needs
  81. }
  82. func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder {
  83. return &rwFolder{
  84. stateTracker: stateTracker{
  85. folder: cfg.ID,
  86. mut: sync.NewMutex(),
  87. },
  88. model: m,
  89. progressEmitter: m.progressEmitter,
  90. virtualMtimeRepo: db.NewVirtualMtimeRepo(m.db, cfg.ID),
  91. folder: cfg.ID,
  92. dir: cfg.Path(),
  93. scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
  94. ignorePerms: cfg.IgnorePerms,
  95. copiers: cfg.Copiers,
  96. pullers: cfg.Pullers,
  97. shortID: shortID,
  98. order: cfg.Order,
  99. stop: make(chan struct{}),
  100. queue: newJobQueue(),
  101. pullTimer: time.NewTimer(shortPullIntv),
  102. scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
  103. delayScan: make(chan time.Duration),
  104. scanNow: make(chan rescanRequest),
  105. remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes.
  106. }
  107. }
  108. // Helper function to check whether either the ignorePerm flag has been
  109. // set on the local host or the FlagNoPermBits has been set on the file/dir
  110. // which is being pulled.
  111. func (p *rwFolder) ignorePermissions(file protocol.FileInfo) bool {
  112. return p.ignorePerms || file.Flags&protocol.FlagNoPermBits != 0
  113. }
  114. // Serve will run scans and pulls. It will return when Stop()ed or on a
  115. // critical error.
  116. func (p *rwFolder) Serve() {
  117. if debug {
  118. l.Debugln(p, "starting")
  119. defer l.Debugln(p, "exiting")
  120. }
  121. defer func() {
  122. p.pullTimer.Stop()
  123. p.scanTimer.Stop()
  124. // TODO: Should there be an actual FolderStopped state?
  125. p.setState(FolderIdle)
  126. }()
  127. var prevVer int64
  128. var prevIgnoreHash string
  129. rescheduleScan := func() {
  130. if p.scanIntv == 0 {
  131. // We should not run scans, so it should not be rescheduled.
  132. return
  133. }
  134. // Sleep a random time between 3/4 and 5/4 of the configured interval.
  135. sleepNanos := (p.scanIntv.Nanoseconds()*3 + rand.Int63n(2*p.scanIntv.Nanoseconds())) / 4
  136. intv := time.Duration(sleepNanos) * time.Nanosecond
  137. if debug {
  138. l.Debugln(p, "next rescan in", intv)
  139. }
  140. p.scanTimer.Reset(intv)
  141. }
  142. // We don't start pulling files until a scan has been completed.
  143. initialScanCompleted := false
  144. for {
  145. select {
  146. case <-p.stop:
  147. return
  148. case <-p.remoteIndex:
  149. prevVer = 0
  150. p.pullTimer.Reset(shortPullIntv)
  151. if debug {
  152. l.Debugln(p, "remote index updated, rescheduling pull")
  153. }
  154. case <-p.pullTimer.C:
  155. if !initialScanCompleted {
  156. if debug {
  157. l.Debugln(p, "skip (initial)")
  158. }
  159. p.pullTimer.Reset(nextPullIntv)
  160. continue
  161. }
  162. p.model.fmut.RLock()
  163. curIgnores := p.model.folderIgnores[p.folder]
  164. p.model.fmut.RUnlock()
  165. if newHash := curIgnores.Hash(); newHash != prevIgnoreHash {
  166. // The ignore patterns have changed. We need to re-evaluate if
  167. // there are files we need now that were ignored before.
  168. if debug {
  169. l.Debugln(p, "ignore patterns have changed, resetting prevVer")
  170. }
  171. prevVer = 0
  172. prevIgnoreHash = newHash
  173. }
  174. // RemoteLocalVersion() is a fast call, doesn't touch the database.
  175. curVer := p.model.RemoteLocalVersion(p.folder)
  176. if curVer == prevVer {
  177. if debug {
  178. l.Debugln(p, "skip (curVer == prevVer)", prevVer)
  179. }
  180. p.pullTimer.Reset(nextPullIntv)
  181. continue
  182. }
  183. if debug {
  184. l.Debugln(p, "pulling", prevVer, curVer)
  185. }
  186. p.setState(FolderSyncing)
  187. tries := 0
  188. for {
  189. tries++
  190. changed := p.pullerIteration(curIgnores)
  191. if debug {
  192. l.Debugln(p, "changed", changed)
  193. }
  194. if changed == 0 {
  195. // No files were changed by the puller, so we are in
  196. // sync. Remember the local version number and
  197. // schedule a resync a little bit into the future.
  198. if lv := p.model.RemoteLocalVersion(p.folder); lv < curVer {
  199. // There's a corner case where the device we needed
  200. // files from disconnected during the puller
  201. // iteration. The files will have been removed from
  202. // the index, so we've concluded that we don't need
  203. // them, but at the same time we have the local
  204. // version that includes those files in curVer. So we
  205. // catch the case that localVersion might have
  206. // decreased here.
  207. l.Debugln(p, "adjusting curVer", lv)
  208. curVer = lv
  209. }
  210. prevVer = curVer
  211. if debug {
  212. l.Debugln(p, "next pull in", nextPullIntv)
  213. }
  214. p.pullTimer.Reset(nextPullIntv)
  215. break
  216. }
  217. if tries > 10 {
  218. // We've tried a bunch of times to get in sync, but
  219. // we're not making it. Probably there are write
  220. // errors preventing us. Flag this with a warning and
  221. // wait a bit longer before retrying.
  222. l.Warnf("Folder %q isn't making progress - check logs for possible root cause. Pausing puller for %v.", p.folder, pauseIntv)
  223. if debug {
  224. l.Debugln(p, "next pull in", pauseIntv)
  225. }
  226. p.pullTimer.Reset(pauseIntv)
  227. break
  228. }
  229. }
  230. p.setState(FolderIdle)
  231. // The reason for running the scanner from within the puller is that
  232. // this is the easiest way to make sure we are not doing both at the
  233. // same time.
  234. case <-p.scanTimer.C:
  235. if err := p.model.CheckFolderHealth(p.folder); err != nil {
  236. l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err)
  237. rescheduleScan()
  238. continue
  239. }
  240. if debug {
  241. l.Debugln(p, "rescan")
  242. }
  243. if err := p.model.internalScanFolderSubs(p.folder, nil); err != nil {
  244. // Potentially sets the error twice, once in the scanner just
  245. // by doing a check, and once here, if the error returned is
  246. // the same one as returned by CheckFolderHealth, though
  247. // duplicate set is handled by setError.
  248. p.setError(err)
  249. rescheduleScan()
  250. continue
  251. }
  252. if p.scanIntv > 0 {
  253. rescheduleScan()
  254. }
  255. if !initialScanCompleted {
  256. l.Infoln("Completed initial scan (rw) of folder", p.folder)
  257. initialScanCompleted = true
  258. }
  259. case req := <-p.scanNow:
  260. if err := p.model.CheckFolderHealth(p.folder); err != nil {
  261. l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err)
  262. req.err <- err
  263. continue
  264. }
  265. if debug {
  266. l.Debugln(p, "forced rescan")
  267. }
  268. if err := p.model.internalScanFolderSubs(p.folder, req.subs); err != nil {
  269. // Potentially sets the error twice, once in the scanner just
  270. // by doing a check, and once here, if the error returned is
  271. // the same one as returned by CheckFolderHealth, though
  272. // duplicate set is handled by setError.
  273. p.setError(err)
  274. req.err <- err
  275. continue
  276. }
  277. req.err <- nil
  278. case next := <-p.delayScan:
  279. p.scanTimer.Reset(next)
  280. }
  281. }
  282. }
  283. func (p *rwFolder) Stop() {
  284. close(p.stop)
  285. }
  286. func (p *rwFolder) IndexUpdated() {
  287. select {
  288. case p.remoteIndex <- struct{}{}:
  289. default:
  290. // We might be busy doing a pull and thus not reading from this
  291. // channel. The channel is 1-buffered, so one notification will be
  292. // queued to ensure we recheck after the pull, but beyond that we must
  293. // make sure to not block index receiving.
  294. }
  295. }
  296. func (p *rwFolder) Scan(subs []string) error {
  297. req := rescanRequest{
  298. subs: subs,
  299. err: make(chan error),
  300. }
  301. p.scanNow <- req
  302. return <-req.err
  303. }
  304. func (p *rwFolder) String() string {
  305. return fmt.Sprintf("rwFolder/%s@%p", p.folder, p)
  306. }
  307. // pullerIteration runs a single puller iteration for the given folder and
  308. // returns the number items that should have been synced (even those that
  309. // might have failed). One puller iteration handles all files currently
  310. // flagged as needed in the folder.
  311. func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
  312. pullChan := make(chan pullBlockState)
  313. copyChan := make(chan copyBlocksState)
  314. finisherChan := make(chan *sharedPullerState)
  315. updateWg := sync.NewWaitGroup()
  316. copyWg := sync.NewWaitGroup()
  317. pullWg := sync.NewWaitGroup()
  318. doneWg := sync.NewWaitGroup()
  319. if debug {
  320. l.Debugln(p, "c", p.copiers, "p", p.pullers)
  321. }
  322. p.dbUpdates = make(chan dbUpdateJob)
  323. updateWg.Add(1)
  324. go func() {
  325. // dbUpdaterRoutine finishes when p.dbUpdates is closed
  326. p.dbUpdaterRoutine()
  327. updateWg.Done()
  328. }()
  329. for i := 0; i < p.copiers; i++ {
  330. copyWg.Add(1)
  331. go func() {
  332. // copierRoutine finishes when copyChan is closed
  333. p.copierRoutine(copyChan, pullChan, finisherChan)
  334. copyWg.Done()
  335. }()
  336. }
  337. for i := 0; i < p.pullers; i++ {
  338. pullWg.Add(1)
  339. go func() {
  340. // pullerRoutine finishes when pullChan is closed
  341. p.pullerRoutine(pullChan, finisherChan)
  342. pullWg.Done()
  343. }()
  344. }
  345. doneWg.Add(1)
  346. // finisherRoutine finishes when finisherChan is closed
  347. go func() {
  348. p.finisherRoutine(finisherChan)
  349. doneWg.Done()
  350. }()
  351. p.model.fmut.RLock()
  352. folderFiles := p.model.folderFiles[p.folder]
  353. p.model.fmut.RUnlock()
  354. // !!!
  355. // WithNeed takes a database snapshot (by necessity). By the time we've
  356. // handled a bunch of files it might have become out of date and we might
  357. // be attempting to sync with an old version of a file...
  358. // !!!
  359. changed := 0
  360. fileDeletions := map[string]protocol.FileInfo{}
  361. dirDeletions := []protocol.FileInfo{}
  362. buckets := map[string][]protocol.FileInfo{}
  363. folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
  364. // Needed items are delivered sorted lexicographically. We'll handle
  365. // directories as they come along, so parents before children. Files
  366. // are queued and the order may be changed later.
  367. file := intf.(protocol.FileInfo)
  368. if ignores.Match(file.Name) {
  369. // This is an ignored file. Skip it, continue iteration.
  370. return true
  371. }
  372. if debug {
  373. l.Debugln(p, "handling", file.Name)
  374. }
  375. switch {
  376. case file.IsDeleted():
  377. // A deleted file, directory or symlink
  378. if file.IsDirectory() {
  379. dirDeletions = append(dirDeletions, file)
  380. } else {
  381. fileDeletions[file.Name] = file
  382. df, ok := p.model.CurrentFolderFile(p.folder, file.Name)
  383. // Local file can be already deleted, but with a lower version
  384. // number, hence the deletion coming in again as part of
  385. // WithNeed, furthermore, the file can simply be of the wrong
  386. // type if we haven't yet managed to pull it.
  387. if ok && !df.IsDeleted() && !df.IsSymlink() && !df.IsDirectory() {
  388. // Put files into buckets per first hash
  389. key := string(df.Blocks[0].Hash)
  390. buckets[key] = append(buckets[key], df)
  391. }
  392. }
  393. case file.IsDirectory() && !file.IsSymlink():
  394. // A new or changed directory
  395. if debug {
  396. l.Debugln("Creating directory", file.Name)
  397. }
  398. p.handleDir(file)
  399. default:
  400. // A new or changed file or symlink. This is the only case where we
  401. // do stuff concurrently in the background
  402. p.queue.Push(file.Name, file.Size(), file.Modified)
  403. }
  404. changed++
  405. return true
  406. })
  407. // Reorder the file queue according to configuration
  408. switch p.order {
  409. case config.OrderRandom:
  410. p.queue.Shuffle()
  411. case config.OrderAlphabetic:
  412. // The queue is already in alphabetic order.
  413. case config.OrderSmallestFirst:
  414. p.queue.SortSmallestFirst()
  415. case config.OrderLargestFirst:
  416. p.queue.SortLargestFirst()
  417. case config.OrderOldestFirst:
  418. p.queue.SortOldestFirst()
  419. case config.OrderNewestFirst:
  420. p.queue.SortOldestFirst()
  421. }
  422. // Process the file queue
  423. nextFile:
  424. for {
  425. fileName, ok := p.queue.Pop()
  426. if !ok {
  427. break
  428. }
  429. f, ok := p.model.CurrentGlobalFile(p.folder, fileName)
  430. if !ok {
  431. // File is no longer in the index. Mark it as done and drop it.
  432. p.queue.Done(fileName)
  433. continue
  434. }
  435. // Local file can be already deleted, but with a lower version
  436. // number, hence the deletion coming in again as part of
  437. // WithNeed, furthermore, the file can simply be of the wrong type if
  438. // the global index changed while we were processing this iteration.
  439. if !f.IsDeleted() && !f.IsSymlink() && !f.IsDirectory() {
  440. key := string(f.Blocks[0].Hash)
  441. for i, candidate := range buckets[key] {
  442. if scanner.BlocksEqual(candidate.Blocks, f.Blocks) {
  443. // Remove the candidate from the bucket
  444. lidx := len(buckets[key]) - 1
  445. buckets[key][i] = buckets[key][lidx]
  446. buckets[key] = buckets[key][:lidx]
  447. // candidate is our current state of the file, where as the
  448. // desired state with the delete bit set is in the deletion
  449. // map.
  450. desired := fileDeletions[candidate.Name]
  451. // Remove the pending deletion (as we perform it by renaming)
  452. delete(fileDeletions, candidate.Name)
  453. p.renameFile(desired, f)
  454. p.queue.Done(fileName)
  455. continue nextFile
  456. }
  457. }
  458. }
  459. // Not a rename or a symlink, deal with it.
  460. p.handleFile(f, copyChan, finisherChan)
  461. }
  462. // Signal copy and puller routines that we are done with the in data for
  463. // this iteration. Wait for them to finish.
  464. close(copyChan)
  465. copyWg.Wait()
  466. close(pullChan)
  467. pullWg.Wait()
  468. // Signal the finisher chan that there will be no more input.
  469. close(finisherChan)
  470. // Wait for the finisherChan to finish.
  471. doneWg.Wait()
  472. for _, file := range fileDeletions {
  473. if debug {
  474. l.Debugln("Deleting file", file.Name)
  475. }
  476. p.deleteFile(file)
  477. }
  478. for i := range dirDeletions {
  479. dir := dirDeletions[len(dirDeletions)-i-1]
  480. if debug {
  481. l.Debugln("Deleting dir", dir.Name)
  482. }
  483. p.deleteDir(dir)
  484. }
  485. // Wait for db updates to complete
  486. close(p.dbUpdates)
  487. updateWg.Wait()
  488. return changed
  489. }
  490. // handleDir creates or updates the given directory
  491. func (p *rwFolder) handleDir(file protocol.FileInfo) {
  492. var err error
  493. events.Default.Log(events.ItemStarted, map[string]string{
  494. "folder": p.folder,
  495. "item": file.Name,
  496. "type": "dir",
  497. "action": "update",
  498. })
  499. defer func() {
  500. events.Default.Log(events.ItemFinished, map[string]interface{}{
  501. "folder": p.folder,
  502. "item": file.Name,
  503. "error": events.Error(err),
  504. "type": "dir",
  505. "action": "update",
  506. })
  507. }()
  508. realName := filepath.Join(p.dir, file.Name)
  509. mode := os.FileMode(file.Flags & 0777)
  510. if p.ignorePermissions(file) {
  511. mode = 0777
  512. }
  513. if debug {
  514. curFile, _ := p.model.CurrentFolderFile(p.folder, file.Name)
  515. l.Debugf("need dir\n\t%v\n\t%v", file, curFile)
  516. }
  517. info, err := osutil.Lstat(realName)
  518. switch {
  519. // There is already something under that name, but it's a file/link.
  520. // Most likely a file/link is getting replaced with a directory.
  521. // Remove the file/link and fall through to directory creation.
  522. case err == nil && (!info.IsDir() || info.Mode()&os.ModeSymlink != 0):
  523. err = osutil.InWritableDir(osutil.Remove, realName)
  524. if err != nil {
  525. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  526. return
  527. }
  528. fallthrough
  529. // The directory doesn't exist, so we create it with the right
  530. // mode bits from the start.
  531. case err != nil && os.IsNotExist(err):
  532. // We declare a function that acts on only the path name, so
  533. // we can pass it to InWritableDir. We use a regular Mkdir and
  534. // not MkdirAll because the parent should already exist.
  535. mkdir := func(path string) error {
  536. err = os.Mkdir(path, mode)
  537. if err != nil || p.ignorePermissions(file) {
  538. return err
  539. }
  540. return os.Chmod(path, mode)
  541. }
  542. if err = osutil.InWritableDir(mkdir, realName); err == nil {
  543. p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  544. } else {
  545. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  546. }
  547. return
  548. // Weird error when stat()'ing the dir. Probably won't work to do
  549. // anything else with it if we can't even stat() it.
  550. case err != nil:
  551. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  552. return
  553. }
  554. // The directory already exists, so we just correct the mode bits. (We
  555. // don't handle modification times on directories, because that sucks...)
  556. // It's OK to change mode bits on stuff within non-writable directories.
  557. if p.ignorePermissions(file) {
  558. p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  559. } else if err := os.Chmod(realName, mode); err == nil {
  560. p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  561. } else {
  562. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  563. }
  564. }
  565. // deleteDir attempts to delete the given directory
  566. func (p *rwFolder) deleteDir(file protocol.FileInfo) {
  567. var err error
  568. events.Default.Log(events.ItemStarted, map[string]string{
  569. "folder": p.folder,
  570. "item": file.Name,
  571. "type": "dir",
  572. "action": "delete",
  573. })
  574. defer func() {
  575. events.Default.Log(events.ItemFinished, map[string]interface{}{
  576. "folder": p.folder,
  577. "item": file.Name,
  578. "error": events.Error(err),
  579. "type": "dir",
  580. "action": "delete",
  581. })
  582. }()
  583. realName := filepath.Join(p.dir, file.Name)
  584. // Delete any temporary files lying around in the directory
  585. dir, _ := os.Open(realName)
  586. if dir != nil {
  587. files, _ := dir.Readdirnames(-1)
  588. for _, file := range files {
  589. if defTempNamer.IsTemporary(file) {
  590. osutil.InWritableDir(osutil.Remove, filepath.Join(realName, file))
  591. }
  592. }
  593. }
  594. err = osutil.InWritableDir(osutil.Remove, realName)
  595. if err == nil || os.IsNotExist(err) {
  596. // It was removed or it doesn't exist to start with
  597. p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
  598. } else if _, serr := os.Lstat(realName); serr != nil && !os.IsPermission(serr) {
  599. // We get an error just looking at the directory, and it's not a
  600. // permission problem. Lets assume the error is in fact some variant
  601. // of "file does not exist" (possibly expressed as some parent being a
  602. // file and not a directory etc) and that the delete is handled.
  603. p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
  604. } else {
  605. l.Infof("Puller (folder %q, dir %q): delete: %v", p.folder, file.Name, err)
  606. }
  607. }
  608. // deleteFile attempts to delete the given file
  609. func (p *rwFolder) deleteFile(file protocol.FileInfo) {
  610. var err error
  611. events.Default.Log(events.ItemStarted, map[string]string{
  612. "folder": p.folder,
  613. "item": file.Name,
  614. "type": "file",
  615. "action": "delete",
  616. })
  617. defer func() {
  618. events.Default.Log(events.ItemFinished, map[string]interface{}{
  619. "folder": p.folder,
  620. "item": file.Name,
  621. "error": events.Error(err),
  622. "type": "file",
  623. "action": "delete",
  624. })
  625. }()
  626. realName := filepath.Join(p.dir, file.Name)
  627. cur, ok := p.model.CurrentFolderFile(p.folder, file.Name)
  628. if ok && p.inConflict(cur.Version, file.Version) {
  629. // There is a conflict here. Move the file to a conflict copy instead
  630. // of deleting. Also merge with the version vector we had, to indicate
  631. // we have resolved the conflict.
  632. file.Version = file.Version.Merge(cur.Version)
  633. err = osutil.InWritableDir(moveForConflict, realName)
  634. } else if p.versioner != nil {
  635. err = osutil.InWritableDir(p.versioner.Archive, realName)
  636. } else {
  637. err = osutil.InWritableDir(osutil.Remove, realName)
  638. }
  639. if err == nil || os.IsNotExist(err) {
  640. // It was removed or it doesn't exist to start with
  641. p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
  642. } else if _, serr := os.Lstat(realName); serr != nil && !os.IsPermission(serr) {
  643. // We get an error just looking at the file, and it's not a permission
  644. // problem. Lets assume the error is in fact some variant of "file
  645. // does not exist" (possibly expressed as some parent being a file and
  646. // not a directory etc) and that the delete is handled.
  647. p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
  648. } else {
  649. l.Infof("Puller (folder %q, file %q): delete: %v", p.folder, file.Name, err)
  650. }
  651. }
  652. // renameFile attempts to rename an existing file to a destination
  653. // and set the right attributes on it.
  654. func (p *rwFolder) renameFile(source, target protocol.FileInfo) {
  655. var err error
  656. events.Default.Log(events.ItemStarted, map[string]string{
  657. "folder": p.folder,
  658. "item": source.Name,
  659. "type": "file",
  660. "action": "delete",
  661. })
  662. events.Default.Log(events.ItemStarted, map[string]string{
  663. "folder": p.folder,
  664. "item": target.Name,
  665. "type": "file",
  666. "action": "update",
  667. })
  668. defer func() {
  669. events.Default.Log(events.ItemFinished, map[string]interface{}{
  670. "folder": p.folder,
  671. "item": source.Name,
  672. "error": events.Error(err),
  673. "type": "file",
  674. "action": "delete",
  675. })
  676. events.Default.Log(events.ItemFinished, map[string]interface{}{
  677. "folder": p.folder,
  678. "item": target.Name,
  679. "error": events.Error(err),
  680. "type": "file",
  681. "action": "update",
  682. })
  683. }()
  684. if debug {
  685. l.Debugln(p, "taking rename shortcut", source.Name, "->", target.Name)
  686. }
  687. from := filepath.Join(p.dir, source.Name)
  688. to := filepath.Join(p.dir, target.Name)
  689. if p.versioner != nil {
  690. err = osutil.Copy(from, to)
  691. if err == nil {
  692. err = osutil.InWritableDir(p.versioner.Archive, from)
  693. }
  694. } else {
  695. err = osutil.TryRename(from, to)
  696. }
  697. if err == nil {
  698. // The file was renamed, so we have handled both the necessary delete
  699. // of the source and the creation of the target. Fix-up the metadata,
  700. // and update the local index of the target file.
  701. p.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
  702. err = p.shortcutFile(target)
  703. if err != nil {
  704. l.Infof("Puller (folder %q, file %q): rename from %q metadata: %v", p.folder, target.Name, source.Name, err)
  705. return
  706. }
  707. p.dbUpdates <- dbUpdateJob{target, dbUpdateHandleFile}
  708. } else {
  709. // We failed the rename so we have a source file that we still need to
  710. // get rid of. Attempt to delete it instead so that we make *some*
  711. // progress. The target is unhandled.
  712. err = osutil.InWritableDir(osutil.Remove, from)
  713. if err != nil {
  714. l.Infof("Puller (folder %q, file %q): delete %q after failed rename: %v", p.folder, target.Name, source.Name, err)
  715. return
  716. }
  717. p.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
  718. }
  719. }
  720. // This is the flow of data and events here, I think...
  721. //
  722. // +-----------------------+
  723. // | | - - - - > ItemStarted
  724. // | handleFile | - - - - > ItemFinished (on shortcuts)
  725. // | |
  726. // +-----------------------+
  727. // |
  728. // | copyChan (copyBlocksState; unless shortcut taken)
  729. // |
  730. // | +-----------------------+
  731. // | | +-----------------------+
  732. // +--->| | |
  733. // | | copierRoutine |
  734. // +-| |
  735. // +-----------------------+
  736. // |
  737. // | pullChan (sharedPullerState)
  738. // |
  739. // | +-----------------------+
  740. // | | +-----------------------+
  741. // +-->| | |
  742. // | | pullerRoutine |
  743. // +-| |
  744. // +-----------------------+
  745. // |
  746. // | finisherChan (sharedPullerState)
  747. // |
  748. // | +-----------------------+
  749. // | | |
  750. // +-->| finisherRoutine | - - - - > ItemFinished
  751. // | |
  752. // +-----------------------+
  753. // handleFile queues the copies and pulls as necessary for a single new or
  754. // changed file.
  755. func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
  756. curFile, ok := p.model.CurrentFolderFile(p.folder, file.Name)
  757. if ok && len(curFile.Blocks) == len(file.Blocks) && scanner.BlocksEqual(curFile.Blocks, file.Blocks) {
  758. // We are supposed to copy the entire file, and then fetch nothing. We
  759. // are only updating metadata, so we don't actually *need* to make the
  760. // copy.
  761. if debug {
  762. l.Debugln(p, "taking shortcut on", file.Name)
  763. }
  764. events.Default.Log(events.ItemStarted, map[string]string{
  765. "folder": p.folder,
  766. "item": file.Name,
  767. "type": "file",
  768. "action": "metadata",
  769. })
  770. p.queue.Done(file.Name)
  771. var err error
  772. if file.IsSymlink() {
  773. err = p.shortcutSymlink(file)
  774. } else {
  775. err = p.shortcutFile(file)
  776. }
  777. events.Default.Log(events.ItemFinished, map[string]interface{}{
  778. "folder": p.folder,
  779. "item": file.Name,
  780. "error": events.Error(err),
  781. "type": "file",
  782. "action": "metadata",
  783. })
  784. if err != nil {
  785. l.Infoln("Puller: shortcut:", err)
  786. } else {
  787. p.dbUpdates <- dbUpdateJob{file, dbUpdateShortcutFile}
  788. }
  789. return
  790. }
  791. events.Default.Log(events.ItemStarted, map[string]string{
  792. "folder": p.folder,
  793. "item": file.Name,
  794. "type": "file",
  795. "action": "update",
  796. })
  797. scanner.PopulateOffsets(file.Blocks)
  798. // Figure out the absolute filenames we need once and for all
  799. tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name))
  800. realName := filepath.Join(p.dir, file.Name)
  801. reused := 0
  802. var blocks []protocol.BlockInfo
  803. // Check for an old temporary file which might have some blocks we could
  804. // reuse.
  805. tempBlocks, err := scanner.HashFile(tempName, protocol.BlockSize)
  806. if err == nil {
  807. // Check for any reusable blocks in the temp file
  808. tempCopyBlocks, _ := scanner.BlockDiff(tempBlocks, file.Blocks)
  809. // block.String() returns a string unique to the block
  810. existingBlocks := make(map[string]struct{}, len(tempCopyBlocks))
  811. for _, block := range tempCopyBlocks {
  812. existingBlocks[block.String()] = struct{}{}
  813. }
  814. // Since the blocks are already there, we don't need to get them.
  815. for _, block := range file.Blocks {
  816. _, ok := existingBlocks[block.String()]
  817. if !ok {
  818. blocks = append(blocks, block)
  819. }
  820. }
  821. // The sharedpullerstate will know which flags to use when opening the
  822. // temp file depending if we are reusing any blocks or not.
  823. reused = len(file.Blocks) - len(blocks)
  824. if reused == 0 {
  825. // Otherwise, discard the file ourselves in order for the
  826. // sharedpuller not to panic when it fails to exclusively create a
  827. // file which already exists
  828. os.Remove(tempName)
  829. }
  830. } else {
  831. blocks = file.Blocks
  832. }
  833. s := sharedPullerState{
  834. file: file,
  835. folder: p.folder,
  836. tempName: tempName,
  837. realName: realName,
  838. copyTotal: len(blocks),
  839. copyNeeded: len(blocks),
  840. reused: reused,
  841. ignorePerms: p.ignorePermissions(file),
  842. version: curFile.Version,
  843. mut: sync.NewMutex(),
  844. }
  845. if debug {
  846. l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused)
  847. }
  848. cs := copyBlocksState{
  849. sharedPullerState: &s,
  850. blocks: blocks,
  851. }
  852. copyChan <- cs
  853. }
  854. // shortcutFile sets file mode and modification time, when that's the only
  855. // thing that has changed.
  856. func (p *rwFolder) shortcutFile(file protocol.FileInfo) error {
  857. realName := filepath.Join(p.dir, file.Name)
  858. if !p.ignorePermissions(file) {
  859. if err := os.Chmod(realName, os.FileMode(file.Flags&0777)); err != nil {
  860. l.Infof("Puller (folder %q, file %q): shortcut: chmod: %v", p.folder, file.Name, err)
  861. return err
  862. }
  863. }
  864. t := time.Unix(file.Modified, 0)
  865. if err := os.Chtimes(realName, t, t); err != nil {
  866. // Try using virtual mtimes
  867. info, err := os.Stat(realName)
  868. if err != nil {
  869. l.Infof("Puller (folder %q, file %q): shortcut: unable to stat file: %v", p.folder, file.Name, err)
  870. return err
  871. }
  872. p.virtualMtimeRepo.UpdateMtime(file.Name, info.ModTime(), t)
  873. }
  874. // This may have been a conflict. We should merge the version vectors so
  875. // that our clock doesn't move backwards.
  876. if cur, ok := p.model.CurrentFolderFile(p.folder, file.Name); ok {
  877. file.Version = file.Version.Merge(cur.Version)
  878. }
  879. return nil
  880. }
  881. // shortcutSymlink changes the symlinks type if necessary.
  882. func (p *rwFolder) shortcutSymlink(file protocol.FileInfo) (err error) {
  883. err = symlinks.ChangeType(filepath.Join(p.dir, file.Name), file.Flags)
  884. if err != nil {
  885. l.Infof("Puller (folder %q, file %q): symlink shortcut: %v", p.folder, file.Name, err)
  886. }
  887. return
  888. }
  889. // copierRoutine reads copierStates until the in channel closes and performs
  890. // the relevant copies when possible, or passes it to the puller routine.
  891. func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
  892. buf := make([]byte, protocol.BlockSize)
  893. for state := range in {
  894. dstFd, err := state.tempFile()
  895. if err != nil {
  896. // Nothing more to do for this failed file, since we couldn't create a temporary for it.
  897. out <- state.sharedPullerState
  898. continue
  899. }
  900. if p.progressEmitter != nil {
  901. p.progressEmitter.Register(state.sharedPullerState)
  902. }
  903. folderRoots := make(map[string]string)
  904. p.model.fmut.RLock()
  905. for folder, cfg := range p.model.folderCfgs {
  906. folderRoots[folder] = cfg.Path()
  907. }
  908. p.model.fmut.RUnlock()
  909. for _, block := range state.blocks {
  910. buf = buf[:int(block.Size)]
  911. found := p.model.finder.Iterate(block.Hash, func(folder, file string, index int32) bool {
  912. fd, err := os.Open(filepath.Join(folderRoots[folder], file))
  913. if err != nil {
  914. return false
  915. }
  916. _, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
  917. fd.Close()
  918. if err != nil {
  919. return false
  920. }
  921. hash, err := scanner.VerifyBuffer(buf, block)
  922. if err != nil {
  923. if hash != nil {
  924. if debug {
  925. l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash)
  926. }
  927. err = p.model.finder.Fix(folder, file, index, block.Hash, hash)
  928. if err != nil {
  929. l.Warnln("finder fix:", err)
  930. }
  931. } else if debug {
  932. l.Debugln("Finder failed to verify buffer", err)
  933. }
  934. return false
  935. }
  936. _, err = dstFd.WriteAt(buf, block.Offset)
  937. if err != nil {
  938. state.fail("dst write", err)
  939. }
  940. if file == state.file.Name {
  941. state.copiedFromOrigin()
  942. }
  943. return true
  944. })
  945. if state.failed() != nil {
  946. break
  947. }
  948. if !found {
  949. state.pullStarted()
  950. ps := pullBlockState{
  951. sharedPullerState: state.sharedPullerState,
  952. block: block,
  953. }
  954. pullChan <- ps
  955. } else {
  956. state.copyDone()
  957. }
  958. }
  959. out <- state.sharedPullerState
  960. }
  961. }
  962. func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
  963. for state := range in {
  964. if state.failed() != nil {
  965. out <- state.sharedPullerState
  966. continue
  967. }
  968. // Get an fd to the temporary file. Technically we don't need it until
  969. // after fetching the block, but if we run into an error here there is
  970. // no point in issuing the request to the network.
  971. fd, err := state.tempFile()
  972. if err != nil {
  973. out <- state.sharedPullerState
  974. continue
  975. }
  976. var lastError error
  977. potentialDevices := p.model.Availability(p.folder, state.file.Name)
  978. for {
  979. // Select the least busy device to pull the block from. If we found no
  980. // feasible device at all, fail the block (and in the long run, the
  981. // file).
  982. selected := activity.leastBusy(potentialDevices)
  983. if selected == (protocol.DeviceID{}) {
  984. if lastError != nil {
  985. state.fail("pull", lastError)
  986. } else {
  987. state.fail("pull", errNoDevice)
  988. }
  989. break
  990. }
  991. potentialDevices = removeDevice(potentialDevices, selected)
  992. // Fetch the block, while marking the selected device as in use so that
  993. // leastBusy can select another device when someone else asks.
  994. activity.using(selected)
  995. buf, lastError := p.model.requestGlobal(selected, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, 0, nil)
  996. activity.done(selected)
  997. if lastError != nil {
  998. continue
  999. }
  1000. // Verify that the received block matches the desired hash, if not
  1001. // try pulling it from another device.
  1002. _, lastError = scanner.VerifyBuffer(buf, state.block)
  1003. if lastError != nil {
  1004. continue
  1005. }
  1006. // Save the block data we got from the cluster
  1007. _, err = fd.WriteAt(buf, state.block.Offset)
  1008. if err != nil {
  1009. state.fail("save", err)
  1010. } else {
  1011. state.pullDone()
  1012. }
  1013. break
  1014. }
  1015. out <- state.sharedPullerState
  1016. }
  1017. }
  1018. func (p *rwFolder) performFinish(state *sharedPullerState) error {
  1019. // Set the correct permission bits on the new file
  1020. if !p.ignorePermissions(state.file) {
  1021. if err := os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777)); err != nil {
  1022. return err
  1023. }
  1024. }
  1025. // Set the correct timestamp on the new file
  1026. t := time.Unix(state.file.Modified, 0)
  1027. if err := os.Chtimes(state.tempName, t, t); err != nil {
  1028. // Try using virtual mtimes instead
  1029. info, err := os.Stat(state.tempName)
  1030. if err != nil {
  1031. return err
  1032. }
  1033. p.virtualMtimeRepo.UpdateMtime(state.file.Name, info.ModTime(), t)
  1034. }
  1035. var err error
  1036. if p.inConflict(state.version, state.file.Version) {
  1037. // The new file has been changed in conflict with the existing one. We
  1038. // should file it away as a conflict instead of just removing or
  1039. // archiving. Also merge with the version vector we had, to indicate
  1040. // we have resolved the conflict.
  1041. state.file.Version = state.file.Version.Merge(state.version)
  1042. err = osutil.InWritableDir(moveForConflict, state.realName)
  1043. } else if p.versioner != nil {
  1044. // If we should use versioning, let the versioner archive the old
  1045. // file before we replace it. Archiving a non-existent file is not
  1046. // an error.
  1047. err = p.versioner.Archive(state.realName)
  1048. } else {
  1049. err = nil
  1050. }
  1051. if err != nil {
  1052. return err
  1053. }
  1054. // If the target path is a symlink or a directory, we cannot copy
  1055. // over it, hence remove it before proceeding.
  1056. stat, err := osutil.Lstat(state.realName)
  1057. if err == nil && (stat.IsDir() || stat.Mode()&os.ModeSymlink != 0) {
  1058. osutil.InWritableDir(osutil.Remove, state.realName)
  1059. }
  1060. // Replace the original content with the new one
  1061. if err = osutil.Rename(state.tempName, state.realName); err != nil {
  1062. return err
  1063. }
  1064. // If it's a symlink, the target of the symlink is inside the file.
  1065. if state.file.IsSymlink() {
  1066. content, err := ioutil.ReadFile(state.realName)
  1067. if err != nil {
  1068. return err
  1069. }
  1070. // Remove the file, and replace it with a symlink.
  1071. err = osutil.InWritableDir(func(path string) error {
  1072. os.Remove(path)
  1073. return symlinks.Create(path, string(content), state.file.Flags)
  1074. }, state.realName)
  1075. if err != nil {
  1076. return err
  1077. }
  1078. }
  1079. // Record the updated file in the index
  1080. p.dbUpdates <- dbUpdateJob{state.file, dbUpdateHandleFile}
  1081. return nil
  1082. }
  1083. func (p *rwFolder) finisherRoutine(in <-chan *sharedPullerState) {
  1084. for state := range in {
  1085. if closed, err := state.finalClose(); closed {
  1086. if debug {
  1087. l.Debugln(p, "closing", state.file.Name)
  1088. }
  1089. p.queue.Done(state.file.Name)
  1090. if err == nil {
  1091. err = p.performFinish(state)
  1092. }
  1093. if err != nil {
  1094. l.Infoln("Puller: final:", err)
  1095. }
  1096. events.Default.Log(events.ItemFinished, map[string]interface{}{
  1097. "folder": p.folder,
  1098. "item": state.file.Name,
  1099. "error": events.Error(err),
  1100. "type": "file",
  1101. "action": "update",
  1102. })
  1103. if p.progressEmitter != nil {
  1104. p.progressEmitter.Deregister(state)
  1105. }
  1106. }
  1107. }
  1108. }
  1109. // Moves the given filename to the front of the job queue
  1110. func (p *rwFolder) BringToFront(filename string) {
  1111. p.queue.BringToFront(filename)
  1112. }
  1113. func (p *rwFolder) Jobs() ([]string, []string) {
  1114. return p.queue.Jobs()
  1115. }
  1116. func (p *rwFolder) DelayScan(next time.Duration) {
  1117. p.delayScan <- next
  1118. }
  1119. // dbUpdaterRoutine aggregates db updates and commits them in batches no
  1120. // larger than 1000 items, and no more delayed than 2 seconds.
  1121. func (p *rwFolder) dbUpdaterRoutine() {
  1122. const (
  1123. maxBatchSize = 1000
  1124. maxBatchTime = 2 * time.Second
  1125. )
  1126. batch := make([]dbUpdateJob, 0, maxBatchSize)
  1127. files := make([]protocol.FileInfo, 0, maxBatchSize)
  1128. tick := time.NewTicker(maxBatchTime)
  1129. defer tick.Stop()
  1130. handleBatch := func() {
  1131. found := false
  1132. var lastFile protocol.FileInfo
  1133. for _, job := range batch {
  1134. files = append(files, job.file)
  1135. if job.file.IsInvalid() || (job.file.IsDirectory() && !job.file.IsSymlink()) {
  1136. continue
  1137. }
  1138. if job.jobType&(dbUpdateHandleFile|dbUpdateDeleteFile) == 0 {
  1139. continue
  1140. }
  1141. found = true
  1142. lastFile = job.file
  1143. }
  1144. p.model.updateLocals(p.folder, files)
  1145. if found {
  1146. p.model.receivedFile(p.folder, lastFile)
  1147. }
  1148. batch = batch[:0]
  1149. files = files[:0]
  1150. }
  1151. loop:
  1152. for {
  1153. select {
  1154. case job, ok := <-p.dbUpdates:
  1155. if !ok {
  1156. break loop
  1157. }
  1158. job.file.LocalVersion = 0
  1159. batch = append(batch, job)
  1160. if len(batch) == maxBatchSize {
  1161. handleBatch()
  1162. }
  1163. case <-tick.C:
  1164. if len(batch) > 0 {
  1165. handleBatch()
  1166. }
  1167. }
  1168. }
  1169. if len(batch) > 0 {
  1170. handleBatch()
  1171. }
  1172. }
  1173. func (p *rwFolder) inConflict(current, replacement protocol.Vector) bool {
  1174. if current.Concurrent(replacement) {
  1175. // Obvious case
  1176. return true
  1177. }
  1178. if replacement.Counter(p.shortID) > current.Counter(p.shortID) {
  1179. // The replacement file contains a higher version for ourselves than
  1180. // what we have. This isn't supposed to be possible, since it's only
  1181. // we who can increment that counter. We take it as a sign that
  1182. // something is wrong (our index may have been corrupted or removed)
  1183. // and flag it as a conflict.
  1184. return true
  1185. }
  1186. return false
  1187. }
  1188. func invalidateFolder(cfg *config.Configuration, folderID string, err error) {
  1189. for i := range cfg.Folders {
  1190. folder := &cfg.Folders[i]
  1191. if folder.ID == folderID {
  1192. folder.Invalid = err.Error()
  1193. return
  1194. }
  1195. }
  1196. }
  1197. func removeDevice(devices []protocol.DeviceID, device protocol.DeviceID) []protocol.DeviceID {
  1198. for i := range devices {
  1199. if devices[i] == device {
  1200. devices[i] = devices[len(devices)-1]
  1201. return devices[:len(devices)-1]
  1202. }
  1203. }
  1204. return devices
  1205. }
  1206. func moveForConflict(name string) error {
  1207. ext := filepath.Ext(name)
  1208. withoutExt := name[:len(name)-len(ext)]
  1209. newName := withoutExt + time.Now().Format(".sync-conflict-20060102-150405") + ext
  1210. err := os.Rename(name, newName)
  1211. if os.IsNotExist(err) {
  1212. // We were supposed to move a file away but it does not exist. Either
  1213. // the user has already moved it away, or the conflict was between a
  1214. // remote modification and a local delete. In either way it does not
  1215. // matter, go ahead as if the move succeeded.
  1216. return nil
  1217. }
  1218. return err
  1219. }