rwfolder.go 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "errors"
  9. "fmt"
  10. "io/ioutil"
  11. "math/rand"
  12. "os"
  13. "path/filepath"
  14. "runtime"
  15. "sort"
  16. "strings"
  17. "time"
  18. "github.com/syncthing/syncthing/lib/config"
  19. "github.com/syncthing/syncthing/lib/db"
  20. "github.com/syncthing/syncthing/lib/events"
  21. "github.com/syncthing/syncthing/lib/fs"
  22. "github.com/syncthing/syncthing/lib/ignore"
  23. "github.com/syncthing/syncthing/lib/osutil"
  24. "github.com/syncthing/syncthing/lib/protocol"
  25. "github.com/syncthing/syncthing/lib/scanner"
  26. "github.com/syncthing/syncthing/lib/symlinks"
  27. "github.com/syncthing/syncthing/lib/sync"
  28. "github.com/syncthing/syncthing/lib/versioner"
  29. )
  30. func init() {
  31. folderFactories[config.FolderTypeReadWrite] = newRWFolder
  32. }
  33. // A pullBlockState is passed to the puller routine for each block that needs
  34. // to be fetched.
  35. type pullBlockState struct {
  36. *sharedPullerState
  37. block protocol.BlockInfo
  38. }
  39. // A copyBlocksState is passed to copy routine if the file has blocks to be
  40. // copied.
  41. type copyBlocksState struct {
  42. *sharedPullerState
  43. blocks []protocol.BlockInfo
  44. }
  45. // Which filemode bits to preserve
  46. const retainBits = os.ModeSetgid | os.ModeSetuid | os.ModeSticky
  47. var (
  48. activity = newDeviceActivity()
  49. errNoDevice = errors.New("peers who had this file went away, or the file has changed while syncing. will retry later")
  50. )
  51. const (
  52. dbUpdateHandleDir = iota
  53. dbUpdateDeleteDir
  54. dbUpdateHandleFile
  55. dbUpdateDeleteFile
  56. dbUpdateShortcutFile
  57. )
  58. const (
  59. defaultCopiers = 1
  60. defaultPullers = 16
  61. defaultPullerSleep = 10 * time.Second
  62. defaultPullerPause = 60 * time.Second
  63. )
  64. type dbUpdateJob struct {
  65. file protocol.FileInfo
  66. jobType int
  67. }
  68. type rwFolder struct {
  69. folder
  70. mtimeFS *fs.MtimeFS
  71. dir string
  72. versioner versioner.Versioner
  73. ignorePerms bool
  74. order config.PullOrder
  75. maxConflicts int
  76. sleep time.Duration
  77. pause time.Duration
  78. allowSparse bool
  79. checkFreeSpace bool
  80. ignoreDelete bool
  81. fsync bool
  82. copiers int
  83. pullers int
  84. queue *jobQueue
  85. dbUpdates chan dbUpdateJob
  86. pullTimer *time.Timer
  87. remoteIndex chan struct{} // An index update was received, we should re-evaluate needs
  88. errors map[string]string // path -> error string
  89. errorsMut sync.Mutex
  90. initialScanCompleted chan (struct{}) // exposed for testing
  91. }
  92. func newRWFolder(model *Model, cfg config.FolderConfiguration, ver versioner.Versioner, mtimeFS *fs.MtimeFS) service {
  93. f := &rwFolder{
  94. folder: folder{
  95. stateTracker: newStateTracker(cfg.ID),
  96. scan: newFolderScanner(cfg),
  97. stop: make(chan struct{}),
  98. model: model,
  99. },
  100. mtimeFS: mtimeFS,
  101. dir: cfg.Path(),
  102. versioner: ver,
  103. ignorePerms: cfg.IgnorePerms,
  104. copiers: cfg.Copiers,
  105. pullers: cfg.Pullers,
  106. order: cfg.Order,
  107. maxConflicts: cfg.MaxConflicts,
  108. allowSparse: !cfg.DisableSparseFiles,
  109. checkFreeSpace: cfg.MinDiskFreePct != 0,
  110. ignoreDelete: cfg.IgnoreDelete,
  111. fsync: cfg.Fsync,
  112. queue: newJobQueue(),
  113. pullTimer: time.NewTimer(time.Second),
  114. remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes.
  115. errorsMut: sync.NewMutex(),
  116. initialScanCompleted: make(chan struct{}),
  117. }
  118. f.configureCopiersAndPullers(cfg)
  119. return f
  120. }
  121. func (f *rwFolder) configureCopiersAndPullers(config config.FolderConfiguration) {
  122. if f.copiers == 0 {
  123. f.copiers = defaultCopiers
  124. }
  125. if f.pullers == 0 {
  126. f.pullers = defaultPullers
  127. }
  128. if config.PullerPauseS == 0 {
  129. f.pause = defaultPullerPause
  130. } else {
  131. f.pause = time.Duration(config.PullerPauseS) * time.Second
  132. }
  133. if config.PullerSleepS == 0 {
  134. f.sleep = defaultPullerSleep
  135. } else {
  136. f.sleep = time.Duration(config.PullerSleepS) * time.Second
  137. }
  138. }
  139. // Helper function to check whether either the ignorePerm flag has been
  140. // set on the local host or the FlagNoPermBits has been set on the file/dir
  141. // which is being pulled.
  142. func (f *rwFolder) ignorePermissions(file protocol.FileInfo) bool {
  143. return f.ignorePerms || file.NoPermissions
  144. }
  145. // Serve will run scans and pulls. It will return when Stop()ed or on a
  146. // critical error.
  147. func (f *rwFolder) Serve() {
  148. l.Debugln(f, "starting")
  149. defer l.Debugln(f, "exiting")
  150. defer func() {
  151. f.pullTimer.Stop()
  152. f.scan.timer.Stop()
  153. // TODO: Should there be an actual FolderStopped state?
  154. f.setState(FolderIdle)
  155. }()
  156. var prevSec int64
  157. var prevIgnoreHash string
  158. for {
  159. select {
  160. case <-f.stop:
  161. return
  162. case <-f.remoteIndex:
  163. prevSec = 0
  164. f.pullTimer.Reset(0)
  165. l.Debugln(f, "remote index updated, rescheduling pull")
  166. case <-f.pullTimer.C:
  167. select {
  168. case <-f.initialScanCompleted:
  169. default:
  170. // We don't start pulling files until a scan has been completed.
  171. l.Debugln(f, "skip (initial)")
  172. f.pullTimer.Reset(f.sleep)
  173. continue
  174. }
  175. f.model.fmut.RLock()
  176. curIgnores := f.model.folderIgnores[f.folderID]
  177. f.model.fmut.RUnlock()
  178. if newHash := curIgnores.Hash(); newHash != prevIgnoreHash {
  179. // The ignore patterns have changed. We need to re-evaluate if
  180. // there are files we need now that were ignored before.
  181. l.Debugln(f, "ignore patterns have changed, resetting prevVer")
  182. prevSec = 0
  183. prevIgnoreHash = newHash
  184. }
  185. // RemoteSequence() is a fast call, doesn't touch the database.
  186. curSeq, ok := f.model.RemoteSequence(f.folderID)
  187. if !ok || curSeq == prevSec {
  188. l.Debugln(f, "skip (curSeq == prevSeq)", prevSec, ok)
  189. f.pullTimer.Reset(f.sleep)
  190. continue
  191. }
  192. if err := f.model.CheckFolderHealth(f.folderID); err != nil {
  193. l.Infoln("Skipping folder", f.folderID, "pull due to folder error:", err)
  194. f.pullTimer.Reset(f.sleep)
  195. continue
  196. }
  197. l.Debugln(f, "pulling", prevSec, curSeq)
  198. f.setState(FolderSyncing)
  199. f.clearErrors()
  200. tries := 0
  201. for {
  202. tries++
  203. changed := f.pullerIteration(curIgnores)
  204. l.Debugln(f, "changed", changed)
  205. if changed == 0 {
  206. // No files were changed by the puller, so we are in
  207. // sync. Remember the local version number and
  208. // schedule a resync a little bit into the future.
  209. if lv, ok := f.model.RemoteSequence(f.folderID); ok && lv < curSeq {
  210. // There's a corner case where the device we needed
  211. // files from disconnected during the puller
  212. // iteration. The files will have been removed from
  213. // the index, so we've concluded that we don't need
  214. // them, but at the same time we have the local
  215. // version that includes those files in curVer. So we
  216. // catch the case that sequence might have
  217. // decreased here.
  218. l.Debugln(f, "adjusting curVer", lv)
  219. curSeq = lv
  220. }
  221. prevSec = curSeq
  222. l.Debugln(f, "next pull in", f.sleep)
  223. f.pullTimer.Reset(f.sleep)
  224. break
  225. }
  226. if tries > 10 {
  227. // We've tried a bunch of times to get in sync, but
  228. // we're not making it. Probably there are write
  229. // errors preventing us. Flag this with a warning and
  230. // wait a bit longer before retrying.
  231. l.Infof("Folder %q isn't making progress. Pausing puller for %v.", f.folderID, f.pause)
  232. l.Debugln(f, "next pull in", f.pause)
  233. if folderErrors := f.currentErrors(); len(folderErrors) > 0 {
  234. events.Default.Log(events.FolderErrors, map[string]interface{}{
  235. "folder": f.folderID,
  236. "errors": folderErrors,
  237. })
  238. }
  239. f.pullTimer.Reset(f.pause)
  240. break
  241. }
  242. }
  243. f.setState(FolderIdle)
  244. // The reason for running the scanner from within the puller is that
  245. // this is the easiest way to make sure we are not doing both at the
  246. // same time.
  247. case <-f.scan.timer.C:
  248. err := f.scanSubdirsIfHealthy(nil)
  249. f.scan.Reschedule()
  250. if err != nil {
  251. continue
  252. }
  253. select {
  254. case <-f.initialScanCompleted:
  255. default:
  256. l.Infoln("Completed initial scan (rw) of folder", f.folderID)
  257. close(f.initialScanCompleted)
  258. }
  259. case req := <-f.scan.now:
  260. req.err <- f.scanSubdirsIfHealthy(req.subdirs)
  261. case next := <-f.scan.delay:
  262. f.scan.timer.Reset(next)
  263. }
  264. }
  265. }
  266. func (f *rwFolder) IndexUpdated() {
  267. select {
  268. case f.remoteIndex <- struct{}{}:
  269. default:
  270. // We might be busy doing a pull and thus not reading from this
  271. // channel. The channel is 1-buffered, so one notification will be
  272. // queued to ensure we recheck after the pull, but beyond that we must
  273. // make sure to not block index receiving.
  274. }
  275. }
  276. func (f *rwFolder) String() string {
  277. return fmt.Sprintf("rwFolder/%s@%p", f.folderID, f)
  278. }
  279. // pullerIteration runs a single puller iteration for the given folder and
  280. // returns the number items that should have been synced (even those that
  281. // might have failed). One puller iteration handles all files currently
  282. // flagged as needed in the folder.
  283. func (f *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
  284. pullChan := make(chan pullBlockState)
  285. copyChan := make(chan copyBlocksState)
  286. finisherChan := make(chan *sharedPullerState)
  287. updateWg := sync.NewWaitGroup()
  288. copyWg := sync.NewWaitGroup()
  289. pullWg := sync.NewWaitGroup()
  290. doneWg := sync.NewWaitGroup()
  291. l.Debugln(f, "c", f.copiers, "p", f.pullers)
  292. f.dbUpdates = make(chan dbUpdateJob)
  293. updateWg.Add(1)
  294. go func() {
  295. // dbUpdaterRoutine finishes when f.dbUpdates is closed
  296. f.dbUpdaterRoutine()
  297. updateWg.Done()
  298. }()
  299. for i := 0; i < f.copiers; i++ {
  300. copyWg.Add(1)
  301. go func() {
  302. // copierRoutine finishes when copyChan is closed
  303. f.copierRoutine(copyChan, pullChan, finisherChan)
  304. copyWg.Done()
  305. }()
  306. }
  307. for i := 0; i < f.pullers; i++ {
  308. pullWg.Add(1)
  309. go func() {
  310. // pullerRoutine finishes when pullChan is closed
  311. f.pullerRoutine(pullChan, finisherChan)
  312. pullWg.Done()
  313. }()
  314. }
  315. doneWg.Add(1)
  316. // finisherRoutine finishes when finisherChan is closed
  317. go func() {
  318. f.finisherRoutine(finisherChan)
  319. doneWg.Done()
  320. }()
  321. f.model.fmut.RLock()
  322. folderFiles := f.model.folderFiles[f.folderID]
  323. f.model.fmut.RUnlock()
  324. // !!!
  325. // WithNeed takes a database snapshot (by necessity). By the time we've
  326. // handled a bunch of files it might have become out of date and we might
  327. // be attempting to sync with an old version of a file...
  328. // !!!
  329. changed := 0
  330. fileDeletions := map[string]protocol.FileInfo{}
  331. dirDeletions := []protocol.FileInfo{}
  332. buckets := map[string][]protocol.FileInfo{}
  333. handleFile := func(fi protocol.FileInfo) bool {
  334. switch {
  335. case fi.IsDeleted():
  336. // A deleted file, directory or symlink
  337. if fi.IsDirectory() {
  338. dirDeletions = append(dirDeletions, fi)
  339. } else {
  340. fileDeletions[fi.Name] = fi
  341. df, ok := f.model.CurrentFolderFile(f.folderID, fi.Name)
  342. // Local file can be already deleted, but with a lower version
  343. // number, hence the deletion coming in again as part of
  344. // WithNeed, furthermore, the file can simply be of the wrong
  345. // type if we haven't yet managed to pull it.
  346. if ok && !df.IsDeleted() && !df.IsSymlink() && !df.IsDirectory() {
  347. // Put files into buckets per first hash
  348. key := string(df.Blocks[0].Hash)
  349. buckets[key] = append(buckets[key], df)
  350. }
  351. }
  352. case fi.IsDirectory() && !fi.IsSymlink():
  353. // A new or changed directory
  354. l.Debugln("Creating directory", fi.Name)
  355. f.handleDir(fi)
  356. default:
  357. return false
  358. }
  359. return true
  360. }
  361. folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
  362. // Needed items are delivered sorted lexicographically. We'll handle
  363. // directories as they come along, so parents before children. Files
  364. // are queued and the order may be changed later.
  365. if shouldIgnore(intf, ignores, f.ignoreDelete) {
  366. return true
  367. }
  368. if err := fileValid(intf); err != nil {
  369. // The file isn't valid so we can't process it. Pretend that we
  370. // tried and set the error for the file.
  371. f.newError(intf.FileName(), err)
  372. changed++
  373. return true
  374. }
  375. file := intf.(protocol.FileInfo)
  376. l.Debugln(f, "handling", file.Name)
  377. if !handleFile(file) {
  378. // A new or changed file or symlink. This is the only case where
  379. // we do stuff concurrently in the background. We only queue
  380. // files where we are connected to at least one device that has
  381. // the file.
  382. devices := folderFiles.Availability(file.Name)
  383. for _, dev := range devices {
  384. if f.model.ConnectedTo(dev) {
  385. f.queue.Push(file.Name, file.Size, file.ModTime())
  386. changed++
  387. break
  388. }
  389. }
  390. }
  391. return true
  392. })
  393. // Reorder the file queue according to configuration
  394. switch f.order {
  395. case config.OrderRandom:
  396. f.queue.Shuffle()
  397. case config.OrderAlphabetic:
  398. // The queue is already in alphabetic order.
  399. case config.OrderSmallestFirst:
  400. f.queue.SortSmallestFirst()
  401. case config.OrderLargestFirst:
  402. f.queue.SortLargestFirst()
  403. case config.OrderOldestFirst:
  404. f.queue.SortOldestFirst()
  405. case config.OrderNewestFirst:
  406. f.queue.SortNewestFirst()
  407. }
  408. // Process the file queue
  409. nextFile:
  410. for {
  411. select {
  412. case <-f.stop:
  413. // Stop processing files if the puller has been told to stop.
  414. break
  415. default:
  416. }
  417. fileName, ok := f.queue.Pop()
  418. if !ok {
  419. break
  420. }
  421. fi, ok := f.model.CurrentGlobalFile(f.folderID, fileName)
  422. if !ok {
  423. // File is no longer in the index. Mark it as done and drop it.
  424. f.queue.Done(fileName)
  425. continue
  426. }
  427. // Handles races where an index update arrives changing what the file
  428. // is between queueing and retrieving it from the queue, effectively
  429. // changing how the file should be handled.
  430. if handleFile(fi) {
  431. continue
  432. }
  433. if !fi.IsSymlink() {
  434. key := string(fi.Blocks[0].Hash)
  435. for i, candidate := range buckets[key] {
  436. if scanner.BlocksEqual(candidate.Blocks, fi.Blocks) {
  437. // Remove the candidate from the bucket
  438. lidx := len(buckets[key]) - 1
  439. buckets[key][i] = buckets[key][lidx]
  440. buckets[key] = buckets[key][:lidx]
  441. // candidate is our current state of the file, where as the
  442. // desired state with the delete bit set is in the deletion
  443. // map.
  444. desired := fileDeletions[candidate.Name]
  445. // Remove the pending deletion (as we perform it by renaming)
  446. delete(fileDeletions, candidate.Name)
  447. f.renameFile(desired, fi)
  448. f.queue.Done(fileName)
  449. continue nextFile
  450. }
  451. }
  452. }
  453. // Not a rename or a symlink, deal with it.
  454. f.handleFile(fi, copyChan, finisherChan)
  455. }
  456. // Signal copy and puller routines that we are done with the in data for
  457. // this iteration. Wait for them to finish.
  458. close(copyChan)
  459. copyWg.Wait()
  460. close(pullChan)
  461. pullWg.Wait()
  462. // Signal the finisher chan that there will be no more input.
  463. close(finisherChan)
  464. // Wait for the finisherChan to finish.
  465. doneWg.Wait()
  466. for _, file := range fileDeletions {
  467. l.Debugln("Deleting file", file.Name)
  468. f.deleteFile(file)
  469. }
  470. for i := range dirDeletions {
  471. dir := dirDeletions[len(dirDeletions)-i-1]
  472. l.Debugln("Deleting dir", dir.Name)
  473. f.deleteDir(dir, ignores)
  474. }
  475. // Wait for db updates to complete
  476. close(f.dbUpdates)
  477. updateWg.Wait()
  478. return changed
  479. }
  480. // handleDir creates or updates the given directory
  481. func (f *rwFolder) handleDir(file protocol.FileInfo) {
  482. var err error
  483. events.Default.Log(events.ItemStarted, map[string]string{
  484. "folder": f.folderID,
  485. "item": file.Name,
  486. "type": "dir",
  487. "action": "update",
  488. })
  489. defer func() {
  490. events.Default.Log(events.ItemFinished, map[string]interface{}{
  491. "folder": f.folderID,
  492. "item": file.Name,
  493. "error": events.Error(err),
  494. "type": "dir",
  495. "action": "update",
  496. })
  497. }()
  498. realName := filepath.Join(f.dir, file.Name)
  499. mode := os.FileMode(file.Permissions & 0777)
  500. if f.ignorePermissions(file) {
  501. mode = 0777
  502. }
  503. if shouldDebug() {
  504. curFile, _ := f.model.CurrentFolderFile(f.folderID, file.Name)
  505. l.Debugf("need dir\n\t%v\n\t%v", file, curFile)
  506. }
  507. info, err := f.mtimeFS.Lstat(realName)
  508. switch {
  509. // There is already something under that name, but it's a file/link.
  510. // Most likely a file/link is getting replaced with a directory.
  511. // Remove the file/link and fall through to directory creation.
  512. case err == nil && (!info.IsDir() || info.Mode()&os.ModeSymlink != 0):
  513. err = osutil.InWritableDir(os.Remove, realName)
  514. if err != nil {
  515. l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err)
  516. f.newError(file.Name, err)
  517. return
  518. }
  519. fallthrough
  520. // The directory doesn't exist, so we create it with the right
  521. // mode bits from the start.
  522. case err != nil && os.IsNotExist(err):
  523. // We declare a function that acts on only the path name, so
  524. // we can pass it to InWritableDir. We use a regular Mkdir and
  525. // not MkdirAll because the parent should already exist.
  526. mkdir := func(path string) error {
  527. err = os.Mkdir(path, mode)
  528. if err != nil || f.ignorePermissions(file) {
  529. return err
  530. }
  531. // Stat the directory so we can check its permissions.
  532. info, err := f.mtimeFS.Lstat(path)
  533. if err != nil {
  534. return err
  535. }
  536. // Mask for the bits we want to preserve and add them in to the
  537. // directories permissions.
  538. return os.Chmod(path, mode|(info.Mode()&retainBits))
  539. }
  540. if err = osutil.InWritableDir(mkdir, realName); err == nil {
  541. f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  542. } else {
  543. l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err)
  544. f.newError(file.Name, err)
  545. }
  546. return
  547. // Weird error when stat()'ing the dir. Probably won't work to do
  548. // anything else with it if we can't even stat() it.
  549. case err != nil:
  550. l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err)
  551. f.newError(file.Name, err)
  552. return
  553. }
  554. // The directory already exists, so we just correct the mode bits. (We
  555. // don't handle modification times on directories, because that sucks...)
  556. // It's OK to change mode bits on stuff within non-writable directories.
  557. if f.ignorePermissions(file) {
  558. f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  559. } else if err := os.Chmod(realName, mode|(info.Mode()&retainBits)); err == nil {
  560. f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  561. } else {
  562. l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err)
  563. f.newError(file.Name, err)
  564. }
  565. }
  566. // deleteDir attempts to delete the given directory
  567. func (f *rwFolder) deleteDir(file protocol.FileInfo, matcher *ignore.Matcher) {
  568. var err error
  569. events.Default.Log(events.ItemStarted, map[string]string{
  570. "folder": f.folderID,
  571. "item": file.Name,
  572. "type": "dir",
  573. "action": "delete",
  574. })
  575. defer func() {
  576. events.Default.Log(events.ItemFinished, map[string]interface{}{
  577. "folder": f.folderID,
  578. "item": file.Name,
  579. "error": events.Error(err),
  580. "type": "dir",
  581. "action": "delete",
  582. })
  583. }()
  584. realName := filepath.Join(f.dir, file.Name)
  585. // Delete any temporary files lying around in the directory
  586. dir, _ := os.Open(realName)
  587. if dir != nil {
  588. files, _ := dir.Readdirnames(-1)
  589. for _, dirFile := range files {
  590. fullDirFile := filepath.Join(file.Name, dirFile)
  591. if defTempNamer.IsTemporary(dirFile) || (matcher != nil && matcher.Match(fullDirFile).IsDeletable()) {
  592. os.RemoveAll(filepath.Join(f.dir, fullDirFile))
  593. }
  594. }
  595. dir.Close()
  596. }
  597. err = osutil.InWritableDir(os.Remove, realName)
  598. if err == nil || os.IsNotExist(err) {
  599. // It was removed or it doesn't exist to start with
  600. f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
  601. } else if _, serr := f.mtimeFS.Lstat(realName); serr != nil && !os.IsPermission(serr) {
  602. // We get an error just looking at the directory, and it's not a
  603. // permission problem. Lets assume the error is in fact some variant
  604. // of "file does not exist" (possibly expressed as some parent being a
  605. // file and not a directory etc) and that the delete is handled.
  606. f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
  607. } else {
  608. l.Infof("Puller (folder %q, dir %q): delete: %v", f.folderID, file.Name, err)
  609. f.newError(file.Name, err)
  610. }
  611. }
  612. // deleteFile attempts to delete the given file
  613. func (f *rwFolder) deleteFile(file protocol.FileInfo) {
  614. var err error
  615. events.Default.Log(events.ItemStarted, map[string]string{
  616. "folder": f.folderID,
  617. "item": file.Name,
  618. "type": "file",
  619. "action": "delete",
  620. })
  621. defer func() {
  622. events.Default.Log(events.ItemFinished, map[string]interface{}{
  623. "folder": f.folderID,
  624. "item": file.Name,
  625. "error": events.Error(err),
  626. "type": "file",
  627. "action": "delete",
  628. })
  629. }()
  630. realName := filepath.Join(f.dir, file.Name)
  631. cur, ok := f.model.CurrentFolderFile(f.folderID, file.Name)
  632. if ok && f.inConflict(cur.Version, file.Version) {
  633. // There is a conflict here. Move the file to a conflict copy instead
  634. // of deleting. Also merge with the version vector we had, to indicate
  635. // we have resolved the conflict.
  636. file.Version = file.Version.Merge(cur.Version)
  637. err = osutil.InWritableDir(f.moveForConflict, realName)
  638. } else if f.versioner != nil {
  639. err = osutil.InWritableDir(f.versioner.Archive, realName)
  640. } else {
  641. err = osutil.InWritableDir(os.Remove, realName)
  642. }
  643. if err == nil || os.IsNotExist(err) {
  644. // It was removed or it doesn't exist to start with
  645. f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
  646. } else if _, serr := f.mtimeFS.Lstat(realName); serr != nil && !os.IsPermission(serr) {
  647. // We get an error just looking at the file, and it's not a permission
  648. // problem. Lets assume the error is in fact some variant of "file
  649. // does not exist" (possibly expressed as some parent being a file and
  650. // not a directory etc) and that the delete is handled.
  651. f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
  652. } else {
  653. l.Infof("Puller (folder %q, file %q): delete: %v", f.folderID, file.Name, err)
  654. f.newError(file.Name, err)
  655. }
  656. }
  657. // renameFile attempts to rename an existing file to a destination
  658. // and set the right attributes on it.
  659. func (f *rwFolder) renameFile(source, target protocol.FileInfo) {
  660. var err error
  661. events.Default.Log(events.ItemStarted, map[string]string{
  662. "folder": f.folderID,
  663. "item": source.Name,
  664. "type": "file",
  665. "action": "delete",
  666. })
  667. events.Default.Log(events.ItemStarted, map[string]string{
  668. "folder": f.folderID,
  669. "item": target.Name,
  670. "type": "file",
  671. "action": "update",
  672. })
  673. defer func() {
  674. events.Default.Log(events.ItemFinished, map[string]interface{}{
  675. "folder": f.folderID,
  676. "item": source.Name,
  677. "error": events.Error(err),
  678. "type": "file",
  679. "action": "delete",
  680. })
  681. events.Default.Log(events.ItemFinished, map[string]interface{}{
  682. "folder": f.folderID,
  683. "item": target.Name,
  684. "error": events.Error(err),
  685. "type": "file",
  686. "action": "update",
  687. })
  688. }()
  689. l.Debugln(f, "taking rename shortcut", source.Name, "->", target.Name)
  690. from := filepath.Join(f.dir, source.Name)
  691. to := filepath.Join(f.dir, target.Name)
  692. if f.versioner != nil {
  693. err = osutil.Copy(from, to)
  694. if err == nil {
  695. err = osutil.InWritableDir(f.versioner.Archive, from)
  696. }
  697. } else {
  698. err = osutil.TryRename(from, to)
  699. }
  700. if err == nil {
  701. // The file was renamed, so we have handled both the necessary delete
  702. // of the source and the creation of the target. Fix-up the metadata,
  703. // and update the local index of the target file.
  704. f.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
  705. err = f.shortcutFile(target)
  706. if err != nil {
  707. l.Infof("Puller (folder %q, file %q): rename from %q metadata: %v", f.folderID, target.Name, source.Name, err)
  708. f.newError(target.Name, err)
  709. return
  710. }
  711. f.dbUpdates <- dbUpdateJob{target, dbUpdateHandleFile}
  712. } else {
  713. // We failed the rename so we have a source file that we still need to
  714. // get rid of. Attempt to delete it instead so that we make *some*
  715. // progress. The target is unhandled.
  716. err = osutil.InWritableDir(os.Remove, from)
  717. if err != nil {
  718. l.Infof("Puller (folder %q, file %q): delete %q after failed rename: %v", f.folderID, target.Name, source.Name, err)
  719. f.newError(target.Name, err)
  720. return
  721. }
  722. f.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
  723. }
  724. }
  725. // This is the flow of data and events here, I think...
  726. //
  727. // +-----------------------+
  728. // | | - - - - > ItemStarted
  729. // | handleFile | - - - - > ItemFinished (on shortcuts)
  730. // | |
  731. // +-----------------------+
  732. // |
  733. // | copyChan (copyBlocksState; unless shortcut taken)
  734. // |
  735. // | +-----------------------+
  736. // | | +-----------------------+
  737. // +--->| | |
  738. // | | copierRoutine |
  739. // +-| |
  740. // +-----------------------+
  741. // |
  742. // | pullChan (sharedPullerState)
  743. // |
  744. // | +-----------------------+
  745. // | | +-----------------------+
  746. // +-->| | |
  747. // | | pullerRoutine |
  748. // +-| |
  749. // +-----------------------+
  750. // |
  751. // | finisherChan (sharedPullerState)
  752. // |
  753. // | +-----------------------+
  754. // | | |
  755. // +-->| finisherRoutine | - - - - > ItemFinished
  756. // | |
  757. // +-----------------------+
  758. // handleFile queues the copies and pulls as necessary for a single new or
  759. // changed file.
  760. func (f *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
  761. curFile, hasCurFile := f.model.CurrentFolderFile(f.folderID, file.Name)
  762. if hasCurFile && len(curFile.Blocks) == len(file.Blocks) && scanner.BlocksEqual(curFile.Blocks, file.Blocks) {
  763. // We are supposed to copy the entire file, and then fetch nothing. We
  764. // are only updating metadata, so we don't actually *need* to make the
  765. // copy.
  766. l.Debugln(f, "taking shortcut on", file.Name)
  767. events.Default.Log(events.ItemStarted, map[string]string{
  768. "folder": f.folderID,
  769. "item": file.Name,
  770. "type": "file",
  771. "action": "metadata",
  772. })
  773. f.queue.Done(file.Name)
  774. var err error
  775. if file.IsSymlink() {
  776. err = f.shortcutSymlink(file)
  777. } else {
  778. err = f.shortcutFile(file)
  779. }
  780. events.Default.Log(events.ItemFinished, map[string]interface{}{
  781. "folder": f.folderID,
  782. "item": file.Name,
  783. "error": events.Error(err),
  784. "type": "file",
  785. "action": "metadata",
  786. })
  787. if err != nil {
  788. l.Infoln("Puller: shortcut:", err)
  789. f.newError(file.Name, err)
  790. } else {
  791. f.dbUpdates <- dbUpdateJob{file, dbUpdateShortcutFile}
  792. }
  793. return
  794. }
  795. // Figure out the absolute filenames we need once and for all
  796. tempName := filepath.Join(f.dir, defTempNamer.TempName(file.Name))
  797. realName := filepath.Join(f.dir, file.Name)
  798. if hasCurFile && !curFile.IsDirectory() && !curFile.IsSymlink() {
  799. // Check that the file on disk is what we expect it to be according to
  800. // the database. If there's a mismatch here, there might be local
  801. // changes that we don't know about yet and we should scan before
  802. // touching the file. If we can't stat the file we'll just pull it.
  803. if info, err := f.mtimeFS.Lstat(realName); err == nil {
  804. if !info.ModTime().Equal(curFile.ModTime()) || info.Size() != curFile.Size {
  805. l.Debugln("file modified but not rescanned; not pulling:", realName)
  806. // Scan() is synchronous (i.e. blocks until the scan is
  807. // completed and returns an error), but a scan can't happen
  808. // while we're in the puller routine. Request the scan in the
  809. // background and it'll be handled when the current pulling
  810. // sweep is complete. As we do retries, we'll queue the scan
  811. // for this file up to ten times, but the last nine of those
  812. // scans will be cheap...
  813. go f.scan.Scan([]string{file.Name})
  814. return
  815. }
  816. }
  817. }
  818. scanner.PopulateOffsets(file.Blocks)
  819. var blocks []protocol.BlockInfo
  820. var blocksSize int64
  821. var reused []int32
  822. // Check for an old temporary file which might have some blocks we could
  823. // reuse.
  824. tempBlocks, err := scanner.HashFile(tempName, protocol.BlockSize, nil)
  825. if err == nil {
  826. // Check for any reusable blocks in the temp file
  827. tempCopyBlocks, _ := scanner.BlockDiff(tempBlocks, file.Blocks)
  828. // block.String() returns a string unique to the block
  829. existingBlocks := make(map[string]struct{}, len(tempCopyBlocks))
  830. for _, block := range tempCopyBlocks {
  831. existingBlocks[block.String()] = struct{}{}
  832. }
  833. // Since the blocks are already there, we don't need to get them.
  834. for i, block := range file.Blocks {
  835. _, ok := existingBlocks[block.String()]
  836. if !ok {
  837. blocks = append(blocks, block)
  838. blocksSize += int64(block.Size)
  839. } else {
  840. reused = append(reused, int32(i))
  841. }
  842. }
  843. // The sharedpullerstate will know which flags to use when opening the
  844. // temp file depending if we are reusing any blocks or not.
  845. if len(reused) == 0 {
  846. // Otherwise, discard the file ourselves in order for the
  847. // sharedpuller not to panic when it fails to exclusively create a
  848. // file which already exists
  849. osutil.InWritableDir(os.Remove, tempName)
  850. }
  851. } else {
  852. // Copy the blocks, as we don't want to shuffle them on the FileInfo
  853. blocks = append(blocks, file.Blocks...)
  854. blocksSize = file.Size
  855. }
  856. if f.checkFreeSpace {
  857. if free, err := osutil.DiskFreeBytes(f.dir); err == nil && free < blocksSize {
  858. l.Warnf(`Folder "%s": insufficient disk space in %s for %s: have %.2f MiB, need %.2f MiB`, f.folderID, f.dir, file.Name, float64(free)/1024/1024, float64(blocksSize)/1024/1024)
  859. f.newError(file.Name, errors.New("insufficient space"))
  860. return
  861. }
  862. }
  863. // Shuffle the blocks
  864. for i := range blocks {
  865. j := rand.Intn(i + 1)
  866. blocks[i], blocks[j] = blocks[j], blocks[i]
  867. }
  868. events.Default.Log(events.ItemStarted, map[string]string{
  869. "folder": f.folderID,
  870. "item": file.Name,
  871. "type": "file",
  872. "action": "update",
  873. })
  874. s := sharedPullerState{
  875. file: file,
  876. folder: f.folderID,
  877. tempName: tempName,
  878. realName: realName,
  879. copyTotal: len(blocks),
  880. copyNeeded: len(blocks),
  881. reused: len(reused),
  882. updated: time.Now(),
  883. available: reused,
  884. availableUpdated: time.Now(),
  885. ignorePerms: f.ignorePermissions(file),
  886. version: curFile.Version,
  887. mut: sync.NewRWMutex(),
  888. sparse: f.allowSparse,
  889. created: time.Now(),
  890. }
  891. l.Debugf("%v need file %s; copy %d, reused %v", f, file.Name, len(blocks), reused)
  892. cs := copyBlocksState{
  893. sharedPullerState: &s,
  894. blocks: blocks,
  895. }
  896. copyChan <- cs
  897. }
  898. // shortcutFile sets file mode and modification time, when that's the only
  899. // thing that has changed.
  900. func (f *rwFolder) shortcutFile(file protocol.FileInfo) error {
  901. realName := filepath.Join(f.dir, file.Name)
  902. if !f.ignorePermissions(file) {
  903. if err := os.Chmod(realName, os.FileMode(file.Permissions&0777)); err != nil {
  904. l.Infof("Puller (folder %q, file %q): shortcut: chmod: %v", f.folderID, file.Name, err)
  905. f.newError(file.Name, err)
  906. return err
  907. }
  908. }
  909. f.mtimeFS.Chtimes(realName, file.ModTime(), file.ModTime()) // never fails
  910. // This may have been a conflict. We should merge the version vectors so
  911. // that our clock doesn't move backwards.
  912. if cur, ok := f.model.CurrentFolderFile(f.folderID, file.Name); ok {
  913. file.Version = file.Version.Merge(cur.Version)
  914. }
  915. return nil
  916. }
  917. // shortcutSymlink changes the symlinks type if necessary.
  918. func (f *rwFolder) shortcutSymlink(file protocol.FileInfo) (err error) {
  919. tt := symlinks.TargetFile
  920. if file.IsDirectory() {
  921. tt = symlinks.TargetDirectory
  922. }
  923. err = symlinks.ChangeType(filepath.Join(f.dir, file.Name), tt)
  924. if err != nil {
  925. l.Infof("Puller (folder %q, file %q): symlink shortcut: %v", f.folderID, file.Name, err)
  926. f.newError(file.Name, err)
  927. }
  928. return
  929. }
  930. // copierRoutine reads copierStates until the in channel closes and performs
  931. // the relevant copies when possible, or passes it to the puller routine.
  932. func (f *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
  933. buf := make([]byte, protocol.BlockSize)
  934. for state := range in {
  935. dstFd, err := state.tempFile()
  936. if err != nil {
  937. // Nothing more to do for this failed file, since we couldn't create a temporary for it.
  938. out <- state.sharedPullerState
  939. continue
  940. }
  941. if f.model.progressEmitter != nil {
  942. f.model.progressEmitter.Register(state.sharedPullerState)
  943. }
  944. folderRoots := make(map[string]string)
  945. var folders []string
  946. f.model.fmut.RLock()
  947. for folder, cfg := range f.model.folderCfgs {
  948. folderRoots[folder] = cfg.Path()
  949. folders = append(folders, folder)
  950. }
  951. f.model.fmut.RUnlock()
  952. for _, block := range state.blocks {
  953. if f.allowSparse && state.reused == 0 && block.IsEmpty() {
  954. // The block is a block of all zeroes, and we are not reusing
  955. // a temp file, so there is no need to do anything with it.
  956. // If we were reusing a temp file and had this block to copy,
  957. // it would be because the block in the temp file was *not* a
  958. // block of all zeroes, so then we should not skip it.
  959. // Pretend we copied it.
  960. state.copiedFromOrigin()
  961. continue
  962. }
  963. buf = buf[:int(block.Size)]
  964. found := f.model.finder.Iterate(folders, block.Hash, func(folder, file string, index int32) bool {
  965. fd, err := os.Open(filepath.Join(folderRoots[folder], file))
  966. if err != nil {
  967. return false
  968. }
  969. _, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
  970. fd.Close()
  971. if err != nil {
  972. return false
  973. }
  974. hash, err := scanner.VerifyBuffer(buf, block)
  975. if err != nil {
  976. if hash != nil {
  977. l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash)
  978. err = f.model.finder.Fix(folder, file, index, block.Hash, hash)
  979. if err != nil {
  980. l.Warnln("finder fix:", err)
  981. }
  982. } else {
  983. l.Debugln("Finder failed to verify buffer", err)
  984. }
  985. return false
  986. }
  987. _, err = dstFd.WriteAt(buf, block.Offset)
  988. if err != nil {
  989. state.fail("dst write", err)
  990. }
  991. if file == state.file.Name {
  992. state.copiedFromOrigin()
  993. }
  994. return true
  995. })
  996. if state.failed() != nil {
  997. break
  998. }
  999. if !found {
  1000. state.pullStarted()
  1001. ps := pullBlockState{
  1002. sharedPullerState: state.sharedPullerState,
  1003. block: block,
  1004. }
  1005. pullChan <- ps
  1006. } else {
  1007. state.copyDone(block)
  1008. }
  1009. }
  1010. out <- state.sharedPullerState
  1011. }
  1012. }
  1013. func (f *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
  1014. for state := range in {
  1015. if state.failed() != nil {
  1016. out <- state.sharedPullerState
  1017. continue
  1018. }
  1019. // Get an fd to the temporary file. Technically we don't need it until
  1020. // after fetching the block, but if we run into an error here there is
  1021. // no point in issuing the request to the network.
  1022. fd, err := state.tempFile()
  1023. if err != nil {
  1024. out <- state.sharedPullerState
  1025. continue
  1026. }
  1027. if f.allowSparse && state.reused == 0 && state.block.IsEmpty() {
  1028. // There is no need to request a block of all zeroes. Pretend we
  1029. // requested it and handled it correctly.
  1030. state.pullDone(state.block)
  1031. out <- state.sharedPullerState
  1032. continue
  1033. }
  1034. var lastError error
  1035. candidates := f.model.Availability(f.folderID, state.file.Name, state.file.Version, state.block)
  1036. for {
  1037. // Select the least busy device to pull the block from. If we found no
  1038. // feasible device at all, fail the block (and in the long run, the
  1039. // file).
  1040. selected, found := activity.leastBusy(candidates)
  1041. if !found {
  1042. if lastError != nil {
  1043. state.fail("pull", lastError)
  1044. } else {
  1045. state.fail("pull", errNoDevice)
  1046. }
  1047. break
  1048. }
  1049. candidates = removeAvailability(candidates, selected)
  1050. // Fetch the block, while marking the selected device as in use so that
  1051. // leastBusy can select another device when someone else asks.
  1052. activity.using(selected)
  1053. buf, lastError := f.model.requestGlobal(selected.ID, f.folderID, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, selected.FromTemporary)
  1054. activity.done(selected)
  1055. if lastError != nil {
  1056. l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError)
  1057. continue
  1058. }
  1059. // Verify that the received block matches the desired hash, if not
  1060. // try pulling it from another device.
  1061. _, lastError = scanner.VerifyBuffer(buf, state.block)
  1062. if lastError != nil {
  1063. l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "hash mismatch")
  1064. continue
  1065. }
  1066. // Save the block data we got from the cluster
  1067. _, err = fd.WriteAt(buf, state.block.Offset)
  1068. if err != nil {
  1069. state.fail("save", err)
  1070. } else {
  1071. state.pullDone(state.block)
  1072. }
  1073. break
  1074. }
  1075. out <- state.sharedPullerState
  1076. }
  1077. }
  1078. func (f *rwFolder) performFinish(state *sharedPullerState) error {
  1079. // Set the correct permission bits on the new file
  1080. if !f.ignorePermissions(state.file) {
  1081. if err := os.Chmod(state.tempName, os.FileMode(state.file.Permissions&0777)); err != nil {
  1082. return err
  1083. }
  1084. }
  1085. if stat, err := f.mtimeFS.Lstat(state.realName); err == nil {
  1086. // There is an old file or directory already in place. We need to
  1087. // handle that.
  1088. switch {
  1089. case stat.IsDir() || stat.Mode()&os.ModeSymlink != 0:
  1090. // It's a directory or a symlink. These are not versioned or
  1091. // archived for conflicts, only removed (which of course fails for
  1092. // non-empty directories).
  1093. // TODO: This is the place where we want to remove temporary files
  1094. // and future hard ignores before attempting a directory delete.
  1095. // Should share code with f.deletDir().
  1096. if err = osutil.InWritableDir(os.Remove, state.realName); err != nil {
  1097. return err
  1098. }
  1099. case f.inConflict(state.version, state.file.Version):
  1100. // The new file has been changed in conflict with the existing one. We
  1101. // should file it away as a conflict instead of just removing or
  1102. // archiving. Also merge with the version vector we had, to indicate
  1103. // we have resolved the conflict.
  1104. state.file.Version = state.file.Version.Merge(state.version)
  1105. if err = osutil.InWritableDir(f.moveForConflict, state.realName); err != nil {
  1106. return err
  1107. }
  1108. case f.versioner != nil:
  1109. // If we should use versioning, let the versioner archive the old
  1110. // file before we replace it. Archiving a non-existent file is not
  1111. // an error.
  1112. if err = f.versioner.Archive(state.realName); err != nil {
  1113. return err
  1114. }
  1115. }
  1116. }
  1117. // Replace the original content with the new one. If it didn't work,
  1118. // leave the temp file in place for reuse.
  1119. if err := osutil.TryRename(state.tempName, state.realName); err != nil {
  1120. return err
  1121. }
  1122. // Set the correct timestamp on the new file
  1123. f.mtimeFS.Chtimes(state.realName, state.file.ModTime(), state.file.ModTime()) // never fails
  1124. // If it's a symlink, the target of the symlink is inside the file.
  1125. if state.file.IsSymlink() {
  1126. content, err := ioutil.ReadFile(state.realName)
  1127. if err != nil {
  1128. return err
  1129. }
  1130. // Remove the file, and replace it with a symlink.
  1131. err = osutil.InWritableDir(func(path string) error {
  1132. os.Remove(path)
  1133. tt := symlinks.TargetFile
  1134. if state.file.IsDirectory() {
  1135. tt = symlinks.TargetDirectory
  1136. }
  1137. return symlinks.Create(path, string(content), tt)
  1138. }, state.realName)
  1139. if err != nil {
  1140. return err
  1141. }
  1142. }
  1143. // Record the updated file in the index
  1144. f.dbUpdates <- dbUpdateJob{state.file, dbUpdateHandleFile}
  1145. return nil
  1146. }
  1147. func (f *rwFolder) finisherRoutine(in <-chan *sharedPullerState) {
  1148. for state := range in {
  1149. if closed, err := state.finalClose(); closed {
  1150. l.Debugln(f, "closing", state.file.Name)
  1151. f.queue.Done(state.file.Name)
  1152. if err == nil {
  1153. err = f.performFinish(state)
  1154. }
  1155. if err != nil {
  1156. l.Infoln("Puller: final:", err)
  1157. f.newError(state.file.Name, err)
  1158. }
  1159. events.Default.Log(events.ItemFinished, map[string]interface{}{
  1160. "folder": f.folderID,
  1161. "item": state.file.Name,
  1162. "error": events.Error(err),
  1163. "type": "file",
  1164. "action": "update",
  1165. })
  1166. if f.model.progressEmitter != nil {
  1167. f.model.progressEmitter.Deregister(state)
  1168. }
  1169. }
  1170. }
  1171. }
  1172. // Moves the given filename to the front of the job queue
  1173. func (f *rwFolder) BringToFront(filename string) {
  1174. f.queue.BringToFront(filename)
  1175. }
  1176. func (f *rwFolder) Jobs() ([]string, []string) {
  1177. return f.queue.Jobs()
  1178. }
  1179. // dbUpdaterRoutine aggregates db updates and commits them in batches no
  1180. // larger than 1000 items, and no more delayed than 2 seconds.
  1181. func (f *rwFolder) dbUpdaterRoutine() {
  1182. const (
  1183. maxBatchSize = 1000
  1184. maxBatchTime = 2 * time.Second
  1185. )
  1186. batch := make([]dbUpdateJob, 0, maxBatchSize)
  1187. files := make([]protocol.FileInfo, 0, maxBatchSize)
  1188. tick := time.NewTicker(maxBatchTime)
  1189. defer tick.Stop()
  1190. var changedFiles []string
  1191. var changedDirs []string
  1192. if f.fsync {
  1193. changedFiles = make([]string, 0, maxBatchSize)
  1194. changedDirs = make([]string, 0, maxBatchSize)
  1195. }
  1196. syncFilesOnce := func(files []string, syncFn func(string) error) {
  1197. sort.Strings(files)
  1198. var lastFile string
  1199. for _, file := range files {
  1200. if lastFile == file {
  1201. continue
  1202. }
  1203. lastFile = file
  1204. if err := syncFn(file); err != nil {
  1205. l.Infof("fsync %q failed: %v", file, err)
  1206. }
  1207. }
  1208. }
  1209. handleBatch := func() {
  1210. found := false
  1211. var lastFile protocol.FileInfo
  1212. for _, job := range batch {
  1213. files = append(files, job.file)
  1214. if f.fsync {
  1215. // collect changed files and dirs
  1216. switch job.jobType {
  1217. case dbUpdateHandleFile, dbUpdateShortcutFile:
  1218. // fsyncing symlinks is only supported by MacOS
  1219. if !job.file.IsSymlink() {
  1220. changedFiles = append(changedFiles,
  1221. filepath.Join(f.dir, job.file.Name))
  1222. }
  1223. case dbUpdateHandleDir:
  1224. changedDirs = append(changedDirs, filepath.Join(f.dir, job.file.Name))
  1225. }
  1226. if job.jobType != dbUpdateShortcutFile {
  1227. changedDirs = append(changedDirs,
  1228. filepath.Dir(filepath.Join(f.dir, job.file.Name)))
  1229. }
  1230. }
  1231. if job.file.IsInvalid() || (job.file.IsDirectory() && !job.file.IsSymlink()) {
  1232. continue
  1233. }
  1234. if job.jobType&(dbUpdateHandleFile|dbUpdateDeleteFile) == 0 {
  1235. continue
  1236. }
  1237. found = true
  1238. lastFile = job.file
  1239. }
  1240. if f.fsync {
  1241. // sync files and dirs to disk
  1242. syncFilesOnce(changedFiles, osutil.SyncFile)
  1243. changedFiles = changedFiles[:0]
  1244. syncFilesOnce(changedDirs, osutil.SyncDir)
  1245. changedDirs = changedDirs[:0]
  1246. }
  1247. // All updates to file/folder objects that originated remotely
  1248. // (across the network) use this call to updateLocals
  1249. f.model.updateLocalsFromPulling(f.folderID, files)
  1250. if found {
  1251. f.model.receivedFile(f.folderID, lastFile)
  1252. }
  1253. batch = batch[:0]
  1254. files = files[:0]
  1255. }
  1256. loop:
  1257. for {
  1258. select {
  1259. case job, ok := <-f.dbUpdates:
  1260. if !ok {
  1261. break loop
  1262. }
  1263. job.file.Sequence = 0
  1264. batch = append(batch, job)
  1265. if len(batch) == maxBatchSize {
  1266. handleBatch()
  1267. }
  1268. case <-tick.C:
  1269. if len(batch) > 0 {
  1270. handleBatch()
  1271. }
  1272. }
  1273. }
  1274. if len(batch) > 0 {
  1275. handleBatch()
  1276. }
  1277. }
  1278. func (f *rwFolder) inConflict(current, replacement protocol.Vector) bool {
  1279. if current.Concurrent(replacement) {
  1280. // Obvious case
  1281. return true
  1282. }
  1283. if replacement.Counter(f.model.shortID) > current.Counter(f.model.shortID) {
  1284. // The replacement file contains a higher version for ourselves than
  1285. // what we have. This isn't supposed to be possible, since it's only
  1286. // we who can increment that counter. We take it as a sign that
  1287. // something is wrong (our index may have been corrupted or removed)
  1288. // and flag it as a conflict.
  1289. return true
  1290. }
  1291. return false
  1292. }
  1293. func removeAvailability(availabilities []Availability, availability Availability) []Availability {
  1294. for i := range availabilities {
  1295. if availabilities[i] == availability {
  1296. availabilities[i] = availabilities[len(availabilities)-1]
  1297. return availabilities[:len(availabilities)-1]
  1298. }
  1299. }
  1300. return availabilities
  1301. }
  1302. func (f *rwFolder) moveForConflict(name string) error {
  1303. if strings.Contains(filepath.Base(name), ".sync-conflict-") {
  1304. l.Infoln("Conflict for", name, "which is already a conflict copy; not copying again.")
  1305. if err := os.Remove(name); err != nil && !os.IsNotExist(err) {
  1306. return err
  1307. }
  1308. return nil
  1309. }
  1310. if f.maxConflicts == 0 {
  1311. if err := os.Remove(name); err != nil && !os.IsNotExist(err) {
  1312. return err
  1313. }
  1314. return nil
  1315. }
  1316. ext := filepath.Ext(name)
  1317. withoutExt := name[:len(name)-len(ext)]
  1318. newName := withoutExt + time.Now().Format(".sync-conflict-20060102-150405") + ext
  1319. err := os.Rename(name, newName)
  1320. if os.IsNotExist(err) {
  1321. // We were supposed to move a file away but it does not exist. Either
  1322. // the user has already moved it away, or the conflict was between a
  1323. // remote modification and a local delete. In either way it does not
  1324. // matter, go ahead as if the move succeeded.
  1325. err = nil
  1326. }
  1327. if f.maxConflicts > -1 {
  1328. matches, gerr := osutil.Glob(withoutExt + ".sync-conflict-????????-??????" + ext)
  1329. if gerr == nil && len(matches) > f.maxConflicts {
  1330. sort.Sort(sort.Reverse(sort.StringSlice(matches)))
  1331. for _, match := range matches[f.maxConflicts:] {
  1332. gerr = os.Remove(match)
  1333. if gerr != nil {
  1334. l.Debugln(f, "removing extra conflict", gerr)
  1335. }
  1336. }
  1337. } else if gerr != nil {
  1338. l.Debugln(f, "globbing for conflicts", gerr)
  1339. }
  1340. }
  1341. return err
  1342. }
  1343. func (f *rwFolder) newError(path string, err error) {
  1344. f.errorsMut.Lock()
  1345. defer f.errorsMut.Unlock()
  1346. // We might get more than one error report for a file (i.e. error on
  1347. // Write() followed by Close()); we keep the first error as that is
  1348. // probably closer to the root cause.
  1349. if _, ok := f.errors[path]; ok {
  1350. return
  1351. }
  1352. f.errors[path] = err.Error()
  1353. }
  1354. func (f *rwFolder) clearErrors() {
  1355. f.errorsMut.Lock()
  1356. f.errors = make(map[string]string)
  1357. f.errorsMut.Unlock()
  1358. }
  1359. func (f *rwFolder) currentErrors() []fileError {
  1360. f.errorsMut.Lock()
  1361. errors := make([]fileError, 0, len(f.errors))
  1362. for path, err := range f.errors {
  1363. errors = append(errors, fileError{path, err})
  1364. }
  1365. sort.Sort(fileErrorList(errors))
  1366. f.errorsMut.Unlock()
  1367. return errors
  1368. }
  1369. // A []fileError is sent as part of an event and will be JSON serialized.
  1370. type fileError struct {
  1371. Path string `json:"path"`
  1372. Err string `json:"error"`
  1373. }
  1374. type fileErrorList []fileError
  1375. func (l fileErrorList) Len() int {
  1376. return len(l)
  1377. }
  1378. func (l fileErrorList) Less(a, b int) bool {
  1379. return l[a].Path < l[b].Path
  1380. }
  1381. func (l fileErrorList) Swap(a, b int) {
  1382. l[a], l[b] = l[b], l[a]
  1383. }
  1384. // fileValid returns nil when the file is valid for processing, or an error if it's not
  1385. func fileValid(file db.FileIntf) error {
  1386. switch {
  1387. case file.IsDeleted():
  1388. // We don't care about file validity if we're not supposed to have it
  1389. return nil
  1390. case !symlinks.Supported && file.IsSymlink():
  1391. return errUnsupportedSymlink
  1392. case runtime.GOOS == "windows" && windowsInvalidFilename(file.FileName()):
  1393. return errInvalidFilename
  1394. }
  1395. return nil
  1396. }
  1397. var windowsDisallowedCharacters = string([]rune{
  1398. '<', '>', ':', '"', '|', '?', '*',
  1399. 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
  1400. 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
  1401. 21, 22, 23, 24, 25, 26, 27, 28, 29, 30,
  1402. 31,
  1403. })
  1404. func windowsInvalidFilename(name string) bool {
  1405. // None of the path components should end in space
  1406. for _, part := range strings.Split(name, `\`) {
  1407. if len(part) == 0 {
  1408. continue
  1409. }
  1410. if part[len(part)-1] == ' ' {
  1411. // Names ending in space are not valid.
  1412. return true
  1413. }
  1414. }
  1415. // The path must not contain any disallowed characters
  1416. return strings.ContainsAny(name, windowsDisallowedCharacters)
  1417. }