rwfolder.go 46 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "math/rand"
  12. "os"
  13. "path/filepath"
  14. "sort"
  15. "strings"
  16. "time"
  17. "github.com/syncthing/syncthing/lib/config"
  18. "github.com/syncthing/syncthing/lib/db"
  19. "github.com/syncthing/syncthing/lib/events"
  20. "github.com/syncthing/syncthing/lib/ignore"
  21. "github.com/syncthing/syncthing/lib/osutil"
  22. "github.com/syncthing/syncthing/lib/protocol"
  23. "github.com/syncthing/syncthing/lib/scanner"
  24. "github.com/syncthing/syncthing/lib/symlinks"
  25. "github.com/syncthing/syncthing/lib/sync"
  26. "github.com/syncthing/syncthing/lib/versioner"
  27. )
  28. // TODO: Stop on errors
  29. // A pullBlockState is passed to the puller routine for each block that needs
  30. // to be fetched.
  31. type pullBlockState struct {
  32. *sharedPullerState
  33. block protocol.BlockInfo
  34. }
  35. // A copyBlocksState is passed to copy routine if the file has blocks to be
  36. // copied.
  37. type copyBlocksState struct {
  38. *sharedPullerState
  39. blocks []protocol.BlockInfo
  40. }
  41. // Which filemode bits to preserve
  42. const retainBits = os.ModeSetgid | os.ModeSetuid | os.ModeSticky
  43. var (
  44. activity = newDeviceActivity()
  45. errNoDevice = errors.New("peers who had this file went away, or the file has changed while syncing. will retry later")
  46. )
  47. const (
  48. dbUpdateHandleDir = iota
  49. dbUpdateDeleteDir
  50. dbUpdateHandleFile
  51. dbUpdateDeleteFile
  52. dbUpdateShortcutFile
  53. )
  54. const (
  55. defaultCopiers = 1
  56. defaultPullers = 16
  57. defaultPullerSleep = 10 * time.Second
  58. defaultPullerPause = 60 * time.Second
  59. )
  60. type dbUpdateJob struct {
  61. file protocol.FileInfo
  62. jobType int
  63. }
  64. type rwFolder struct {
  65. stateTracker
  66. model *Model
  67. progressEmitter *ProgressEmitter
  68. virtualMtimeRepo *db.VirtualMtimeRepo
  69. folder string
  70. dir string
  71. scanIntv time.Duration
  72. versioner versioner.Versioner
  73. ignorePerms bool
  74. copiers int
  75. pullers int
  76. shortID protocol.ShortID
  77. order config.PullOrder
  78. maxConflicts int
  79. sleep time.Duration
  80. pause time.Duration
  81. allowSparse bool
  82. checkFreeSpace bool
  83. stop chan struct{}
  84. queue *jobQueue
  85. dbUpdates chan dbUpdateJob
  86. scanTimer *time.Timer
  87. pullTimer *time.Timer
  88. delayScan chan time.Duration
  89. scanNow chan rescanRequest
  90. remoteIndex chan struct{} // An index update was received, we should re-evaluate needs
  91. errors map[string]string // path -> error string
  92. errorsMut sync.Mutex
  93. }
  94. func newRWFolder(m *Model, shortID protocol.ShortID, cfg config.FolderConfiguration) *rwFolder {
  95. p := &rwFolder{
  96. stateTracker: stateTracker{
  97. folder: cfg.ID,
  98. mut: sync.NewMutex(),
  99. },
  100. model: m,
  101. progressEmitter: m.progressEmitter,
  102. virtualMtimeRepo: db.NewVirtualMtimeRepo(m.db, cfg.ID),
  103. folder: cfg.ID,
  104. dir: cfg.Path(),
  105. scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
  106. ignorePerms: cfg.IgnorePerms,
  107. copiers: cfg.Copiers,
  108. pullers: cfg.Pullers,
  109. shortID: shortID,
  110. order: cfg.Order,
  111. maxConflicts: cfg.MaxConflicts,
  112. allowSparse: !cfg.DisableSparseFiles,
  113. checkFreeSpace: cfg.MinDiskFreePct != 0,
  114. stop: make(chan struct{}),
  115. queue: newJobQueue(),
  116. pullTimer: time.NewTimer(time.Second),
  117. scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
  118. delayScan: make(chan time.Duration),
  119. scanNow: make(chan rescanRequest),
  120. remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes.
  121. errorsMut: sync.NewMutex(),
  122. }
  123. if p.copiers == 0 {
  124. p.copiers = defaultCopiers
  125. }
  126. if p.pullers == 0 {
  127. p.pullers = defaultPullers
  128. }
  129. if cfg.PullerPauseS == 0 {
  130. p.pause = defaultPullerPause
  131. } else {
  132. p.pause = time.Duration(cfg.PullerPauseS) * time.Second
  133. }
  134. if cfg.PullerSleepS == 0 {
  135. p.sleep = defaultPullerSleep
  136. } else {
  137. p.sleep = time.Duration(cfg.PullerSleepS) * time.Second
  138. }
  139. return p
  140. }
  141. // Helper function to check whether either the ignorePerm flag has been
  142. // set on the local host or the FlagNoPermBits has been set on the file/dir
  143. // which is being pulled.
  144. func (p *rwFolder) ignorePermissions(file protocol.FileInfo) bool {
  145. return p.ignorePerms || file.Flags&protocol.FlagNoPermBits != 0
  146. }
  147. // Serve will run scans and pulls. It will return when Stop()ed or on a
  148. // critical error.
  149. func (p *rwFolder) Serve() {
  150. l.Debugln(p, "starting")
  151. defer l.Debugln(p, "exiting")
  152. defer func() {
  153. p.pullTimer.Stop()
  154. p.scanTimer.Stop()
  155. // TODO: Should there be an actual FolderStopped state?
  156. p.setState(FolderIdle)
  157. }()
  158. var prevVer int64
  159. var prevIgnoreHash string
  160. rescheduleScan := func() {
  161. if p.scanIntv == 0 {
  162. // We should not run scans, so it should not be rescheduled.
  163. return
  164. }
  165. // Sleep a random time between 3/4 and 5/4 of the configured interval.
  166. sleepNanos := (p.scanIntv.Nanoseconds()*3 + rand.Int63n(2*p.scanIntv.Nanoseconds())) / 4
  167. intv := time.Duration(sleepNanos) * time.Nanosecond
  168. l.Debugln(p, "next rescan in", intv)
  169. p.scanTimer.Reset(intv)
  170. }
  171. // We don't start pulling files until a scan has been completed.
  172. initialScanCompleted := false
  173. for {
  174. select {
  175. case <-p.stop:
  176. return
  177. case <-p.remoteIndex:
  178. prevVer = 0
  179. p.pullTimer.Reset(0)
  180. l.Debugln(p, "remote index updated, rescheduling pull")
  181. case <-p.pullTimer.C:
  182. if !initialScanCompleted {
  183. l.Debugln(p, "skip (initial)")
  184. p.pullTimer.Reset(p.sleep)
  185. continue
  186. }
  187. p.model.fmut.RLock()
  188. curIgnores := p.model.folderIgnores[p.folder]
  189. p.model.fmut.RUnlock()
  190. if newHash := curIgnores.Hash(); newHash != prevIgnoreHash {
  191. // The ignore patterns have changed. We need to re-evaluate if
  192. // there are files we need now that were ignored before.
  193. l.Debugln(p, "ignore patterns have changed, resetting prevVer")
  194. prevVer = 0
  195. prevIgnoreHash = newHash
  196. }
  197. // RemoteLocalVersion() is a fast call, doesn't touch the database.
  198. curVer, ok := p.model.RemoteLocalVersion(p.folder)
  199. if !ok || curVer == prevVer {
  200. l.Debugln(p, "skip (curVer == prevVer)", prevVer, ok)
  201. p.pullTimer.Reset(p.sleep)
  202. continue
  203. }
  204. if err := p.model.CheckFolderHealth(p.folder); err != nil {
  205. l.Infoln("Skipping folder", p.folder, "pull due to folder error:", err)
  206. p.pullTimer.Reset(p.sleep)
  207. continue
  208. }
  209. l.Debugln(p, "pulling", prevVer, curVer)
  210. p.setState(FolderSyncing)
  211. p.clearErrors()
  212. tries := 0
  213. for {
  214. tries++
  215. changed := p.pullerIteration(curIgnores)
  216. l.Debugln(p, "changed", changed)
  217. if changed == 0 {
  218. // No files were changed by the puller, so we are in
  219. // sync. Remember the local version number and
  220. // schedule a resync a little bit into the future.
  221. if lv, ok := p.model.RemoteLocalVersion(p.folder); ok && lv < curVer {
  222. // There's a corner case where the device we needed
  223. // files from disconnected during the puller
  224. // iteration. The files will have been removed from
  225. // the index, so we've concluded that we don't need
  226. // them, but at the same time we have the local
  227. // version that includes those files in curVer. So we
  228. // catch the case that localVersion might have
  229. // decreased here.
  230. l.Debugln(p, "adjusting curVer", lv)
  231. curVer = lv
  232. }
  233. prevVer = curVer
  234. l.Debugln(p, "next pull in", p.sleep)
  235. p.pullTimer.Reset(p.sleep)
  236. break
  237. }
  238. if tries > 10 {
  239. // We've tried a bunch of times to get in sync, but
  240. // we're not making it. Probably there are write
  241. // errors preventing us. Flag this with a warning and
  242. // wait a bit longer before retrying.
  243. l.Infof("Folder %q isn't making progress. Pausing puller for %v.", p.folder, p.pause)
  244. l.Debugln(p, "next pull in", p.pause)
  245. if folderErrors := p.currentErrors(); len(folderErrors) > 0 {
  246. events.Default.Log(events.FolderErrors, map[string]interface{}{
  247. "folder": p.folder,
  248. "errors": folderErrors,
  249. })
  250. }
  251. p.pullTimer.Reset(p.pause)
  252. break
  253. }
  254. }
  255. p.setState(FolderIdle)
  256. // The reason for running the scanner from within the puller is that
  257. // this is the easiest way to make sure we are not doing both at the
  258. // same time.
  259. case <-p.scanTimer.C:
  260. if err := p.model.CheckFolderHealth(p.folder); err != nil {
  261. l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err)
  262. rescheduleScan()
  263. continue
  264. }
  265. l.Debugln(p, "rescan")
  266. if err := p.model.internalScanFolderSubs(p.folder, nil); err != nil {
  267. // Potentially sets the error twice, once in the scanner just
  268. // by doing a check, and once here, if the error returned is
  269. // the same one as returned by CheckFolderHealth, though
  270. // duplicate set is handled by setError.
  271. p.setError(err)
  272. rescheduleScan()
  273. continue
  274. }
  275. if p.scanIntv > 0 {
  276. rescheduleScan()
  277. }
  278. if !initialScanCompleted {
  279. l.Infoln("Completed initial scan (rw) of folder", p.folder)
  280. initialScanCompleted = true
  281. }
  282. case req := <-p.scanNow:
  283. if err := p.model.CheckFolderHealth(p.folder); err != nil {
  284. l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err)
  285. req.err <- err
  286. continue
  287. }
  288. l.Debugln(p, "forced rescan")
  289. if err := p.model.internalScanFolderSubs(p.folder, req.subs); err != nil {
  290. // Potentially sets the error twice, once in the scanner just
  291. // by doing a check, and once here, if the error returned is
  292. // the same one as returned by CheckFolderHealth, though
  293. // duplicate set is handled by setError.
  294. p.setError(err)
  295. req.err <- err
  296. continue
  297. }
  298. req.err <- nil
  299. case next := <-p.delayScan:
  300. p.scanTimer.Reset(next)
  301. }
  302. }
  303. }
  304. func (p *rwFolder) Stop() {
  305. close(p.stop)
  306. }
  307. func (p *rwFolder) IndexUpdated() {
  308. select {
  309. case p.remoteIndex <- struct{}{}:
  310. default:
  311. // We might be busy doing a pull and thus not reading from this
  312. // channel. The channel is 1-buffered, so one notification will be
  313. // queued to ensure we recheck after the pull, but beyond that we must
  314. // make sure to not block index receiving.
  315. }
  316. }
  317. func (p *rwFolder) Scan(subs []string) error {
  318. req := rescanRequest{
  319. subs: subs,
  320. err: make(chan error),
  321. }
  322. p.scanNow <- req
  323. return <-req.err
  324. }
  325. func (p *rwFolder) String() string {
  326. return fmt.Sprintf("rwFolder/%s@%p", p.folder, p)
  327. }
  328. // pullerIteration runs a single puller iteration for the given folder and
  329. // returns the number items that should have been synced (even those that
  330. // might have failed). One puller iteration handles all files currently
  331. // flagged as needed in the folder.
  332. func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
  333. pullChan := make(chan pullBlockState)
  334. copyChan := make(chan copyBlocksState)
  335. finisherChan := make(chan *sharedPullerState)
  336. updateWg := sync.NewWaitGroup()
  337. copyWg := sync.NewWaitGroup()
  338. pullWg := sync.NewWaitGroup()
  339. doneWg := sync.NewWaitGroup()
  340. l.Debugln(p, "c", p.copiers, "p", p.pullers)
  341. p.dbUpdates = make(chan dbUpdateJob)
  342. updateWg.Add(1)
  343. go func() {
  344. // dbUpdaterRoutine finishes when p.dbUpdates is closed
  345. p.dbUpdaterRoutine()
  346. updateWg.Done()
  347. }()
  348. for i := 0; i < p.copiers; i++ {
  349. copyWg.Add(1)
  350. go func() {
  351. // copierRoutine finishes when copyChan is closed
  352. p.copierRoutine(copyChan, pullChan, finisherChan)
  353. copyWg.Done()
  354. }()
  355. }
  356. for i := 0; i < p.pullers; i++ {
  357. pullWg.Add(1)
  358. go func() {
  359. // pullerRoutine finishes when pullChan is closed
  360. p.pullerRoutine(pullChan, finisherChan)
  361. pullWg.Done()
  362. }()
  363. }
  364. doneWg.Add(1)
  365. // finisherRoutine finishes when finisherChan is closed
  366. go func() {
  367. p.finisherRoutine(finisherChan)
  368. doneWg.Done()
  369. }()
  370. p.model.fmut.RLock()
  371. folderFiles := p.model.folderFiles[p.folder]
  372. p.model.fmut.RUnlock()
  373. // !!!
  374. // WithNeed takes a database snapshot (by necessity). By the time we've
  375. // handled a bunch of files it might have become out of date and we might
  376. // be attempting to sync with an old version of a file...
  377. // !!!
  378. changed := 0
  379. fileDeletions := map[string]protocol.FileInfo{}
  380. dirDeletions := []protocol.FileInfo{}
  381. buckets := map[string][]protocol.FileInfo{}
  382. handleFile := func(f protocol.FileInfo) bool {
  383. switch {
  384. case f.IsDeleted():
  385. // A deleted file, directory or symlink
  386. if f.IsDirectory() {
  387. dirDeletions = append(dirDeletions, f)
  388. } else {
  389. fileDeletions[f.Name] = f
  390. df, ok := p.model.CurrentFolderFile(p.folder, f.Name)
  391. // Local file can be already deleted, but with a lower version
  392. // number, hence the deletion coming in again as part of
  393. // WithNeed, furthermore, the file can simply be of the wrong
  394. // type if we haven't yet managed to pull it.
  395. if ok && !df.IsDeleted() && !df.IsSymlink() && !df.IsDirectory() {
  396. // Put files into buckets per first hash
  397. key := string(df.Blocks[0].Hash)
  398. buckets[key] = append(buckets[key], df)
  399. }
  400. }
  401. case f.IsDirectory() && !f.IsSymlink():
  402. // A new or changed directory
  403. l.Debugln("Creating directory", f.Name)
  404. p.handleDir(f)
  405. default:
  406. return false
  407. }
  408. return true
  409. }
  410. folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
  411. // Needed items are delivered sorted lexicographically. We'll handle
  412. // directories as they come along, so parents before children. Files
  413. // are queued and the order may be changed later.
  414. file := intf.(protocol.FileInfo)
  415. if ignores.Match(file.Name) {
  416. // This is an ignored file. Skip it, continue iteration.
  417. return true
  418. }
  419. l.Debugln(p, "handling", file.Name)
  420. if !handleFile(file) {
  421. // A new or changed file or symlink. This is the only case where we
  422. // do stuff concurrently in the background
  423. p.queue.Push(file.Name, file.Size(), file.Modified)
  424. }
  425. changed++
  426. return true
  427. })
  428. // Reorder the file queue according to configuration
  429. switch p.order {
  430. case config.OrderRandom:
  431. p.queue.Shuffle()
  432. case config.OrderAlphabetic:
  433. // The queue is already in alphabetic order.
  434. case config.OrderSmallestFirst:
  435. p.queue.SortSmallestFirst()
  436. case config.OrderLargestFirst:
  437. p.queue.SortLargestFirst()
  438. case config.OrderOldestFirst:
  439. p.queue.SortOldestFirst()
  440. case config.OrderNewestFirst:
  441. p.queue.SortNewestFirst()
  442. }
  443. // Process the file queue
  444. nextFile:
  445. for {
  446. select {
  447. case <-p.stop:
  448. // Stop processing files if the puller has been told to stop.
  449. break
  450. default:
  451. }
  452. fileName, ok := p.queue.Pop()
  453. if !ok {
  454. break
  455. }
  456. f, ok := p.model.CurrentGlobalFile(p.folder, fileName)
  457. if !ok {
  458. // File is no longer in the index. Mark it as done and drop it.
  459. p.queue.Done(fileName)
  460. continue
  461. }
  462. // Handles races where an index update arrives changing what the file
  463. // is between queueing and retrieving it from the queue, effectively
  464. // changing how the file should be handled.
  465. if handleFile(f) {
  466. continue
  467. }
  468. if !f.IsSymlink() {
  469. key := string(f.Blocks[0].Hash)
  470. for i, candidate := range buckets[key] {
  471. if scanner.BlocksEqual(candidate.Blocks, f.Blocks) {
  472. // Remove the candidate from the bucket
  473. lidx := len(buckets[key]) - 1
  474. buckets[key][i] = buckets[key][lidx]
  475. buckets[key] = buckets[key][:lidx]
  476. // candidate is our current state of the file, where as the
  477. // desired state with the delete bit set is in the deletion
  478. // map.
  479. desired := fileDeletions[candidate.Name]
  480. // Remove the pending deletion (as we perform it by renaming)
  481. delete(fileDeletions, candidate.Name)
  482. p.renameFile(desired, f)
  483. p.queue.Done(fileName)
  484. continue nextFile
  485. }
  486. }
  487. }
  488. // Not a rename or a symlink, deal with it.
  489. p.handleFile(f, copyChan, finisherChan)
  490. }
  491. // Signal copy and puller routines that we are done with the in data for
  492. // this iteration. Wait for them to finish.
  493. close(copyChan)
  494. copyWg.Wait()
  495. close(pullChan)
  496. pullWg.Wait()
  497. // Signal the finisher chan that there will be no more input.
  498. close(finisherChan)
  499. // Wait for the finisherChan to finish.
  500. doneWg.Wait()
  501. for _, file := range fileDeletions {
  502. l.Debugln("Deleting file", file.Name)
  503. p.deleteFile(file)
  504. }
  505. for i := range dirDeletions {
  506. dir := dirDeletions[len(dirDeletions)-i-1]
  507. l.Debugln("Deleting dir", dir.Name)
  508. p.deleteDir(dir)
  509. }
  510. // Wait for db updates to complete
  511. close(p.dbUpdates)
  512. updateWg.Wait()
  513. return changed
  514. }
  515. // handleDir creates or updates the given directory
  516. func (p *rwFolder) handleDir(file protocol.FileInfo) {
  517. var err error
  518. events.Default.Log(events.ItemStarted, map[string]string{
  519. "folder": p.folder,
  520. "item": file.Name,
  521. "type": "dir",
  522. "action": "update",
  523. })
  524. defer func() {
  525. events.Default.Log(events.ItemFinished, map[string]interface{}{
  526. "folder": p.folder,
  527. "item": file.Name,
  528. "error": events.Error(err),
  529. "type": "dir",
  530. "action": "update",
  531. })
  532. }()
  533. realName := filepath.Join(p.dir, file.Name)
  534. mode := os.FileMode(file.Flags & 0777)
  535. if p.ignorePermissions(file) {
  536. mode = 0777
  537. }
  538. if shouldDebug() {
  539. curFile, _ := p.model.CurrentFolderFile(p.folder, file.Name)
  540. l.Debugf("need dir\n\t%v\n\t%v", file, curFile)
  541. }
  542. info, err := osutil.Lstat(realName)
  543. switch {
  544. // There is already something under that name, but it's a file/link.
  545. // Most likely a file/link is getting replaced with a directory.
  546. // Remove the file/link and fall through to directory creation.
  547. case err == nil && (!info.IsDir() || info.Mode()&os.ModeSymlink != 0):
  548. err = osutil.InWritableDir(osutil.Remove, realName)
  549. if err != nil {
  550. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  551. p.newError(file.Name, err)
  552. return
  553. }
  554. fallthrough
  555. // The directory doesn't exist, so we create it with the right
  556. // mode bits from the start.
  557. case err != nil && os.IsNotExist(err):
  558. // We declare a function that acts on only the path name, so
  559. // we can pass it to InWritableDir. We use a regular Mkdir and
  560. // not MkdirAll because the parent should already exist.
  561. mkdir := func(path string) error {
  562. err = os.Mkdir(path, mode)
  563. if err != nil || p.ignorePermissions(file) {
  564. return err
  565. }
  566. // Stat the directory so we can check its permissions.
  567. info, err := osutil.Lstat(path)
  568. if err != nil {
  569. return err
  570. }
  571. // Mask for the bits we want to preserve and add them in to the
  572. // directories permissions.
  573. return os.Chmod(path, mode|(info.Mode()&retainBits))
  574. }
  575. if err = osutil.InWritableDir(mkdir, realName); err == nil {
  576. p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  577. } else {
  578. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  579. p.newError(file.Name, err)
  580. }
  581. return
  582. // Weird error when stat()'ing the dir. Probably won't work to do
  583. // anything else with it if we can't even stat() it.
  584. case err != nil:
  585. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  586. p.newError(file.Name, err)
  587. return
  588. }
  589. // The directory already exists, so we just correct the mode bits. (We
  590. // don't handle modification times on directories, because that sucks...)
  591. // It's OK to change mode bits on stuff within non-writable directories.
  592. if p.ignorePermissions(file) {
  593. p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  594. } else if err := os.Chmod(realName, mode|(info.Mode()&retainBits)); err == nil {
  595. p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  596. } else {
  597. l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err)
  598. p.newError(file.Name, err)
  599. }
  600. }
  601. // deleteDir attempts to delete the given directory
  602. func (p *rwFolder) deleteDir(file protocol.FileInfo) {
  603. var err error
  604. events.Default.Log(events.ItemStarted, map[string]string{
  605. "folder": p.folder,
  606. "item": file.Name,
  607. "type": "dir",
  608. "action": "delete",
  609. })
  610. defer func() {
  611. events.Default.Log(events.ItemFinished, map[string]interface{}{
  612. "folder": p.folder,
  613. "item": file.Name,
  614. "error": events.Error(err),
  615. "type": "dir",
  616. "action": "delete",
  617. })
  618. }()
  619. realName := filepath.Join(p.dir, file.Name)
  620. // Delete any temporary files lying around in the directory
  621. dir, _ := os.Open(realName)
  622. if dir != nil {
  623. files, _ := dir.Readdirnames(-1)
  624. for _, file := range files {
  625. if defTempNamer.IsTemporary(file) {
  626. osutil.InWritableDir(osutil.Remove, filepath.Join(realName, file))
  627. }
  628. }
  629. dir.Close()
  630. }
  631. err = osutil.InWritableDir(osutil.Remove, realName)
  632. if err == nil || os.IsNotExist(err) {
  633. // It was removed or it doesn't exist to start with
  634. p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
  635. } else if _, serr := os.Lstat(realName); serr != nil && !os.IsPermission(serr) {
  636. // We get an error just looking at the directory, and it's not a
  637. // permission problem. Lets assume the error is in fact some variant
  638. // of "file does not exist" (possibly expressed as some parent being a
  639. // file and not a directory etc) and that the delete is handled.
  640. p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
  641. } else {
  642. l.Infof("Puller (folder %q, dir %q): delete: %v", p.folder, file.Name, err)
  643. p.newError(file.Name, err)
  644. }
  645. }
  646. // deleteFile attempts to delete the given file
  647. func (p *rwFolder) deleteFile(file protocol.FileInfo) {
  648. var err error
  649. events.Default.Log(events.ItemStarted, map[string]string{
  650. "folder": p.folder,
  651. "item": file.Name,
  652. "type": "file",
  653. "action": "delete",
  654. })
  655. defer func() {
  656. events.Default.Log(events.ItemFinished, map[string]interface{}{
  657. "folder": p.folder,
  658. "item": file.Name,
  659. "error": events.Error(err),
  660. "type": "file",
  661. "action": "delete",
  662. })
  663. }()
  664. realName := filepath.Join(p.dir, file.Name)
  665. cur, ok := p.model.CurrentFolderFile(p.folder, file.Name)
  666. if ok && p.inConflict(cur.Version, file.Version) {
  667. // There is a conflict here. Move the file to a conflict copy instead
  668. // of deleting. Also merge with the version vector we had, to indicate
  669. // we have resolved the conflict.
  670. file.Version = file.Version.Merge(cur.Version)
  671. err = osutil.InWritableDir(p.moveForConflict, realName)
  672. } else if p.versioner != nil {
  673. err = osutil.InWritableDir(p.versioner.Archive, realName)
  674. } else {
  675. err = osutil.InWritableDir(osutil.Remove, realName)
  676. }
  677. if err == nil || os.IsNotExist(err) {
  678. // It was removed or it doesn't exist to start with
  679. p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
  680. } else if _, serr := os.Lstat(realName); serr != nil && !os.IsPermission(serr) {
  681. // We get an error just looking at the file, and it's not a permission
  682. // problem. Lets assume the error is in fact some variant of "file
  683. // does not exist" (possibly expressed as some parent being a file and
  684. // not a directory etc) and that the delete is handled.
  685. p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
  686. } else {
  687. l.Infof("Puller (folder %q, file %q): delete: %v", p.folder, file.Name, err)
  688. p.newError(file.Name, err)
  689. }
  690. }
  691. // renameFile attempts to rename an existing file to a destination
  692. // and set the right attributes on it.
  693. func (p *rwFolder) renameFile(source, target protocol.FileInfo) {
  694. var err error
  695. events.Default.Log(events.ItemStarted, map[string]string{
  696. "folder": p.folder,
  697. "item": source.Name,
  698. "type": "file",
  699. "action": "delete",
  700. })
  701. events.Default.Log(events.ItemStarted, map[string]string{
  702. "folder": p.folder,
  703. "item": target.Name,
  704. "type": "file",
  705. "action": "update",
  706. })
  707. defer func() {
  708. events.Default.Log(events.ItemFinished, map[string]interface{}{
  709. "folder": p.folder,
  710. "item": source.Name,
  711. "error": events.Error(err),
  712. "type": "file",
  713. "action": "delete",
  714. })
  715. events.Default.Log(events.ItemFinished, map[string]interface{}{
  716. "folder": p.folder,
  717. "item": target.Name,
  718. "error": events.Error(err),
  719. "type": "file",
  720. "action": "update",
  721. })
  722. }()
  723. l.Debugln(p, "taking rename shortcut", source.Name, "->", target.Name)
  724. from := filepath.Join(p.dir, source.Name)
  725. to := filepath.Join(p.dir, target.Name)
  726. if p.versioner != nil {
  727. err = osutil.Copy(from, to)
  728. if err == nil {
  729. err = osutil.InWritableDir(p.versioner.Archive, from)
  730. }
  731. } else {
  732. err = osutil.TryRename(from, to)
  733. }
  734. if err == nil {
  735. // The file was renamed, so we have handled both the necessary delete
  736. // of the source and the creation of the target. Fix-up the metadata,
  737. // and update the local index of the target file.
  738. p.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
  739. err = p.shortcutFile(target)
  740. if err != nil {
  741. l.Infof("Puller (folder %q, file %q): rename from %q metadata: %v", p.folder, target.Name, source.Name, err)
  742. p.newError(target.Name, err)
  743. return
  744. }
  745. p.dbUpdates <- dbUpdateJob{target, dbUpdateHandleFile}
  746. } else {
  747. // We failed the rename so we have a source file that we still need to
  748. // get rid of. Attempt to delete it instead so that we make *some*
  749. // progress. The target is unhandled.
  750. err = osutil.InWritableDir(osutil.Remove, from)
  751. if err != nil {
  752. l.Infof("Puller (folder %q, file %q): delete %q after failed rename: %v", p.folder, target.Name, source.Name, err)
  753. p.newError(target.Name, err)
  754. return
  755. }
  756. p.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
  757. }
  758. }
  759. // This is the flow of data and events here, I think...
  760. //
  761. // +-----------------------+
  762. // | | - - - - > ItemStarted
  763. // | handleFile | - - - - > ItemFinished (on shortcuts)
  764. // | |
  765. // +-----------------------+
  766. // |
  767. // | copyChan (copyBlocksState; unless shortcut taken)
  768. // |
  769. // | +-----------------------+
  770. // | | +-----------------------+
  771. // +--->| | |
  772. // | | copierRoutine |
  773. // +-| |
  774. // +-----------------------+
  775. // |
  776. // | pullChan (sharedPullerState)
  777. // |
  778. // | +-----------------------+
  779. // | | +-----------------------+
  780. // +-->| | |
  781. // | | pullerRoutine |
  782. // +-| |
  783. // +-----------------------+
  784. // |
  785. // | finisherChan (sharedPullerState)
  786. // |
  787. // | +-----------------------+
  788. // | | |
  789. // +-->| finisherRoutine | - - - - > ItemFinished
  790. // | |
  791. // +-----------------------+
  792. // handleFile queues the copies and pulls as necessary for a single new or
  793. // changed file.
  794. func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
  795. curFile, hasCurFile := p.model.CurrentFolderFile(p.folder, file.Name)
  796. if hasCurFile && len(curFile.Blocks) == len(file.Blocks) && scanner.BlocksEqual(curFile.Blocks, file.Blocks) {
  797. // We are supposed to copy the entire file, and then fetch nothing. We
  798. // are only updating metadata, so we don't actually *need* to make the
  799. // copy.
  800. l.Debugln(p, "taking shortcut on", file.Name)
  801. events.Default.Log(events.ItemStarted, map[string]string{
  802. "folder": p.folder,
  803. "item": file.Name,
  804. "type": "file",
  805. "action": "metadata",
  806. })
  807. p.queue.Done(file.Name)
  808. var err error
  809. if file.IsSymlink() {
  810. err = p.shortcutSymlink(file)
  811. } else {
  812. err = p.shortcutFile(file)
  813. }
  814. events.Default.Log(events.ItemFinished, map[string]interface{}{
  815. "folder": p.folder,
  816. "item": file.Name,
  817. "error": events.Error(err),
  818. "type": "file",
  819. "action": "metadata",
  820. })
  821. if err != nil {
  822. l.Infoln("Puller: shortcut:", err)
  823. p.newError(file.Name, err)
  824. } else {
  825. p.dbUpdates <- dbUpdateJob{file, dbUpdateShortcutFile}
  826. }
  827. return
  828. }
  829. // Figure out the absolute filenames we need once and for all
  830. tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name))
  831. realName := filepath.Join(p.dir, file.Name)
  832. if hasCurFile && !curFile.IsDirectory() && !curFile.IsSymlink() {
  833. // Check that the file on disk is what we expect it to be according to
  834. // the database. If there's a mismatch here, there might be local
  835. // changes that we don't know about yet and we should scan before
  836. // touching the file. If we can't stat the file we'll just pull it.
  837. if info, err := osutil.Lstat(realName); err == nil {
  838. mtime := p.virtualMtimeRepo.GetMtime(file.Name, info.ModTime())
  839. if mtime.Unix() != curFile.Modified || info.Size() != curFile.Size() {
  840. l.Debugln("file modified but not rescanned; not pulling:", realName)
  841. // Scan() is synchronous (i.e. blocks until the scan is
  842. // completed and returns an error), but a scan can't happen
  843. // while we're in the puller routine. Request the scan in the
  844. // background and it'll be handled when the current pulling
  845. // sweep is complete. As we do retries, we'll queue the scan
  846. // for this file up to ten times, but the last nine of those
  847. // scans will be cheap...
  848. go p.Scan([]string{file.Name})
  849. return
  850. }
  851. }
  852. }
  853. scanner.PopulateOffsets(file.Blocks)
  854. reused := 0
  855. var blocks []protocol.BlockInfo
  856. var blocksSize int64
  857. // Check for an old temporary file which might have some blocks we could
  858. // reuse.
  859. tempBlocks, err := scanner.HashFile(tempName, protocol.BlockSize, 0, nil)
  860. if err == nil {
  861. // Check for any reusable blocks in the temp file
  862. tempCopyBlocks, _ := scanner.BlockDiff(tempBlocks, file.Blocks)
  863. // block.String() returns a string unique to the block
  864. existingBlocks := make(map[string]struct{}, len(tempCopyBlocks))
  865. for _, block := range tempCopyBlocks {
  866. existingBlocks[block.String()] = struct{}{}
  867. }
  868. // Since the blocks are already there, we don't need to get them.
  869. for _, block := range file.Blocks {
  870. _, ok := existingBlocks[block.String()]
  871. if !ok {
  872. blocks = append(blocks, block)
  873. blocksSize += int64(block.Size)
  874. }
  875. }
  876. // The sharedpullerstate will know which flags to use when opening the
  877. // temp file depending if we are reusing any blocks or not.
  878. reused = len(file.Blocks) - len(blocks)
  879. if reused == 0 {
  880. // Otherwise, discard the file ourselves in order for the
  881. // sharedpuller not to panic when it fails to exclusively create a
  882. // file which already exists
  883. osutil.InWritableDir(osutil.Remove, tempName)
  884. }
  885. } else {
  886. blocks = file.Blocks
  887. blocksSize = file.Size()
  888. }
  889. if p.checkFreeSpace {
  890. if free, err := osutil.DiskFreeBytes(p.dir); err == nil && free < blocksSize {
  891. l.Warnf(`Folder "%s": insufficient disk space in %s for %s: have %.2f MiB, need %.2f MiB`, p.folder, p.dir, file.Name, float64(free)/1024/1024, float64(blocksSize)/1024/1024)
  892. p.newError(file.Name, errors.New("insufficient space"))
  893. return
  894. }
  895. }
  896. events.Default.Log(events.ItemStarted, map[string]string{
  897. "folder": p.folder,
  898. "item": file.Name,
  899. "type": "file",
  900. "action": "update",
  901. })
  902. s := sharedPullerState{
  903. file: file,
  904. folder: p.folder,
  905. tempName: tempName,
  906. realName: realName,
  907. copyTotal: len(blocks),
  908. copyNeeded: len(blocks),
  909. reused: reused,
  910. ignorePerms: p.ignorePermissions(file),
  911. version: curFile.Version,
  912. mut: sync.NewMutex(),
  913. sparse: p.allowSparse,
  914. }
  915. l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused)
  916. cs := copyBlocksState{
  917. sharedPullerState: &s,
  918. blocks: blocks,
  919. }
  920. copyChan <- cs
  921. }
  922. // shortcutFile sets file mode and modification time, when that's the only
  923. // thing that has changed.
  924. func (p *rwFolder) shortcutFile(file protocol.FileInfo) error {
  925. realName := filepath.Join(p.dir, file.Name)
  926. if !p.ignorePermissions(file) {
  927. if err := os.Chmod(realName, os.FileMode(file.Flags&0777)); err != nil {
  928. l.Infof("Puller (folder %q, file %q): shortcut: chmod: %v", p.folder, file.Name, err)
  929. p.newError(file.Name, err)
  930. return err
  931. }
  932. }
  933. t := time.Unix(file.Modified, 0)
  934. if err := os.Chtimes(realName, t, t); err != nil {
  935. // Try using virtual mtimes
  936. info, err := os.Stat(realName)
  937. if err != nil {
  938. l.Infof("Puller (folder %q, file %q): shortcut: unable to stat file: %v", p.folder, file.Name, err)
  939. p.newError(file.Name, err)
  940. return err
  941. }
  942. p.virtualMtimeRepo.UpdateMtime(file.Name, info.ModTime(), t)
  943. }
  944. // This may have been a conflict. We should merge the version vectors so
  945. // that our clock doesn't move backwards.
  946. if cur, ok := p.model.CurrentFolderFile(p.folder, file.Name); ok {
  947. file.Version = file.Version.Merge(cur.Version)
  948. }
  949. return nil
  950. }
  951. // shortcutSymlink changes the symlinks type if necessary.
  952. func (p *rwFolder) shortcutSymlink(file protocol.FileInfo) (err error) {
  953. tt := symlinks.TargetFile
  954. if file.IsDirectory() {
  955. tt = symlinks.TargetDirectory
  956. }
  957. err = symlinks.ChangeType(filepath.Join(p.dir, file.Name), tt)
  958. if err != nil {
  959. l.Infof("Puller (folder %q, file %q): symlink shortcut: %v", p.folder, file.Name, err)
  960. p.newError(file.Name, err)
  961. }
  962. return
  963. }
  964. // copierRoutine reads copierStates until the in channel closes and performs
  965. // the relevant copies when possible, or passes it to the puller routine.
  966. func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
  967. buf := make([]byte, protocol.BlockSize)
  968. for state := range in {
  969. dstFd, err := state.tempFile()
  970. if err != nil {
  971. // Nothing more to do for this failed file, since we couldn't create a temporary for it.
  972. out <- state.sharedPullerState
  973. continue
  974. }
  975. if p.progressEmitter != nil {
  976. p.progressEmitter.Register(state.sharedPullerState)
  977. }
  978. folderRoots := make(map[string]string)
  979. var folders []string
  980. p.model.fmut.RLock()
  981. for folder, cfg := range p.model.folderCfgs {
  982. folderRoots[folder] = cfg.Path()
  983. folders = append(folders, folder)
  984. }
  985. p.model.fmut.RUnlock()
  986. for _, block := range state.blocks {
  987. if p.allowSparse && state.reused == 0 && block.IsEmpty() {
  988. // The block is a block of all zeroes, and we are not reusing
  989. // a temp file, so there is no need to do anything with it.
  990. // If we were reusing a temp file and had this block to copy,
  991. // it would be because the block in the temp file was *not* a
  992. // block of all zeroes, so then we should not skip it.
  993. // Pretend we copied it.
  994. state.copiedFromOrigin()
  995. continue
  996. }
  997. buf = buf[:int(block.Size)]
  998. found := p.model.finder.Iterate(folders, block.Hash, func(folder, file string, index int32) bool {
  999. fd, err := os.Open(filepath.Join(folderRoots[folder], file))
  1000. if err != nil {
  1001. return false
  1002. }
  1003. _, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
  1004. fd.Close()
  1005. if err != nil {
  1006. return false
  1007. }
  1008. hash, err := scanner.VerifyBuffer(buf, block)
  1009. if err != nil {
  1010. if hash != nil {
  1011. l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash)
  1012. err = p.model.finder.Fix(folder, file, index, block.Hash, hash)
  1013. if err != nil {
  1014. l.Warnln("finder fix:", err)
  1015. }
  1016. } else {
  1017. l.Debugln("Finder failed to verify buffer", err)
  1018. }
  1019. return false
  1020. }
  1021. _, err = dstFd.WriteAt(buf, block.Offset)
  1022. if err != nil {
  1023. state.fail("dst write", err)
  1024. }
  1025. if file == state.file.Name {
  1026. state.copiedFromOrigin()
  1027. }
  1028. return true
  1029. })
  1030. if state.failed() != nil {
  1031. break
  1032. }
  1033. if !found {
  1034. state.pullStarted()
  1035. ps := pullBlockState{
  1036. sharedPullerState: state.sharedPullerState,
  1037. block: block,
  1038. }
  1039. pullChan <- ps
  1040. } else {
  1041. state.copyDone()
  1042. }
  1043. }
  1044. out <- state.sharedPullerState
  1045. }
  1046. }
  1047. func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
  1048. for state := range in {
  1049. if state.failed() != nil {
  1050. out <- state.sharedPullerState
  1051. continue
  1052. }
  1053. // Get an fd to the temporary file. Technically we don't need it until
  1054. // after fetching the block, but if we run into an error here there is
  1055. // no point in issuing the request to the network.
  1056. fd, err := state.tempFile()
  1057. if err != nil {
  1058. out <- state.sharedPullerState
  1059. continue
  1060. }
  1061. if p.allowSparse && state.reused == 0 && state.block.IsEmpty() {
  1062. // There is no need to request a block of all zeroes. Pretend we
  1063. // requested it and handled it correctly.
  1064. state.pullDone()
  1065. out <- state.sharedPullerState
  1066. continue
  1067. }
  1068. var lastError error
  1069. potentialDevices := p.model.Availability(p.folder, state.file.Name)
  1070. for {
  1071. // Select the least busy device to pull the block from. If we found no
  1072. // feasible device at all, fail the block (and in the long run, the
  1073. // file).
  1074. selected := activity.leastBusy(potentialDevices)
  1075. if selected == (protocol.DeviceID{}) {
  1076. if lastError != nil {
  1077. state.fail("pull", lastError)
  1078. } else {
  1079. state.fail("pull", errNoDevice)
  1080. }
  1081. break
  1082. }
  1083. potentialDevices = removeDevice(potentialDevices, selected)
  1084. // Fetch the block, while marking the selected device as in use so that
  1085. // leastBusy can select another device when someone else asks.
  1086. activity.using(selected)
  1087. buf, lastError := p.model.requestGlobal(selected, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, 0, nil)
  1088. activity.done(selected)
  1089. if lastError != nil {
  1090. l.Debugln("request:", p.folder, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError)
  1091. continue
  1092. }
  1093. // Verify that the received block matches the desired hash, if not
  1094. // try pulling it from another device.
  1095. _, lastError = scanner.VerifyBuffer(buf, state.block)
  1096. if lastError != nil {
  1097. l.Debugln("request:", p.folder, state.file.Name, state.block.Offset, state.block.Size, "hash mismatch")
  1098. continue
  1099. }
  1100. // Save the block data we got from the cluster
  1101. _, err = fd.WriteAt(buf, state.block.Offset)
  1102. if err != nil {
  1103. state.fail("save", err)
  1104. } else {
  1105. state.pullDone()
  1106. }
  1107. break
  1108. }
  1109. out <- state.sharedPullerState
  1110. }
  1111. }
  1112. func (p *rwFolder) performFinish(state *sharedPullerState) error {
  1113. // Set the correct permission bits on the new file
  1114. if !p.ignorePermissions(state.file) {
  1115. if err := os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777)); err != nil {
  1116. return err
  1117. }
  1118. }
  1119. // Set the correct timestamp on the new file
  1120. t := time.Unix(state.file.Modified, 0)
  1121. if err := os.Chtimes(state.tempName, t, t); err != nil {
  1122. // Try using virtual mtimes instead
  1123. info, err := os.Stat(state.tempName)
  1124. if err != nil {
  1125. return err
  1126. }
  1127. p.virtualMtimeRepo.UpdateMtime(state.file.Name, info.ModTime(), t)
  1128. }
  1129. if stat, err := osutil.Lstat(state.realName); err == nil {
  1130. // There is an old file or directory already in place. We need to
  1131. // handle that.
  1132. switch {
  1133. case stat.IsDir() || stat.Mode()&os.ModeSymlink != 0:
  1134. // It's a directory or a symlink. These are not versioned or
  1135. // archived for conflicts, only removed (which of course fails for
  1136. // non-empty directories).
  1137. // TODO: This is the place where we want to remove temporary files
  1138. // and future hard ignores before attempting a directory delete.
  1139. // Should share code with p.deletDir().
  1140. if err = osutil.InWritableDir(osutil.Remove, state.realName); err != nil {
  1141. return err
  1142. }
  1143. case p.inConflict(state.version, state.file.Version):
  1144. // The new file has been changed in conflict with the existing one. We
  1145. // should file it away as a conflict instead of just removing or
  1146. // archiving. Also merge with the version vector we had, to indicate
  1147. // we have resolved the conflict.
  1148. state.file.Version = state.file.Version.Merge(state.version)
  1149. if err = osutil.InWritableDir(p.moveForConflict, state.realName); err != nil {
  1150. return err
  1151. }
  1152. case p.versioner != nil:
  1153. // If we should use versioning, let the versioner archive the old
  1154. // file before we replace it. Archiving a non-existent file is not
  1155. // an error.
  1156. if err = p.versioner.Archive(state.realName); err != nil {
  1157. return err
  1158. }
  1159. }
  1160. }
  1161. // Replace the original content with the new one
  1162. if err := osutil.Rename(state.tempName, state.realName); err != nil {
  1163. return err
  1164. }
  1165. // If it's a symlink, the target of the symlink is inside the file.
  1166. if state.file.IsSymlink() {
  1167. content, err := ioutil.ReadFile(state.realName)
  1168. if err != nil {
  1169. return err
  1170. }
  1171. // Remove the file, and replace it with a symlink.
  1172. err = osutil.InWritableDir(func(path string) error {
  1173. os.Remove(path)
  1174. tt := symlinks.TargetFile
  1175. if state.file.IsDirectory() {
  1176. tt = symlinks.TargetDirectory
  1177. }
  1178. return symlinks.Create(path, string(content), tt)
  1179. }, state.realName)
  1180. if err != nil {
  1181. return err
  1182. }
  1183. }
  1184. // Record the updated file in the index
  1185. p.dbUpdates <- dbUpdateJob{state.file, dbUpdateHandleFile}
  1186. return nil
  1187. }
  1188. func (p *rwFolder) finisherRoutine(in <-chan *sharedPullerState) {
  1189. for state := range in {
  1190. if closed, err := state.finalClose(); closed {
  1191. l.Debugln(p, "closing", state.file.Name)
  1192. p.queue.Done(state.file.Name)
  1193. if err == nil {
  1194. err = p.performFinish(state)
  1195. }
  1196. if err != nil {
  1197. l.Infoln("Puller: final:", err)
  1198. p.newError(state.file.Name, err)
  1199. }
  1200. events.Default.Log(events.ItemFinished, map[string]interface{}{
  1201. "folder": p.folder,
  1202. "item": state.file.Name,
  1203. "error": events.Error(err),
  1204. "type": "file",
  1205. "action": "update",
  1206. })
  1207. if p.progressEmitter != nil {
  1208. p.progressEmitter.Deregister(state)
  1209. }
  1210. }
  1211. }
  1212. }
  1213. // Moves the given filename to the front of the job queue
  1214. func (p *rwFolder) BringToFront(filename string) {
  1215. p.queue.BringToFront(filename)
  1216. }
  1217. func (p *rwFolder) Jobs() ([]string, []string) {
  1218. return p.queue.Jobs()
  1219. }
  1220. func (p *rwFolder) DelayScan(next time.Duration) {
  1221. p.delayScan <- next
  1222. }
  1223. // dbUpdaterRoutine aggregates db updates and commits them in batches no
  1224. // larger than 1000 items, and no more delayed than 2 seconds.
  1225. func (p *rwFolder) dbUpdaterRoutine() {
  1226. const (
  1227. maxBatchSize = 1000
  1228. maxBatchTime = 2 * time.Second
  1229. )
  1230. batch := make([]dbUpdateJob, 0, maxBatchSize)
  1231. files := make([]protocol.FileInfo, 0, maxBatchSize)
  1232. tick := time.NewTicker(maxBatchTime)
  1233. defer tick.Stop()
  1234. handleBatch := func() {
  1235. found := false
  1236. var lastFile protocol.FileInfo
  1237. for _, job := range batch {
  1238. files = append(files, job.file)
  1239. if job.file.IsInvalid() || (job.file.IsDirectory() && !job.file.IsSymlink()) {
  1240. continue
  1241. }
  1242. if job.jobType&(dbUpdateHandleFile|dbUpdateDeleteFile) == 0 {
  1243. continue
  1244. }
  1245. found = true
  1246. lastFile = job.file
  1247. }
  1248. p.model.updateLocals(p.folder, files)
  1249. if found {
  1250. p.model.receivedFile(p.folder, lastFile)
  1251. }
  1252. batch = batch[:0]
  1253. files = files[:0]
  1254. }
  1255. loop:
  1256. for {
  1257. select {
  1258. case job, ok := <-p.dbUpdates:
  1259. if !ok {
  1260. break loop
  1261. }
  1262. job.file.LocalVersion = 0
  1263. batch = append(batch, job)
  1264. if len(batch) == maxBatchSize {
  1265. handleBatch()
  1266. }
  1267. case <-tick.C:
  1268. if len(batch) > 0 {
  1269. handleBatch()
  1270. }
  1271. }
  1272. }
  1273. if len(batch) > 0 {
  1274. handleBatch()
  1275. }
  1276. }
  1277. func (p *rwFolder) inConflict(current, replacement protocol.Vector) bool {
  1278. if current.Concurrent(replacement) {
  1279. // Obvious case
  1280. return true
  1281. }
  1282. if replacement.Counter(p.shortID) > current.Counter(p.shortID) {
  1283. // The replacement file contains a higher version for ourselves than
  1284. // what we have. This isn't supposed to be possible, since it's only
  1285. // we who can increment that counter. We take it as a sign that
  1286. // something is wrong (our index may have been corrupted or removed)
  1287. // and flag it as a conflict.
  1288. return true
  1289. }
  1290. return false
  1291. }
  1292. func removeDevice(devices []protocol.DeviceID, device protocol.DeviceID) []protocol.DeviceID {
  1293. for i := range devices {
  1294. if devices[i] == device {
  1295. devices[i] = devices[len(devices)-1]
  1296. return devices[:len(devices)-1]
  1297. }
  1298. }
  1299. return devices
  1300. }
  1301. func (p *rwFolder) moveForConflict(name string) error {
  1302. if strings.Contains(filepath.Base(name), ".sync-conflict-") {
  1303. l.Infoln("Conflict for", name, "which is already a conflict copy; not copying again.")
  1304. if err := osutil.Remove(name); err != nil && !os.IsNotExist(err) {
  1305. return err
  1306. }
  1307. return nil
  1308. }
  1309. if p.maxConflicts == 0 {
  1310. if err := osutil.Remove(name); err != nil && !os.IsNotExist(err) {
  1311. return err
  1312. }
  1313. return nil
  1314. }
  1315. ext := filepath.Ext(name)
  1316. withoutExt := name[:len(name)-len(ext)]
  1317. newName := withoutExt + time.Now().Format(".sync-conflict-20060102-150405") + ext
  1318. err := os.Rename(name, newName)
  1319. if os.IsNotExist(err) {
  1320. // We were supposed to move a file away but it does not exist. Either
  1321. // the user has already moved it away, or the conflict was between a
  1322. // remote modification and a local delete. In either way it does not
  1323. // matter, go ahead as if the move succeeded.
  1324. err = nil
  1325. }
  1326. if p.maxConflicts > -1 {
  1327. matches, gerr := osutil.Glob(withoutExt + ".sync-conflict-????????-??????" + ext)
  1328. if gerr == nil && len(matches) > p.maxConflicts {
  1329. sort.Sort(sort.Reverse(sort.StringSlice(matches)))
  1330. for _, match := range matches[p.maxConflicts:] {
  1331. gerr = osutil.Remove(match)
  1332. if gerr != nil {
  1333. l.Debugln(p, "removing extra conflict", gerr)
  1334. }
  1335. }
  1336. } else if gerr != nil {
  1337. l.Debugln(p, "globbing for conflicts", gerr)
  1338. }
  1339. }
  1340. return err
  1341. }
  1342. func (p *rwFolder) newError(path string, err error) {
  1343. p.errorsMut.Lock()
  1344. defer p.errorsMut.Unlock()
  1345. // We might get more than one error report for a file (i.e. error on
  1346. // Write() followed by Close()); we keep the first error as that is
  1347. // probably closer to the root cause.
  1348. if _, ok := p.errors[path]; ok {
  1349. return
  1350. }
  1351. p.errors[path] = err.Error()
  1352. }
  1353. func (p *rwFolder) clearErrors() {
  1354. p.errorsMut.Lock()
  1355. p.errors = make(map[string]string)
  1356. p.errorsMut.Unlock()
  1357. }
  1358. func (p *rwFolder) currentErrors() []fileError {
  1359. p.errorsMut.Lock()
  1360. errors := make([]fileError, 0, len(p.errors))
  1361. for path, err := range p.errors {
  1362. errors = append(errors, fileError{path, err})
  1363. }
  1364. sort.Sort(fileErrorList(errors))
  1365. p.errorsMut.Unlock()
  1366. return errors
  1367. }
  1368. // A []fileError is sent as part of an event and will be JSON serialized.
  1369. type fileError struct {
  1370. Path string `json:"path"`
  1371. Err string `json:"error"`
  1372. }
  1373. type fileErrorList []fileError
  1374. func (l fileErrorList) Len() int {
  1375. return len(l)
  1376. }
  1377. func (l fileErrorList) Less(a, b int) bool {
  1378. return l[a].Path < l[b].Path
  1379. }
  1380. func (l fileErrorList) Swap(a, b int) {
  1381. l[a], l[b] = l[b], l[a]
  1382. }