rwfolder.go 43 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "math/rand"
  12. "os"
  13. "path/filepath"
  14. "sort"
  15. "time"
  16. "github.com/syncthing/protocol"
  17. "github.com/syncthing/syncthing/lib/config"
  18. "github.com/syncthing/syncthing/lib/db"
  19. "github.com/syncthing/syncthing/lib/events"
  20. "github.com/syncthing/syncthing/lib/ignore"
  21. "github.com/syncthing/syncthing/lib/osutil"
  22. "github.com/syncthing/syncthing/lib/scanner"
  23. "github.com/syncthing/syncthing/lib/symlinks"
  24. "github.com/syncthing/syncthing/lib/sync"
  25. "github.com/syncthing/syncthing/lib/versioner"
  26. )
  27. // TODO: Stop on errors
  // Delays used when rearming the pull timer.
  const (
  	// pauseIntv is the back-off applied once the puller has retried many
  	// times without getting the folder in sync.
  	pauseIntv = time.Minute
  	// nextPullIntv is the regular delay between pull checks.
  	nextPullIntv = 10 * time.Second
  	// shortPullIntv is the initial delay, and the delay applied after a
  	// remote index update.
  	shortPullIntv = 1 * time.Second
  )
  33. // A pullBlockState is passed to the puller routine for each block that needs
  34. // to be fetched.
  35. type pullBlockState struct {
  36. *sharedPullerState
  37. block protocol.BlockInfo
  38. }
  39. // A copyBlocksState is passed to copy routine if the file has blocks to be
  40. // copied.
  41. type copyBlocksState struct {
  42. *sharedPullerState
  43. blocks []protocol.BlockInfo
  44. }
  // retainBits is the set of file mode bits that are carried over from the
  // existing on-disk mode when permissions are applied.
  const retainBits = os.ModeSetuid | os.ModeSetgid | os.ModeSticky
  47. var (
  48. activity = newDeviceActivity()
  49. errNoDevice = errors.New("no available source device")
  50. )
  // Job types carried in dbUpdateJob.jobType.
  const (
  	dbUpdateHandleDir    = iota // a directory was created or updated
  	dbUpdateDeleteDir           // a directory was deleted
  	dbUpdateHandleFile          // a file was created or updated
  	dbUpdateDeleteFile          // a file was deleted
  	dbUpdateShortcutFile        // a file was handled via a shortcut
  )
  // Routine counts used when the folder configuration leaves them at zero.
  const (
  	defaultCopiers = 1  // copier routines per puller iteration
  	defaultPullers = 16 // puller routines per puller iteration
  )
  62. type dbUpdateJob struct {
  63. file protocol.FileInfo
  64. jobType int
  65. }
  66. type rwFolder struct {
  67. stateTracker
  68. model *Model
  69. progressEmitter *ProgressEmitter
  70. virtualMtimeRepo *db.VirtualMtimeRepo
  71. folder string
  72. dir string
  73. scanIntv time.Duration
  74. versioner versioner.Versioner
  75. ignorePerms bool
  76. copiers int
  77. pullers int
  78. shortID uint64
  79. order config.PullOrder
  80. stop chan struct{}
  81. queue *jobQueue
  82. dbUpdates chan dbUpdateJob
  83. scanTimer *time.Timer
  84. pullTimer *time.Timer
  85. delayScan chan time.Duration
  86. scanNow chan rescanRequest
  87. remoteIndex chan struct{} // An index update was received, we should re-evaluate needs
  88. errors map[string]string // path -> error string
  89. errorsMut sync.Mutex
  90. }
  91. func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder {
  92. p := &rwFolder{
  93. stateTracker: stateTracker{
  94. folder: cfg.ID,
  95. mut: sync.NewMutex(),
  96. },
  97. model: m,
  98. progressEmitter: m.progressEmitter,
  99. virtualMtimeRepo: db.NewVirtualMtimeRepo(m.db, cfg.ID),
  100. folder: cfg.ID,
  101. dir: cfg.Path(),
  102. scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
  103. ignorePerms: cfg.IgnorePerms,
  104. copiers: cfg.Copiers,
  105. pullers: cfg.Pullers,
  106. shortID: shortID,
  107. order: cfg.Order,
  108. stop: make(chan struct{}),
  109. queue: newJobQueue(),
  110. pullTimer: time.NewTimer(shortPullIntv),
  111. scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
  112. delayScan: make(chan time.Duration),
  113. scanNow: make(chan rescanRequest),
  114. remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes.
  115. errorsMut: sync.NewMutex(),
  116. }
  117. if p.copiers == 0 {
  118. p.copiers = defaultCopiers
  119. }
  120. if p.pullers == 0 {
  121. p.pullers = defaultPullers
  122. }
  123. return p
  124. }
  125. // Helper function to check whether either the ignorePerm flag has been
  126. // set on the local host or the FlagNoPermBits has been set on the file/dir
  127. // which is being pulled.
  128. func (p *rwFolder) ignorePermissions(file protocol.FileInfo) bool {
  129. return p.ignorePerms || file.Flags&protocol.FlagNoPermBits != 0
  130. }
  131. // Serve will run scans and pulls. It will return when Stop()ed or on a
  132. // critical error.
  133. func (p *rwFolder) Serve() {
  134. if debug {
  135. l.Debugln(p, "starting")
  136. defer l.Debugln(p, "exiting")
  137. }
  138. defer func() {
  139. p.pullTimer.Stop()
  140. p.scanTimer.Stop()
  141. // TODO: Should there be an actual FolderStopped state?
  142. p.setState(FolderIdle)
  143. }()
  144. var prevVer int64
  145. var prevIgnoreHash string
  146. rescheduleScan := func() {
  147. if p.scanIntv == 0 {
  148. // We should not run scans, so it should not be rescheduled.
  149. return
  150. }
  151. // Sleep a random time between 3/4 and 5/4 of the configured interval.
  152. sleepNanos := (p.scanIntv.Nanoseconds()*3 + rand.Int63n(2*p.scanIntv.Nanoseconds())) / 4
  153. intv := time.Duration(sleepNanos) * time.Nanosecond
  154. if debug {
  155. l.Debugln(p, "next rescan in", intv)
  156. }
  157. p.scanTimer.Reset(intv)
  158. }
  159. // We don't start pulling files until a scan has been completed.
  160. initialScanCompleted := false
  161. for {
  162. select {
  163. case <-p.stop:
  164. return
  165. case <-p.remoteIndex:
  166. prevVer = 0
  167. p.pullTimer.Reset(shortPullIntv)
  168. if debug {
  169. l.Debugln(p, "remote index updated, rescheduling pull")
  170. }
  171. case <-p.pullTimer.C:
  172. if !initialScanCompleted {
  173. if debug {
  174. l.Debugln(p, "skip (initial)")
  175. }
  176. p.pullTimer.Reset(nextPullIntv)
  177. continue
  178. }
  179. p.model.fmut.RLock()
  180. curIgnores := p.model.folderIgnores[p.folder]
  181. p.model.fmut.RUnlock()
  182. if newHash := curIgnores.Hash(); newHash != prevIgnoreHash {
  183. // The ignore patterns have changed. We need to re-evaluate if
  184. // there are files we need now that were ignored before.
  185. if debug {
  186. l.Debugln(p, "ignore patterns have changed, resetting prevVer")
  187. }
  188. prevVer = 0
  189. prevIgnoreHash = newHash
  190. }
  191. // RemoteLocalVersion() is a fast call, doesn't touch the database.
  192. curVer, ok := p.model.RemoteLocalVersion(p.folder)
  193. if !ok || curVer == prevVer {
  194. if debug {
  195. l.Debugln(p, "skip (curVer == prevVer)", prevVer, ok)
  196. }
  197. p.pullTimer.Reset(nextPullIntv)
  198. continue
  199. }
  200. if debug {
  201. l.Debugln(p, "pulling", prevVer, curVer)
  202. }
  203. p.setState(FolderSyncing)
  204. p.clearErrors()
  205. tries := 0
  206. for {
  207. tries++
  208. changed := p.pullerIteration(curIgnores)
  209. if debug {
  210. l.Debugln(p, "changed", changed)
  211. }
  212. if changed == 0 {
  213. // No files were changed by the puller, so we are in
  214. // sync. Remember the local version number and
  215. // schedule a resync a little bit into the future.
  216. if lv, ok := p.model.RemoteLocalVersion(p.folder); ok && lv < curVer {
  217. // There's a corner case where the device we needed
  218. // files from disconnected during the puller
  219. // iteration. The files will have been removed from
  220. // the index, so we've concluded that we don't need
  221. // them, but at the same time we have the local
  222. // version that includes those files in curVer. So we
  223. // catch the case that localVersion might have
  224. // decreased here.
  225. l.Debugln(p, "adjusting curVer", lv)
  226. curVer = lv
  227. }
  228. prevVer = curVer
  229. if debug {
  230. l.Debugln(p, "next pull in", nextPullIntv)
  231. }
  232. p.pullTimer.Reset(nextPullIntv)
  233. break
  234. }
  235. if tries > 10 {
  236. // We've tried a bunch of times to get in sync, but
  237. // we're not making it. Probably there are write
  238. // errors preventing us. Flag this with a warning and
  239. // wait a bit longer before retrying.
  240. l.Infof("Folder %q isn't making progress. Pausing puller for %v.", p.folder, pauseIntv)
  241. if debug {
  242. l.Debugln(p, "next pull in", pauseIntv)
  243. }
  244. if folderErrors := p.currentErrors(); len(folderErrors) > 0 {
  245. events.Default.Log(events.FolderErrors, map[string]interface{}{
  246. "folder": p.folder,
  247. "errors": folderErrors,
  248. })
  249. }
  250. p.pullTimer.Reset(pauseIntv)
  251. break
  252. }
  253. }
  254. p.setState(FolderIdle)
  255. // The reason for running the scanner from within the puller is that
  256. // this is the easiest way to make sure we are not doing both at the
  257. // same time.
  258. case <-p.scanTimer.C:
  259. if err := p.model.CheckFolderHealth(p.folder); err != nil {
  260. l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err)
  261. rescheduleScan()
  262. continue
  263. }
  264. if debug {
  265. l.Debugln(p, "rescan")
  266. }
  267. if err := p.model.internalScanFolderSubs(p.folder, nil); err != nil {
  268. // Potentially sets the error twice, once in the scanner just
  269. // by doing a check, and once here, if the error returned is
  270. // the same one as returned by CheckFolderHealth, though
  271. // duplicate set is handled by setError.
  272. p.setError(err)
  273. rescheduleScan()
  274. continue
  275. }
  276. if p.scanIntv > 0 {
  277. rescheduleScan()
  278. }
  279. if !initialScanCompleted {
  280. l.Infoln("Completed initial scan (rw) of folder", p.folder)
  281. initialScanCompleted = true
  282. }
  283. case req := <-p.scanNow:
  284. if err := p.model.CheckFolderHealth(p.folder); err != nil {
  285. l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err)
  286. req.err <- err
  287. continue
  288. }
  289. if debug {
  290. l.Debugln(p, "forced rescan")
  291. }
  292. if err := p.model.internalScanFolderSubs(p.folder, req.subs); err != nil {
  293. // Potentially sets the error twice, once in the scanner just
  294. // by doing a check, and once here, if the error returned is
  295. // the same one as returned by CheckFolderHealth, though
  296. // duplicate set is handled by setError.
  297. p.setError(err)
  298. req.err <- err
  299. continue
  300. }
  301. req.err <- nil
  302. case next := <-p.delayScan:
  303. p.scanTimer.Reset(next)
  304. }
  305. }
  306. }
  307. func (p *rwFolder) Stop() {
  308. close(p.stop)
  309. }
  310. func (p *rwFolder) IndexUpdated() {
  311. select {
  312. case p.remoteIndex <- struct{}{}:
  313. default:
  314. // We might be busy doing a pull and thus not reading from this
  315. // channel. The channel is 1-buffered, so one notification will be
  316. // queued to ensure we recheck after the pull, but beyond that we must
  317. // make sure to not block index receiving.
  318. }
  319. }
  320. func (p *rwFolder) Scan(subs []string) error {
  321. req := rescanRequest{
  322. subs: subs,
  323. err: make(chan error),
  324. }
  325. p.scanNow <- req
  326. return <-req.err
  327. }
  328. func (p *rwFolder) String() string {
  329. return fmt.Sprintf("rwFolder/%s@%p", p.folder, p)
  330. }
  331. // pullerIteration runs a single puller iteration for the given folder and
  332. // returns the number items that should have been synced (even those that
  333. // might have failed). One puller iteration handles all files currently
  334. // flagged as needed in the folder.
  335. func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
  336. pullChan := make(chan pullBlockState)
  337. copyChan := make(chan copyBlocksState)
  338. finisherChan := make(chan *sharedPullerState)
  339. updateWg := sync.NewWaitGroup()
  340. copyWg := sync.NewWaitGroup()
  341. pullWg := sync.NewWaitGroup()
  342. doneWg := sync.NewWaitGroup()
  343. if debug {
  344. l.Debugln(p, "c", p.copiers, "p", p.pullers)
  345. }
  346. p.dbUpdates = make(chan dbUpdateJob)
  347. updateWg.Add(1)
  348. go func() {
  349. // dbUpdaterRoutine finishes when p.dbUpdates is closed
  350. p.dbUpdaterRoutine()
  351. updateWg.Done()
  352. }()
  353. for i := 0; i < p.copiers; i++ {
  354. copyWg.Add(1)
  355. go func() {
  356. // copierRoutine finishes when copyChan is closed
  357. p.copierRoutine(copyChan, pullChan, finisherChan)
  358. copyWg.Done()
  359. }()
  360. }
  361. for i := 0; i < p.pullers; i++ {
  362. pullWg.Add(1)
  363. go func() {
  364. // pullerRoutine finishes when pullChan is closed
  365. p.pullerRoutine(pullChan, finisherChan)
  366. pullWg.Done()
  367. }()
  368. }
  369. doneWg.Add(1)
  370. // finisherRoutine finishes when finisherChan is closed
  371. go func() {
  372. p.finisherRoutine(finisherChan)
  373. doneWg.Done()
  374. }()
  375. p.model.fmut.RLock()
  376. folderFiles := p.model.folderFiles[p.folder]
  377. p.model.fmut.RUnlock()
  378. // !!!
  379. // WithNeed takes a database snapshot (by necessity). By the time we've
  380. // handled a bunch of files it might have become out of date and we might
  381. // be attempting to sync with an old version of a file...
  382. // !!!
  383. changed := 0
  384. pullFileSize := int64(0)
  385. fileDeletions := map[string]protocol.FileInfo{}
  386. dirDeletions := []protocol.FileInfo{}
  387. buckets := map[string][]protocol.FileInfo{}
  388. folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
  389. // Needed items are delivered sorted lexicographically. We'll handle
  390. // directories as they come along, so parents before children. Files
  391. // are queued and the order may be changed later.
  392. file := intf.(protocol.FileInfo)
  393. if ignores.Match(file.Name) {
  394. // This is an ignored file. Skip it, continue iteration.
  395. return true
  396. }
  397. if debug {
  398. l.Debugln(p, "handling", file.Name)
  399. }
  400. switch {
  401. case file.IsDeleted():
  402. // A deleted file, directory or symlink
  403. if file.IsDirectory() {
  404. dirDeletions = append(dirDeletions, file)
  405. } else {
  406. fileDeletions[file.Name] = file
  407. df, ok := p.model.CurrentFolderFile(p.folder, file.Name)
  408. // Local file can be already deleted, but with a lower version
  409. // number, hence the deletion coming in again as part of
  410. // WithNeed, furthermore, the file can simply be of the wrong
  411. // type if we haven't yet managed to pull it.
  412. if ok && !df.IsDeleted() && !df.IsSymlink() && !df.IsDirectory() {
  413. // Put files into buckets per first hash
  414. key := string(df.Blocks[0].Hash)
  415. buckets[key] = append(buckets[key], df)
  416. }
  417. }
  418. case file.IsDirectory() && !file.IsSymlink():
  419. // A new or changed directory
  420. if debug {
  421. l.Debugln("Creating directory", file.Name)
  422. }
  423. p.handleDir(file)
  424. default:
  425. // A new or changed file or symlink. This is the only case where we
  426. // do stuff concurrently in the background
  427. pullFileSize += file.Size()
  428. p.queue.Push(file.Name, file.Size(), file.Modified)
  429. }
  430. changed++
  431. return true
  432. })
  433. // Check if we are able to store all files on disk. Only perform this
  434. // check if there is a minimum free space threshold set on the folder.
  435. folderCfg := p.model.cfg.Folders()[p.folder]
  436. if folderCfg.MinDiskFreePct > 0 && pullFileSize > 0 {
  437. if free, err := osutil.DiskFreeBytes(folderCfg.Path()); err == nil && free < pullFileSize {
  438. l.Warnf(`Folder "%s": insufficient disk space available to pull %d files (%.2f MiB)`, p.folder, changed, float64(pullFileSize)/1024/1024)
  439. return 0
  440. }
  441. }
  442. // Reorder the file queue according to configuration
  443. switch p.order {
  444. case config.OrderRandom:
  445. p.queue.Shuffle()
  446. case config.OrderAlphabetic:
  447. // The queue is already in alphabetic order.
  448. case config.OrderSmallestFirst:
  449. p.queue.SortSmallestFirst()
  450. case config.OrderLargestFirst:
  451. p.queue.SortLargestFirst()
  452. case config.OrderOldestFirst:
  453. p.queue.SortOldestFirst()
  454. case config.OrderNewestFirst:
  455. p.queue.SortNewestFirst()
  456. }
  457. // Process the file queue
  458. nextFile:
  459. for {
  460. fileName, ok := p.queue.Pop()
  461. if !ok {
  462. break
  463. }
  464. f, ok := p.model.CurrentGlobalFile(p.folder, fileName)
  465. if !ok {
  466. // File is no longer in the index. Mark it as done and drop it.
  467. p.queue.Done(fileName)
  468. continue
  469. }
  470. // Local file can be already deleted, but with a lower version
  471. // number, hence the deletion coming in again as part of
  472. // WithNeed, furthermore, the file can simply be of the wrong type if
  473. // the global index changed while we were processing this iteration.
  474. if !f.IsDeleted() && !f.IsSymlink() && !f.IsDirectory() {
  475. key := string(f.Blocks[0].Hash)
  476. for i, candidate := range buckets[key] {
  477. if scanner.BlocksEqual(candidate.Blocks, f.Blocks) {
  478. // Remove the candidate from the bucket
  479. lidx := len(buckets[key]) - 1
  480. buckets[key][i] = buckets[key][lidx]
  481. buckets[key] = buckets[key][:lidx]
  482. // candidate is our current state of the file, where as the
  483. // desired state with the delete bit set is in the deletion
  484. // map.
  485. desired := fileDeletions[candidate.Name]
  486. // Remove the pending deletion (as we perform it by renaming)
  487. delete(fileDeletions, candidate.Name)
  488. p.renameFile(desired, f)
  489. p.queue.Done(fileName)
  490. continue nextFile
  491. }
  492. }
  493. }
  494. // Not a rename or a symlink, deal with it.
  495. p.handleFile(f, copyChan, finisherChan)
  496. }
  497. // Signal copy and puller routines that we are done with the in data for
  498. // this iteration. Wait for them to finish.
  499. close(copyChan)
  500. copyWg.Wait()
  501. close(pullChan)
  502. pullWg.Wait()
  503. // Signal the finisher chan that there will be no more input.
  504. close(finisherChan)
  505. // Wait for the finisherChan to finish.
  506. doneWg.Wait()
  507. for _, file := range fileDeletions {
  508. if debug {
  509. l.Debugln("Deleting file", file.Name)
  510. }
  511. p.deleteFile(file)
  512. }
  513. for i := range dirDeletions {
  514. dir := dirDeletions[len(dirDeletions)-i-1]
  515. if debug {
  516. l.Debugln("Deleting dir", dir.Name)
  517. }
  518. p.deleteDir(dir)
  519. }
  520. // Wait for db updates to complete
  521. close(p.dbUpdates)
  522. updateWg.Wait()
  523. return changed
  524. }
  525. // handleDir creates or updates the given directory
  526. func (p *rwFolder) handleDir(file protocol.FileInfo) {
  527. var err error
  528. events.Default.Log(events.ItemStarted, map[string]string{
  529. "folder": p.folder,
  530. "item": file.Name,
  531. "type": "dir",
  532. "action": "update",
  533. })
  534. defer func() {
  535. events.Default.Log(events.ItemFinished, map[string]interface{}{
  536. "folder": p.folder,
  537. "item": file.Name,
  538. "error": events.Error(err),
  539. "type": "dir",
  540. "action": "update",
  541. })
  542. }()
  543. realName := filepath.Join(p.dir, file.Name)
  544. mode := os.FileMode(file.Flags & 0777)
  545. if p.ignorePermissions(file) {
  546. mode = 0777
  547. }
  548. if debug {
  549. curFile, _ := p.model.CurrentFolderFile(p.folder, file.Name)
  550. l.Debugf("need dir\n\t%v\n\t%v", file, curFile)
  551. }
  552. info, err := osutil.Lstat(realName)
  553. switch {
  554. // There is already something under that name, but it's a file/link.
  555. // Most likely a file/link is getting replaced with a directory.
  556. // Remove the file/link and fall through to directory creation.
  557. case err == nil && (!info.IsDir() || info.Mode()&os.ModeSymlink != 0):
  558. err = osutil.InWritableDir(osutil.Remove, realName)
  559. if err != nil {
  560. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  561. p.newError(file.Name, err)
  562. return
  563. }
  564. fallthrough
  565. // The directory doesn't exist, so we create it with the right
  566. // mode bits from the start.
  567. case err != nil && os.IsNotExist(err):
  568. // We declare a function that acts on only the path name, so
  569. // we can pass it to InWritableDir. We use a regular Mkdir and
  570. // not MkdirAll because the parent should already exist.
  571. mkdir := func(path string) error {
  572. err = os.Mkdir(path, mode)
  573. if err != nil || p.ignorePermissions(file) {
  574. return err
  575. }
  576. // Stat the directory so we can check its permissions.
  577. info, err := osutil.Lstat(path)
  578. if err != nil {
  579. return err
  580. }
  581. // Mask for the bits we want to preserve and add them in to the
  582. // directories permissions.
  583. return os.Chmod(path, mode|(info.Mode()&retainBits))
  584. }
  585. if err = osutil.InWritableDir(mkdir, realName); err == nil {
  586. p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  587. } else {
  588. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  589. p.newError(file.Name, err)
  590. }
  591. return
  592. // Weird error when stat()'ing the dir. Probably won't work to do
  593. // anything else with it if we can't even stat() it.
  594. case err != nil:
  595. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  596. p.newError(file.Name, err)
  597. return
  598. }
  599. // The directory already exists, so we just correct the mode bits. (We
  600. // don't handle modification times on directories, because that sucks...)
  601. // It's OK to change mode bits on stuff within non-writable directories.
  602. if p.ignorePermissions(file) {
  603. p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  604. } else if err := os.Chmod(realName, mode|(info.Mode()&retainBits)); err == nil {
  605. p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  606. } else {
  607. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  608. p.newError(file.Name, err)
  609. }
  610. }
  611. // deleteDir attempts to delete the given directory
  612. func (p *rwFolder) deleteDir(file protocol.FileInfo) {
  613. var err error
  614. events.Default.Log(events.ItemStarted, map[string]string{
  615. "folder": p.folder,
  616. "item": file.Name,
  617. "type": "dir",
  618. "action": "delete",
  619. })
  620. defer func() {
  621. events.Default.Log(events.ItemFinished, map[string]interface{}{
  622. "folder": p.folder,
  623. "item": file.Name,
  624. "error": events.Error(err),
  625. "type": "dir",
  626. "action": "delete",
  627. })
  628. }()
  629. realName := filepath.Join(p.dir, file.Name)
  630. // Delete any temporary files lying around in the directory
  631. dir, _ := os.Open(realName)
  632. if dir != nil {
  633. files, _ := dir.Readdirnames(-1)
  634. for _, file := range files {
  635. if defTempNamer.IsTemporary(file) {
  636. osutil.InWritableDir(osutil.Remove, filepath.Join(realName, file))
  637. }
  638. }
  639. }
  640. err = osutil.InWritableDir(osutil.Remove, realName)
  641. if err == nil || os.IsNotExist(err) {
  642. // It was removed or it doesn't exist to start with
  643. p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
  644. } else if _, serr := os.Lstat(realName); serr != nil && !os.IsPermission(serr) {
  645. // We get an error just looking at the directory, and it's not a
  646. // permission problem. Lets assume the error is in fact some variant
  647. // of "file does not exist" (possibly expressed as some parent being a
  648. // file and not a directory etc) and that the delete is handled.
  649. p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
  650. } else {
  651. l.Infof("Puller (folder %q, dir %q): delete: %v", p.folder, file.Name, err)
  652. p.newError(file.Name, err)
  653. }
  654. }
  655. // deleteFile attempts to delete the given file
  656. func (p *rwFolder) deleteFile(file protocol.FileInfo) {
  657. var err error
  658. events.Default.Log(events.ItemStarted, map[string]string{
  659. "folder": p.folder,
  660. "item": file.Name,
  661. "type": "file",
  662. "action": "delete",
  663. })
  664. defer func() {
  665. events.Default.Log(events.ItemFinished, map[string]interface{}{
  666. "folder": p.folder,
  667. "item": file.Name,
  668. "error": events.Error(err),
  669. "type": "file",
  670. "action": "delete",
  671. })
  672. }()
  673. realName := filepath.Join(p.dir, file.Name)
  674. cur, ok := p.model.CurrentFolderFile(p.folder, file.Name)
  675. if ok && p.inConflict(cur.Version, file.Version) {
  676. // There is a conflict here. Move the file to a conflict copy instead
  677. // of deleting. Also merge with the version vector we had, to indicate
  678. // we have resolved the conflict.
  679. file.Version = file.Version.Merge(cur.Version)
  680. err = osutil.InWritableDir(moveForConflict, realName)
  681. } else if p.versioner != nil {
  682. err = osutil.InWritableDir(p.versioner.Archive, realName)
  683. } else {
  684. err = osutil.InWritableDir(osutil.Remove, realName)
  685. }
  686. if err == nil || os.IsNotExist(err) {
  687. // It was removed or it doesn't exist to start with
  688. p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
  689. } else if _, serr := os.Lstat(realName); serr != nil && !os.IsPermission(serr) {
  690. // We get an error just looking at the file, and it's not a permission
  691. // problem. Lets assume the error is in fact some variant of "file
  692. // does not exist" (possibly expressed as some parent being a file and
  693. // not a directory etc) and that the delete is handled.
  694. p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
  695. } else {
  696. l.Infof("Puller (folder %q, file %q): delete: %v", p.folder, file.Name, err)
  697. p.newError(file.Name, err)
  698. }
  699. }
  700. // renameFile attempts to rename an existing file to a destination
  701. // and set the right attributes on it.
  702. func (p *rwFolder) renameFile(source, target protocol.FileInfo) {
  703. var err error
  704. events.Default.Log(events.ItemStarted, map[string]string{
  705. "folder": p.folder,
  706. "item": source.Name,
  707. "type": "file",
  708. "action": "delete",
  709. })
  710. events.Default.Log(events.ItemStarted, map[string]string{
  711. "folder": p.folder,
  712. "item": target.Name,
  713. "type": "file",
  714. "action": "update",
  715. })
  716. defer func() {
  717. events.Default.Log(events.ItemFinished, map[string]interface{}{
  718. "folder": p.folder,
  719. "item": source.Name,
  720. "error": events.Error(err),
  721. "type": "file",
  722. "action": "delete",
  723. })
  724. events.Default.Log(events.ItemFinished, map[string]interface{}{
  725. "folder": p.folder,
  726. "item": target.Name,
  727. "error": events.Error(err),
  728. "type": "file",
  729. "action": "update",
  730. })
  731. }()
  732. if debug {
  733. l.Debugln(p, "taking rename shortcut", source.Name, "->", target.Name)
  734. }
  735. from := filepath.Join(p.dir, source.Name)
  736. to := filepath.Join(p.dir, target.Name)
  737. if p.versioner != nil {
  738. err = osutil.Copy(from, to)
  739. if err == nil {
  740. err = osutil.InWritableDir(p.versioner.Archive, from)
  741. }
  742. } else {
  743. err = osutil.TryRename(from, to)
  744. }
  745. if err == nil {
  746. // The file was renamed, so we have handled both the necessary delete
  747. // of the source and the creation of the target. Fix-up the metadata,
  748. // and update the local index of the target file.
  749. p.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
  750. err = p.shortcutFile(target)
  751. if err != nil {
  752. l.Infof("Puller (folder %q, file %q): rename from %q metadata: %v", p.folder, target.Name, source.Name, err)
  753. p.newError(target.Name, err)
  754. return
  755. }
  756. p.dbUpdates <- dbUpdateJob{target, dbUpdateHandleFile}
  757. } else {
  758. // We failed the rename so we have a source file that we still need to
  759. // get rid of. Attempt to delete it instead so that we make *some*
  760. // progress. The target is unhandled.
  761. err = osutil.InWritableDir(osutil.Remove, from)
  762. if err != nil {
  763. l.Infof("Puller (folder %q, file %q): delete %q after failed rename: %v", p.folder, target.Name, source.Name, err)
  764. p.newError(target.Name, err)
  765. return
  766. }
  767. p.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
  768. }
  769. }
// This is the flow of data and events through the puller routines:
//
// +-----------------------+
// |                       | - - - - > ItemStarted
// |      handleFile       | - - - - > ItemFinished (on shortcuts)
// |                       |
// +-----------------------+
//             |
//             | copyChan (copyBlocksState; unless shortcut taken)
//             |
//             |    +-----------------------+
//             |    | +-----------------------+
//             +--->| |                       |
//                  | |     copierRoutine     |
//                  +-|                       |
//                    +-----------------------+
//                                |
//                                | pullChan (pullBlockState)
//                                |
//                                |   +-----------------------+
//                                |   | +-----------------------+
//                                +-->| |                       |
//                                    | |     pullerRoutine     |
//                                    +-|                       |
//                                      +-----------------------+
//                                                  |
//                                                  | finisherChan (sharedPullerState)
//                                                  |
//                                                  |   +-----------------------+
//                                                  |   |                       |
//                                                  +-->|    finisherRoutine    | - - - - > ItemFinished
//                                                      |                       |
//                                                      +-----------------------+

  803. // handleFile queues the copies and pulls as necessary for a single new or
  804. // changed file.
  805. func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
  806. curFile, ok := p.model.CurrentFolderFile(p.folder, file.Name)
  807. if ok && len(curFile.Blocks) == len(file.Blocks) && scanner.BlocksEqual(curFile.Blocks, file.Blocks) {
  808. // We are supposed to copy the entire file, and then fetch nothing. We
  809. // are only updating metadata, so we don't actually *need* to make the
  810. // copy.
  811. if debug {
  812. l.Debugln(p, "taking shortcut on", file.Name)
  813. }
  814. events.Default.Log(events.ItemStarted, map[string]string{
  815. "folder": p.folder,
  816. "item": file.Name,
  817. "type": "file",
  818. "action": "metadata",
  819. })
  820. p.queue.Done(file.Name)
  821. var err error
  822. if file.IsSymlink() {
  823. err = p.shortcutSymlink(file)
  824. } else {
  825. err = p.shortcutFile(file)
  826. }
  827. events.Default.Log(events.ItemFinished, map[string]interface{}{
  828. "folder": p.folder,
  829. "item": file.Name,
  830. "error": events.Error(err),
  831. "type": "file",
  832. "action": "metadata",
  833. })
  834. if err != nil {
  835. l.Infoln("Puller: shortcut:", err)
  836. p.newError(file.Name, err)
  837. } else {
  838. p.dbUpdates <- dbUpdateJob{file, dbUpdateShortcutFile}
  839. }
  840. return
  841. }
  842. events.Default.Log(events.ItemStarted, map[string]string{
  843. "folder": p.folder,
  844. "item": file.Name,
  845. "type": "file",
  846. "action": "update",
  847. })
  848. scanner.PopulateOffsets(file.Blocks)
  849. // Figure out the absolute filenames we need once and for all
  850. tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name))
  851. realName := filepath.Join(p.dir, file.Name)
  852. reused := 0
  853. var blocks []protocol.BlockInfo
  854. // Check for an old temporary file which might have some blocks we could
  855. // reuse.
  856. tempBlocks, err := scanner.HashFile(tempName, protocol.BlockSize, 0, nil)
  857. if err == nil {
  858. // Check for any reusable blocks in the temp file
  859. tempCopyBlocks, _ := scanner.BlockDiff(tempBlocks, file.Blocks)
  860. // block.String() returns a string unique to the block
  861. existingBlocks := make(map[string]struct{}, len(tempCopyBlocks))
  862. for _, block := range tempCopyBlocks {
  863. existingBlocks[block.String()] = struct{}{}
  864. }
  865. // Since the blocks are already there, we don't need to get them.
  866. for _, block := range file.Blocks {
  867. _, ok := existingBlocks[block.String()]
  868. if !ok {
  869. blocks = append(blocks, block)
  870. }
  871. }
  872. // The sharedpullerstate will know which flags to use when opening the
  873. // temp file depending if we are reusing any blocks or not.
  874. reused = len(file.Blocks) - len(blocks)
  875. if reused == 0 {
  876. // Otherwise, discard the file ourselves in order for the
  877. // sharedpuller not to panic when it fails to exclusively create a
  878. // file which already exists
  879. osutil.InWritableDir(osutil.Remove, tempName)
  880. }
  881. } else {
  882. blocks = file.Blocks
  883. }
  884. s := sharedPullerState{
  885. file: file,
  886. folder: p.folder,
  887. tempName: tempName,
  888. realName: realName,
  889. copyTotal: len(blocks),
  890. copyNeeded: len(blocks),
  891. reused: reused,
  892. ignorePerms: p.ignorePermissions(file),
  893. version: curFile.Version,
  894. mut: sync.NewMutex(),
  895. }
  896. if debug {
  897. l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused)
  898. }
  899. cs := copyBlocksState{
  900. sharedPullerState: &s,
  901. blocks: blocks,
  902. }
  903. copyChan <- cs
  904. }
  905. // shortcutFile sets file mode and modification time, when that's the only
  906. // thing that has changed.
  907. func (p *rwFolder) shortcutFile(file protocol.FileInfo) error {
  908. realName := filepath.Join(p.dir, file.Name)
  909. if !p.ignorePermissions(file) {
  910. if err := os.Chmod(realName, os.FileMode(file.Flags&0777)); err != nil {
  911. l.Infof("Puller (folder %q, file %q): shortcut: chmod: %v", p.folder, file.Name, err)
  912. p.newError(file.Name, err)
  913. return err
  914. }
  915. }
  916. t := time.Unix(file.Modified, 0)
  917. if err := os.Chtimes(realName, t, t); err != nil {
  918. // Try using virtual mtimes
  919. info, err := os.Stat(realName)
  920. if err != nil {
  921. l.Infof("Puller (folder %q, file %q): shortcut: unable to stat file: %v", p.folder, file.Name, err)
  922. p.newError(file.Name, err)
  923. return err
  924. }
  925. p.virtualMtimeRepo.UpdateMtime(file.Name, info.ModTime(), t)
  926. }
  927. // This may have been a conflict. We should merge the version vectors so
  928. // that our clock doesn't move backwards.
  929. if cur, ok := p.model.CurrentFolderFile(p.folder, file.Name); ok {
  930. file.Version = file.Version.Merge(cur.Version)
  931. }
  932. return nil
  933. }
  934. // shortcutSymlink changes the symlinks type if necessary.
  935. func (p *rwFolder) shortcutSymlink(file protocol.FileInfo) (err error) {
  936. tt := symlinks.TargetFile
  937. if file.IsDirectory() {
  938. tt = symlinks.TargetDirectory
  939. }
  940. err = symlinks.ChangeType(filepath.Join(p.dir, file.Name), tt)
  941. if err != nil {
  942. l.Infof("Puller (folder %q, file %q): symlink shortcut: %v", p.folder, file.Name, err)
  943. p.newError(file.Name, err)
  944. }
  945. return
  946. }
  947. // copierRoutine reads copierStates until the in channel closes and performs
  948. // the relevant copies when possible, or passes it to the puller routine.
  949. func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
  950. buf := make([]byte, protocol.BlockSize)
  951. for state := range in {
  952. dstFd, err := state.tempFile()
  953. if err != nil {
  954. // Nothing more to do for this failed file, since we couldn't create a temporary for it.
  955. out <- state.sharedPullerState
  956. continue
  957. }
  958. if p.progressEmitter != nil {
  959. p.progressEmitter.Register(state.sharedPullerState)
  960. }
  961. folderRoots := make(map[string]string)
  962. var folders []string
  963. p.model.fmut.RLock()
  964. for folder, cfg := range p.model.folderCfgs {
  965. folderRoots[folder] = cfg.Path()
  966. folders = append(folders, folder)
  967. }
  968. p.model.fmut.RUnlock()
  969. for _, block := range state.blocks {
  970. buf = buf[:int(block.Size)]
  971. found := p.model.finder.Iterate(folders, block.Hash, func(folder, file string, index int32) bool {
  972. fd, err := os.Open(filepath.Join(folderRoots[folder], file))
  973. if err != nil {
  974. return false
  975. }
  976. _, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
  977. fd.Close()
  978. if err != nil {
  979. return false
  980. }
  981. hash, err := scanner.VerifyBuffer(buf, block)
  982. if err != nil {
  983. if hash != nil {
  984. if debug {
  985. l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash)
  986. }
  987. err = p.model.finder.Fix(folder, file, index, block.Hash, hash)
  988. if err != nil {
  989. l.Warnln("finder fix:", err)
  990. }
  991. } else if debug {
  992. l.Debugln("Finder failed to verify buffer", err)
  993. }
  994. return false
  995. }
  996. _, err = dstFd.WriteAt(buf, block.Offset)
  997. if err != nil {
  998. state.fail("dst write", err)
  999. }
  1000. if file == state.file.Name {
  1001. state.copiedFromOrigin()
  1002. }
  1003. return true
  1004. })
  1005. if state.failed() != nil {
  1006. break
  1007. }
  1008. if !found {
  1009. state.pullStarted()
  1010. ps := pullBlockState{
  1011. sharedPullerState: state.sharedPullerState,
  1012. block: block,
  1013. }
  1014. pullChan <- ps
  1015. } else {
  1016. state.copyDone()
  1017. }
  1018. }
  1019. out <- state.sharedPullerState
  1020. }
  1021. }
  1022. func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
  1023. for state := range in {
  1024. if state.failed() != nil {
  1025. out <- state.sharedPullerState
  1026. continue
  1027. }
  1028. // Get an fd to the temporary file. Technically we don't need it until
  1029. // after fetching the block, but if we run into an error here there is
  1030. // no point in issuing the request to the network.
  1031. fd, err := state.tempFile()
  1032. if err != nil {
  1033. out <- state.sharedPullerState
  1034. continue
  1035. }
  1036. var lastError error
  1037. potentialDevices := p.model.Availability(p.folder, state.file.Name)
  1038. for {
  1039. // Select the least busy device to pull the block from. If we found no
  1040. // feasible device at all, fail the block (and in the long run, the
  1041. // file).
  1042. selected := activity.leastBusy(potentialDevices)
  1043. if selected == (protocol.DeviceID{}) {
  1044. if lastError != nil {
  1045. state.fail("pull", lastError)
  1046. } else {
  1047. state.fail("pull", errNoDevice)
  1048. }
  1049. break
  1050. }
  1051. potentialDevices = removeDevice(potentialDevices, selected)
  1052. // Fetch the block, while marking the selected device as in use so that
  1053. // leastBusy can select another device when someone else asks.
  1054. activity.using(selected)
  1055. buf, lastError := p.model.requestGlobal(selected, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, 0, nil)
  1056. activity.done(selected)
  1057. if lastError != nil {
  1058. if debug {
  1059. l.Debugln("request:", p.folder, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError)
  1060. }
  1061. continue
  1062. }
  1063. // Verify that the received block matches the desired hash, if not
  1064. // try pulling it from another device.
  1065. _, lastError = scanner.VerifyBuffer(buf, state.block)
  1066. if lastError != nil {
  1067. if debug {
  1068. l.Debugln("request:", p.folder, state.file.Name, state.block.Offset, state.block.Size, "hash mismatch")
  1069. }
  1070. continue
  1071. }
  1072. // Save the block data we got from the cluster
  1073. _, err = fd.WriteAt(buf, state.block.Offset)
  1074. if err != nil {
  1075. state.fail("save", err)
  1076. } else {
  1077. state.pullDone()
  1078. }
  1079. break
  1080. }
  1081. out <- state.sharedPullerState
  1082. }
  1083. }
  1084. func (p *rwFolder) performFinish(state *sharedPullerState) error {
  1085. // Set the correct permission bits on the new file
  1086. if !p.ignorePermissions(state.file) {
  1087. if err := os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777)); err != nil {
  1088. return err
  1089. }
  1090. }
  1091. // Set the correct timestamp on the new file
  1092. t := time.Unix(state.file.Modified, 0)
  1093. if err := os.Chtimes(state.tempName, t, t); err != nil {
  1094. // Try using virtual mtimes instead
  1095. info, err := os.Stat(state.tempName)
  1096. if err != nil {
  1097. return err
  1098. }
  1099. p.virtualMtimeRepo.UpdateMtime(state.file.Name, info.ModTime(), t)
  1100. }
  1101. if stat, err := osutil.Lstat(state.realName); err == nil {
  1102. // There is an old file or directory already in place. We need to
  1103. // handle that.
  1104. switch {
  1105. case stat.IsDir() || stat.Mode()&os.ModeSymlink != 0:
  1106. // It's a directory or a symlink. These are not versioned or
  1107. // archived for conflicts, only removed (which of course fails for
  1108. // non-empty directories).
  1109. // TODO: This is the place where we want to remove temporary files
  1110. // and future hard ignores before attempting a directory delete.
  1111. // Should share code with p.deletDir().
  1112. if err = osutil.InWritableDir(osutil.Remove, state.realName); err != nil {
  1113. return err
  1114. }
  1115. case p.inConflict(state.version, state.file.Version):
  1116. // The new file has been changed in conflict with the existing one. We
  1117. // should file it away as a conflict instead of just removing or
  1118. // archiving. Also merge with the version vector we had, to indicate
  1119. // we have resolved the conflict.
  1120. state.file.Version = state.file.Version.Merge(state.version)
  1121. if err = osutil.InWritableDir(moveForConflict, state.realName); err != nil {
  1122. return err
  1123. }
  1124. case p.versioner != nil:
  1125. // If we should use versioning, let the versioner archive the old
  1126. // file before we replace it. Archiving a non-existent file is not
  1127. // an error.
  1128. if err = p.versioner.Archive(state.realName); err != nil {
  1129. return err
  1130. }
  1131. }
  1132. }
  1133. // Replace the original content with the new one
  1134. if err := osutil.Rename(state.tempName, state.realName); err != nil {
  1135. return err
  1136. }
  1137. // If it's a symlink, the target of the symlink is inside the file.
  1138. if state.file.IsSymlink() {
  1139. content, err := ioutil.ReadFile(state.realName)
  1140. if err != nil {
  1141. return err
  1142. }
  1143. // Remove the file, and replace it with a symlink.
  1144. err = osutil.InWritableDir(func(path string) error {
  1145. os.Remove(path)
  1146. tt := symlinks.TargetFile
  1147. if state.file.IsDirectory() {
  1148. tt = symlinks.TargetDirectory
  1149. }
  1150. return symlinks.Create(path, string(content), tt)
  1151. }, state.realName)
  1152. if err != nil {
  1153. return err
  1154. }
  1155. }
  1156. // Record the updated file in the index
  1157. p.dbUpdates <- dbUpdateJob{state.file, dbUpdateHandleFile}
  1158. return nil
  1159. }
  1160. func (p *rwFolder) finisherRoutine(in <-chan *sharedPullerState) {
  1161. for state := range in {
  1162. if closed, err := state.finalClose(); closed {
  1163. if debug {
  1164. l.Debugln(p, "closing", state.file.Name)
  1165. }
  1166. p.queue.Done(state.file.Name)
  1167. if err == nil {
  1168. err = p.performFinish(state)
  1169. }
  1170. if err != nil {
  1171. l.Infoln("Puller: final:", err)
  1172. p.newError(state.file.Name, err)
  1173. }
  1174. events.Default.Log(events.ItemFinished, map[string]interface{}{
  1175. "folder": p.folder,
  1176. "item": state.file.Name,
  1177. "error": events.Error(err),
  1178. "type": "file",
  1179. "action": "update",
  1180. })
  1181. if p.progressEmitter != nil {
  1182. p.progressEmitter.Deregister(state)
  1183. }
  1184. }
  1185. }
  1186. }
  1187. // Moves the given filename to the front of the job queue
  1188. func (p *rwFolder) BringToFront(filename string) {
  1189. p.queue.BringToFront(filename)
  1190. }
  1191. func (p *rwFolder) Jobs() ([]string, []string) {
  1192. return p.queue.Jobs()
  1193. }
  1194. func (p *rwFolder) DelayScan(next time.Duration) {
  1195. p.delayScan <- next
  1196. }
  1197. // dbUpdaterRoutine aggregates db updates and commits them in batches no
  1198. // larger than 1000 items, and no more delayed than 2 seconds.
  1199. func (p *rwFolder) dbUpdaterRoutine() {
  1200. const (
  1201. maxBatchSize = 1000
  1202. maxBatchTime = 2 * time.Second
  1203. )
  1204. batch := make([]dbUpdateJob, 0, maxBatchSize)
  1205. files := make([]protocol.FileInfo, 0, maxBatchSize)
  1206. tick := time.NewTicker(maxBatchTime)
  1207. defer tick.Stop()
  1208. handleBatch := func() {
  1209. found := false
  1210. var lastFile protocol.FileInfo
  1211. for _, job := range batch {
  1212. files = append(files, job.file)
  1213. if job.file.IsInvalid() || (job.file.IsDirectory() && !job.file.IsSymlink()) {
  1214. continue
  1215. }
  1216. if job.jobType&(dbUpdateHandleFile|dbUpdateDeleteFile) == 0 {
  1217. continue
  1218. }
  1219. found = true
  1220. lastFile = job.file
  1221. }
  1222. p.model.updateLocals(p.folder, files)
  1223. if found {
  1224. p.model.receivedFile(p.folder, lastFile)
  1225. }
  1226. batch = batch[:0]
  1227. files = files[:0]
  1228. }
  1229. loop:
  1230. for {
  1231. select {
  1232. case job, ok := <-p.dbUpdates:
  1233. if !ok {
  1234. break loop
  1235. }
  1236. job.file.LocalVersion = 0
  1237. batch = append(batch, job)
  1238. if len(batch) == maxBatchSize {
  1239. handleBatch()
  1240. }
  1241. case <-tick.C:
  1242. if len(batch) > 0 {
  1243. handleBatch()
  1244. }
  1245. }
  1246. }
  1247. if len(batch) > 0 {
  1248. handleBatch()
  1249. }
  1250. }
  1251. func (p *rwFolder) inConflict(current, replacement protocol.Vector) bool {
  1252. if current.Concurrent(replacement) {
  1253. // Obvious case
  1254. return true
  1255. }
  1256. if replacement.Counter(p.shortID) > current.Counter(p.shortID) {
  1257. // The replacement file contains a higher version for ourselves than
  1258. // what we have. This isn't supposed to be possible, since it's only
  1259. // we who can increment that counter. We take it as a sign that
  1260. // something is wrong (our index may have been corrupted or removed)
  1261. // and flag it as a conflict.
  1262. return true
  1263. }
  1264. return false
  1265. }
  1266. func invalidateFolder(cfg *config.Configuration, folderID string, err error) {
  1267. for i := range cfg.Folders {
  1268. folder := &cfg.Folders[i]
  1269. if folder.ID == folderID {
  1270. folder.Invalid = err.Error()
  1271. return
  1272. }
  1273. }
  1274. }
  1275. func removeDevice(devices []protocol.DeviceID, device protocol.DeviceID) []protocol.DeviceID {
  1276. for i := range devices {
  1277. if devices[i] == device {
  1278. devices[i] = devices[len(devices)-1]
  1279. return devices[:len(devices)-1]
  1280. }
  1281. }
  1282. return devices
  1283. }
  1284. func moveForConflict(name string) error {
  1285. ext := filepath.Ext(name)
  1286. withoutExt := name[:len(name)-len(ext)]
  1287. newName := withoutExt + time.Now().Format(".sync-conflict-20060102-150405") + ext
  1288. err := os.Rename(name, newName)
  1289. if os.IsNotExist(err) {
  1290. // We were supposed to move a file away but it does not exist. Either
  1291. // the user has already moved it away, or the conflict was between a
  1292. // remote modification and a local delete. In either way it does not
  1293. // matter, go ahead as if the move succeeded.
  1294. return nil
  1295. }
  1296. return err
  1297. }
  1298. func (p *rwFolder) newError(path string, err error) {
  1299. p.errorsMut.Lock()
  1300. defer p.errorsMut.Unlock()
  1301. // We might get more than one error report for a file (i.e. error on
  1302. // Write() followed by Close()); we keep the first error as that is
  1303. // probably closer to the root cause.
  1304. if _, ok := p.errors[path]; ok {
  1305. return
  1306. }
  1307. p.errors[path] = err.Error()
  1308. }
  1309. func (p *rwFolder) clearErrors() {
  1310. p.errorsMut.Lock()
  1311. p.errors = make(map[string]string)
  1312. p.errorsMut.Unlock()
  1313. }
  1314. func (p *rwFolder) currentErrors() []fileError {
  1315. p.errorsMut.Lock()
  1316. errors := make([]fileError, 0, len(p.errors))
  1317. for path, err := range p.errors {
  1318. errors = append(errors, fileError{path, err})
  1319. }
  1320. sort.Sort(fileErrorList(errors))
  1321. p.errorsMut.Unlock()
  1322. return errors
  1323. }
  1324. // A []fileError is sent as part of an event and will be JSON serialized.
  1325. type fileError struct {
  1326. Path string `json:"path"`
  1327. Err string `json:"error"`
  1328. }
  1329. type fileErrorList []fileError
  1330. func (l fileErrorList) Len() int {
  1331. return len(l)
  1332. }
  1333. func (l fileErrorList) Less(a, b int) bool {
  1334. return l[a].Path < l[b].Path
  1335. }
  1336. func (l fileErrorList) Swap(a, b int) {
  1337. l[a], l[b] = l[b], l[a]
  1338. }