rwfolder.go 52 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "errors"
  9. "fmt"
  10. "math/rand"
  11. "os"
  12. "path/filepath"
  13. "runtime"
  14. "sort"
  15. "strings"
  16. "time"
  17. "github.com/syncthing/syncthing/lib/config"
  18. "github.com/syncthing/syncthing/lib/db"
  19. "github.com/syncthing/syncthing/lib/events"
  20. "github.com/syncthing/syncthing/lib/fs"
  21. "github.com/syncthing/syncthing/lib/ignore"
  22. "github.com/syncthing/syncthing/lib/osutil"
  23. "github.com/syncthing/syncthing/lib/protocol"
  24. "github.com/syncthing/syncthing/lib/scanner"
  25. "github.com/syncthing/syncthing/lib/symlinks"
  26. "github.com/syncthing/syncthing/lib/sync"
  27. "github.com/syncthing/syncthing/lib/versioner"
  28. "github.com/syncthing/syncthing/lib/weakhash"
  29. )
  30. func init() {
  31. folderFactories[config.FolderTypeSendReceive] = newSendReceiveFolder
  32. }
  33. // A pullBlockState is passed to the puller routine for each block that needs
  34. // to be fetched.
  35. type pullBlockState struct {
  36. *sharedPullerState
  37. block protocol.BlockInfo
  38. }
  39. // A copyBlocksState is passed to copy routine if the file has blocks to be
  40. // copied.
  41. type copyBlocksState struct {
  42. *sharedPullerState
  43. blocks []protocol.BlockInfo
  44. }
  45. // Which filemode bits to preserve
  46. const retainBits = os.ModeSetgid | os.ModeSetuid | os.ModeSticky
  47. var (
  48. activity = newDeviceActivity()
  49. errNoDevice = errors.New("peers who had this file went away, or the file has changed while syncing. will retry later")
  50. )
  51. const (
  52. dbUpdateHandleDir = iota
  53. dbUpdateDeleteDir
  54. dbUpdateHandleFile
  55. dbUpdateDeleteFile
  56. dbUpdateShortcutFile
  57. dbUpdateHandleSymlink
  58. )
  59. const (
  60. defaultCopiers = 1
  61. defaultPullers = 16
  62. defaultPullerSleep = 10 * time.Second
  63. defaultPullerPause = 60 * time.Second
  64. )
  65. type dbUpdateJob struct {
  66. file protocol.FileInfo
  67. jobType int
  68. }
  69. type sendReceiveFolder struct {
  70. folder
  71. mtimeFS *fs.MtimeFS
  72. dir string
  73. versioner versioner.Versioner
  74. ignorePerms bool
  75. order config.PullOrder
  76. maxConflicts int
  77. sleep time.Duration
  78. pause time.Duration
  79. allowSparse bool
  80. checkFreeSpace bool
  81. ignoreDelete bool
  82. fsync bool
  83. useWeakHash bool
  84. copiers int
  85. pullers int
  86. queue *jobQueue
  87. dbUpdates chan dbUpdateJob
  88. pullTimer *time.Timer
  89. remoteIndex chan struct{} // An index update was received, we should re-evaluate needs
  90. errors map[string]string // path -> error string
  91. errorsMut sync.Mutex
  92. initialScanCompleted chan (struct{}) // exposed for testing
  93. }
  94. func newSendReceiveFolder(model *Model, cfg config.FolderConfiguration, ver versioner.Versioner, mtimeFS *fs.MtimeFS) service {
  95. f := &sendReceiveFolder{
  96. folder: folder{
  97. stateTracker: newStateTracker(cfg.ID),
  98. scan: newFolderScanner(cfg),
  99. stop: make(chan struct{}),
  100. model: model,
  101. },
  102. mtimeFS: mtimeFS,
  103. dir: cfg.Path(),
  104. versioner: ver,
  105. ignorePerms: cfg.IgnorePerms,
  106. copiers: cfg.Copiers,
  107. pullers: cfg.Pullers,
  108. order: cfg.Order,
  109. maxConflicts: cfg.MaxConflicts,
  110. allowSparse: !cfg.DisableSparseFiles,
  111. checkFreeSpace: cfg.MinDiskFreePct != 0,
  112. ignoreDelete: cfg.IgnoreDelete,
  113. fsync: cfg.Fsync,
  114. useWeakHash: !cfg.DisableWeakHash,
  115. queue: newJobQueue(),
  116. pullTimer: time.NewTimer(time.Second),
  117. remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes.
  118. errorsMut: sync.NewMutex(),
  119. initialScanCompleted: make(chan struct{}),
  120. }
  121. f.configureCopiersAndPullers(cfg)
  122. return f
  123. }
  124. func (f *sendReceiveFolder) configureCopiersAndPullers(cfg config.FolderConfiguration) {
  125. if f.copiers == 0 {
  126. f.copiers = defaultCopiers
  127. }
  128. if f.pullers == 0 {
  129. f.pullers = defaultPullers
  130. }
  131. if cfg.PullerPauseS == 0 {
  132. f.pause = defaultPullerPause
  133. } else {
  134. f.pause = time.Duration(cfg.PullerPauseS) * time.Second
  135. }
  136. if cfg.PullerSleepS == 0 {
  137. f.sleep = defaultPullerSleep
  138. } else {
  139. f.sleep = time.Duration(cfg.PullerSleepS) * time.Second
  140. }
  141. }
  142. // Helper function to check whether either the ignorePerm flag has been
  143. // set on the local host or the FlagNoPermBits has been set on the file/dir
  144. // which is being pulled.
  145. func (f *sendReceiveFolder) ignorePermissions(file protocol.FileInfo) bool {
  146. return f.ignorePerms || file.NoPermissions
  147. }
  148. // Serve will run scans and pulls. It will return when Stop()ed or on a
  149. // critical error.
  150. func (f *sendReceiveFolder) Serve() {
  151. l.Debugln(f, "starting")
  152. defer l.Debugln(f, "exiting")
  153. defer func() {
  154. f.pullTimer.Stop()
  155. f.scan.timer.Stop()
  156. // TODO: Should there be an actual FolderStopped state?
  157. f.setState(FolderIdle)
  158. }()
  159. var prevSec int64
  160. var prevIgnoreHash string
  161. for {
  162. select {
  163. case <-f.stop:
  164. return
  165. case <-f.remoteIndex:
  166. prevSec = 0
  167. f.pullTimer.Reset(0)
  168. l.Debugln(f, "remote index updated, rescheduling pull")
  169. case <-f.pullTimer.C:
  170. select {
  171. case <-f.initialScanCompleted:
  172. default:
  173. // We don't start pulling files until a scan has been completed.
  174. l.Debugln(f, "skip (initial)")
  175. f.pullTimer.Reset(f.sleep)
  176. continue
  177. }
  178. f.model.fmut.RLock()
  179. curIgnores := f.model.folderIgnores[f.folderID]
  180. f.model.fmut.RUnlock()
  181. if newHash := curIgnores.Hash(); newHash != prevIgnoreHash {
  182. // The ignore patterns have changed. We need to re-evaluate if
  183. // there are files we need now that were ignored before.
  184. l.Debugln(f, "ignore patterns have changed, resetting prevVer")
  185. prevSec = 0
  186. prevIgnoreHash = newHash
  187. }
  188. // RemoteSequence() is a fast call, doesn't touch the database.
  189. curSeq, ok := f.model.RemoteSequence(f.folderID)
  190. if !ok || curSeq == prevSec {
  191. l.Debugln(f, "skip (curSeq == prevSeq)", prevSec, ok)
  192. f.pullTimer.Reset(f.sleep)
  193. continue
  194. }
  195. if err := f.model.CheckFolderHealth(f.folderID); err != nil {
  196. l.Infoln("Skipping folder", f.folderID, "pull due to folder error:", err)
  197. f.pullTimer.Reset(f.sleep)
  198. continue
  199. }
  200. l.Debugln(f, "pulling", prevSec, curSeq)
  201. f.setState(FolderSyncing)
  202. f.clearErrors()
  203. tries := 0
  204. for {
  205. tries++
  206. changed := f.pullerIteration(curIgnores)
  207. l.Debugln(f, "changed", changed)
  208. if changed == 0 {
  209. // No files were changed by the puller, so we are in
  210. // sync. Remember the local version number and
  211. // schedule a resync a little bit into the future.
  212. if lv, ok := f.model.RemoteSequence(f.folderID); ok && lv < curSeq {
  213. // There's a corner case where the device we needed
  214. // files from disconnected during the puller
  215. // iteration. The files will have been removed from
  216. // the index, so we've concluded that we don't need
  217. // them, but at the same time we have the local
  218. // version that includes those files in curVer. So we
  219. // catch the case that sequence might have
  220. // decreased here.
  221. l.Debugln(f, "adjusting curVer", lv)
  222. curSeq = lv
  223. }
  224. prevSec = curSeq
  225. l.Debugln(f, "next pull in", f.sleep)
  226. f.pullTimer.Reset(f.sleep)
  227. break
  228. }
  229. if tries > 10 {
  230. // We've tried a bunch of times to get in sync, but
  231. // we're not making it. Probably there are write
  232. // errors preventing us. Flag this with a warning and
  233. // wait a bit longer before retrying.
  234. l.Infof("Folder %q isn't making progress. Pausing puller for %v.", f.folderID, f.pause)
  235. l.Debugln(f, "next pull in", f.pause)
  236. if folderErrors := f.currentErrors(); len(folderErrors) > 0 {
  237. events.Default.Log(events.FolderErrors, map[string]interface{}{
  238. "folder": f.folderID,
  239. "errors": folderErrors,
  240. })
  241. }
  242. f.pullTimer.Reset(f.pause)
  243. break
  244. }
  245. }
  246. f.setState(FolderIdle)
  247. // The reason for running the scanner from within the puller is that
  248. // this is the easiest way to make sure we are not doing both at the
  249. // same time.
  250. case <-f.scan.timer.C:
  251. err := f.scanSubdirsIfHealthy(nil)
  252. f.scan.Reschedule()
  253. if err != nil {
  254. continue
  255. }
  256. select {
  257. case <-f.initialScanCompleted:
  258. default:
  259. l.Infoln("Completed initial scan (rw) of folder", f.folderID)
  260. close(f.initialScanCompleted)
  261. }
  262. case req := <-f.scan.now:
  263. req.err <- f.scanSubdirsIfHealthy(req.subdirs)
  264. case next := <-f.scan.delay:
  265. f.scan.timer.Reset(next)
  266. }
  267. }
  268. }
  269. func (f *sendReceiveFolder) IndexUpdated() {
  270. select {
  271. case f.remoteIndex <- struct{}{}:
  272. default:
  273. // We might be busy doing a pull and thus not reading from this
  274. // channel. The channel is 1-buffered, so one notification will be
  275. // queued to ensure we recheck after the pull, but beyond that we must
  276. // make sure to not block index receiving.
  277. }
  278. }
  279. func (f *sendReceiveFolder) String() string {
  280. return fmt.Sprintf("sendReceiveFolder/%s@%p", f.folderID, f)
  281. }
  282. // pullerIteration runs a single puller iteration for the given folder and
  283. // returns the number items that should have been synced (even those that
  284. // might have failed). One puller iteration handles all files currently
  285. // flagged as needed in the folder.
  286. func (f *sendReceiveFolder) pullerIteration(ignores *ignore.Matcher) int {
  287. pullChan := make(chan pullBlockState)
  288. copyChan := make(chan copyBlocksState)
  289. finisherChan := make(chan *sharedPullerState)
  290. updateWg := sync.NewWaitGroup()
  291. copyWg := sync.NewWaitGroup()
  292. pullWg := sync.NewWaitGroup()
  293. doneWg := sync.NewWaitGroup()
  294. l.Debugln(f, "c", f.copiers, "p", f.pullers)
  295. f.dbUpdates = make(chan dbUpdateJob)
  296. updateWg.Add(1)
  297. go func() {
  298. // dbUpdaterRoutine finishes when f.dbUpdates is closed
  299. f.dbUpdaterRoutine()
  300. updateWg.Done()
  301. }()
  302. for i := 0; i < f.copiers; i++ {
  303. copyWg.Add(1)
  304. go func() {
  305. // copierRoutine finishes when copyChan is closed
  306. f.copierRoutine(copyChan, pullChan, finisherChan)
  307. copyWg.Done()
  308. }()
  309. }
  310. for i := 0; i < f.pullers; i++ {
  311. pullWg.Add(1)
  312. go func() {
  313. // pullerRoutine finishes when pullChan is closed
  314. f.pullerRoutine(pullChan, finisherChan)
  315. pullWg.Done()
  316. }()
  317. }
  318. doneWg.Add(1)
  319. // finisherRoutine finishes when finisherChan is closed
  320. go func() {
  321. f.finisherRoutine(finisherChan)
  322. doneWg.Done()
  323. }()
  324. f.model.fmut.RLock()
  325. folderFiles := f.model.folderFiles[f.folderID]
  326. f.model.fmut.RUnlock()
  327. changed := 0
  328. var processDirectly []protocol.FileInfo
  329. // Iterate the list of items that we need and sort them into piles.
  330. // Regular files to pull goes into the file queue, everything else
  331. // (directories, symlinks and deletes) goes into the "process directly"
  332. // pile.
  333. folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
  334. if shouldIgnore(intf, ignores, f.ignoreDelete, defTempNamer) {
  335. return true
  336. }
  337. if err := fileValid(intf); err != nil {
  338. // The file isn't valid so we can't process it. Pretend that we
  339. // tried and set the error for the file.
  340. f.newError(intf.FileName(), err)
  341. changed++
  342. return true
  343. }
  344. file := intf.(protocol.FileInfo)
  345. switch {
  346. case file.IsDeleted():
  347. processDirectly = append(processDirectly, file)
  348. changed++
  349. case file.Type == protocol.FileInfoTypeFile:
  350. // Queue files for processing after directories and symlinks, if
  351. // it has availability.
  352. devices := folderFiles.Availability(file.Name)
  353. for _, dev := range devices {
  354. if f.model.ConnectedTo(dev) {
  355. f.queue.Push(file.Name, file.Size, file.ModTime())
  356. changed++
  357. break
  358. }
  359. }
  360. default:
  361. // Directories, symlinks
  362. processDirectly = append(processDirectly, file)
  363. changed++
  364. }
  365. return true
  366. })
  367. // Sort the "process directly" pile by number of path components. This
  368. // ensures that we handle parents before children.
  369. sort.Sort(byComponentCount(processDirectly))
  370. // Process the list.
  371. fileDeletions := map[string]protocol.FileInfo{}
  372. dirDeletions := []protocol.FileInfo{}
  373. buckets := map[string][]protocol.FileInfo{}
  374. for _, fi := range processDirectly {
  375. // Verify that the thing we are handling lives inside a directory,
  376. // and not a symlink or empty space.
  377. if !osutil.IsDir(f.dir, filepath.Dir(fi.Name)) {
  378. f.newError(fi.Name, errNotDir)
  379. continue
  380. }
  381. switch {
  382. case fi.IsDeleted():
  383. // A deleted file, directory or symlink
  384. if fi.IsDirectory() {
  385. // Perform directory deletions at the end, as we may have
  386. // files to delete inside them before we get to that point.
  387. dirDeletions = append(dirDeletions, fi)
  388. } else {
  389. fileDeletions[fi.Name] = fi
  390. df, ok := f.model.CurrentFolderFile(f.folderID, fi.Name)
  391. // Local file can be already deleted, but with a lower version
  392. // number, hence the deletion coming in again as part of
  393. // WithNeed, furthermore, the file can simply be of the wrong
  394. // type if we haven't yet managed to pull it.
  395. if ok && !df.IsDeleted() && !df.IsSymlink() && !df.IsDirectory() {
  396. // Put files into buckets per first hash
  397. key := string(df.Blocks[0].Hash)
  398. buckets[key] = append(buckets[key], df)
  399. }
  400. }
  401. case fi.IsDirectory() && !fi.IsSymlink():
  402. l.Debugln("Handling directory", fi.Name)
  403. f.handleDir(fi)
  404. case fi.IsSymlink():
  405. l.Debugln("Handling symlink", fi.Name)
  406. f.handleSymlink(fi)
  407. default:
  408. l.Warnln(fi)
  409. panic("unhandleable item type, can't happen")
  410. }
  411. }
  412. // Now do the file queue. Reorder it according to configuration.
  413. switch f.order {
  414. case config.OrderRandom:
  415. f.queue.Shuffle()
  416. case config.OrderAlphabetic:
  417. // The queue is already in alphabetic order.
  418. case config.OrderSmallestFirst:
  419. f.queue.SortSmallestFirst()
  420. case config.OrderLargestFirst:
  421. f.queue.SortLargestFirst()
  422. case config.OrderOldestFirst:
  423. f.queue.SortOldestFirst()
  424. case config.OrderNewestFirst:
  425. f.queue.SortNewestFirst()
  426. }
  427. // Process the file queue.
  428. nextFile:
  429. for {
  430. select {
  431. case <-f.stop:
  432. // Stop processing files if the puller has been told to stop.
  433. break
  434. default:
  435. }
  436. fileName, ok := f.queue.Pop()
  437. if !ok {
  438. break
  439. }
  440. fi, ok := f.model.CurrentGlobalFile(f.folderID, fileName)
  441. if !ok {
  442. // File is no longer in the index. Mark it as done and drop it.
  443. f.queue.Done(fileName)
  444. continue
  445. }
  446. if fi.IsDeleted() || fi.Type != protocol.FileInfoTypeFile {
  447. // The item has changed type or status in the index while we
  448. // were processing directories above.
  449. f.queue.Done(fileName)
  450. continue
  451. }
  452. // Verify that the thing we are handling lives inside a directory,
  453. // and not a symlink or empty space.
  454. if !osutil.IsDir(f.dir, filepath.Dir(fi.Name)) {
  455. f.newError(fi.Name, errNotDir)
  456. continue
  457. }
  458. // Check our list of files to be removed for a match, in which case
  459. // we can just do a rename instead.
  460. key := string(fi.Blocks[0].Hash)
  461. for i, candidate := range buckets[key] {
  462. if scanner.BlocksEqual(candidate.Blocks, fi.Blocks) {
  463. // Remove the candidate from the bucket
  464. lidx := len(buckets[key]) - 1
  465. buckets[key][i] = buckets[key][lidx]
  466. buckets[key] = buckets[key][:lidx]
  467. // candidate is our current state of the file, where as the
  468. // desired state with the delete bit set is in the deletion
  469. // map.
  470. desired := fileDeletions[candidate.Name]
  471. // Remove the pending deletion (as we perform it by renaming)
  472. delete(fileDeletions, candidate.Name)
  473. f.renameFile(desired, fi)
  474. f.queue.Done(fileName)
  475. continue nextFile
  476. }
  477. }
  478. // Handle the file normally, by coping and pulling, etc.
  479. f.handleFile(fi, copyChan, finisherChan)
  480. }
  481. // Signal copy and puller routines that we are done with the in data for
  482. // this iteration. Wait for them to finish.
  483. close(copyChan)
  484. copyWg.Wait()
  485. close(pullChan)
  486. pullWg.Wait()
  487. // Signal the finisher chan that there will be no more input.
  488. close(finisherChan)
  489. // Wait for the finisherChan to finish.
  490. doneWg.Wait()
  491. for _, file := range fileDeletions {
  492. l.Debugln("Deleting file", file.Name)
  493. f.deleteFile(file)
  494. }
  495. for i := range dirDeletions {
  496. dir := dirDeletions[len(dirDeletions)-i-1]
  497. l.Debugln("Deleting dir", dir.Name)
  498. f.deleteDir(dir, ignores)
  499. }
  500. // Wait for db updates to complete
  501. close(f.dbUpdates)
  502. updateWg.Wait()
  503. return changed
  504. }
  505. // handleDir creates or updates the given directory
  506. func (f *sendReceiveFolder) handleDir(file protocol.FileInfo) {
  507. // Used in the defer closure below, updated by the function body. Take
  508. // care not declare another err.
  509. var err error
  510. events.Default.Log(events.ItemStarted, map[string]string{
  511. "folder": f.folderID,
  512. "item": file.Name,
  513. "type": "dir",
  514. "action": "update",
  515. })
  516. defer func() {
  517. events.Default.Log(events.ItemFinished, map[string]interface{}{
  518. "folder": f.folderID,
  519. "item": file.Name,
  520. "error": events.Error(err),
  521. "type": "dir",
  522. "action": "update",
  523. })
  524. }()
  525. realName, err := rootedJoinedPath(f.dir, file.Name)
  526. if err != nil {
  527. f.newError(file.Name, err)
  528. return
  529. }
  530. mode := os.FileMode(file.Permissions & 0777)
  531. if f.ignorePermissions(file) {
  532. mode = 0777
  533. }
  534. if shouldDebug() {
  535. curFile, _ := f.model.CurrentFolderFile(f.folderID, file.Name)
  536. l.Debugf("need dir\n\t%v\n\t%v", file, curFile)
  537. }
  538. info, err := f.mtimeFS.Lstat(realName)
  539. switch {
  540. // There is already something under that name, but it's a file/link.
  541. // Most likely a file/link is getting replaced with a directory.
  542. // Remove the file/link and fall through to directory creation.
  543. case err == nil && (!info.IsDir() || info.Mode()&os.ModeSymlink != 0):
  544. err = osutil.InWritableDir(os.Remove, realName)
  545. if err != nil {
  546. l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err)
  547. f.newError(file.Name, err)
  548. return
  549. }
  550. fallthrough
  551. // The directory doesn't exist, so we create it with the right
  552. // mode bits from the start.
  553. case err != nil && os.IsNotExist(err):
  554. // We declare a function that acts on only the path name, so
  555. // we can pass it to InWritableDir. We use a regular Mkdir and
  556. // not MkdirAll because the parent should already exist.
  557. mkdir := func(path string) error {
  558. err = os.Mkdir(path, mode)
  559. if err != nil || f.ignorePermissions(file) {
  560. return err
  561. }
  562. // Stat the directory so we can check its permissions.
  563. info, err := f.mtimeFS.Lstat(path)
  564. if err != nil {
  565. return err
  566. }
  567. // Mask for the bits we want to preserve and add them in to the
  568. // directories permissions.
  569. return os.Chmod(path, mode|(info.Mode()&retainBits))
  570. }
  571. if err = osutil.InWritableDir(mkdir, realName); err == nil {
  572. f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  573. } else {
  574. l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err)
  575. f.newError(file.Name, err)
  576. }
  577. return
  578. // Weird error when stat()'ing the dir. Probably won't work to do
  579. // anything else with it if we can't even stat() it.
  580. case err != nil:
  581. l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err)
  582. f.newError(file.Name, err)
  583. return
  584. }
  585. // The directory already exists, so we just correct the mode bits. (We
  586. // don't handle modification times on directories, because that sucks...)
  587. // It's OK to change mode bits on stuff within non-writable directories.
  588. if f.ignorePermissions(file) {
  589. f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  590. } else if err := os.Chmod(realName, mode|(info.Mode()&retainBits)); err == nil {
  591. f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir}
  592. } else {
  593. l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err)
  594. f.newError(file.Name, err)
  595. }
  596. }
  597. // handleSymlink creates or updates the given symlink
  598. func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo) {
  599. // Used in the defer closure below, updated by the function body. Take
  600. // care not declare another err.
  601. var err error
  602. events.Default.Log(events.ItemStarted, map[string]string{
  603. "folder": f.folderID,
  604. "item": file.Name,
  605. "type": "symlink",
  606. "action": "update",
  607. })
  608. defer func() {
  609. events.Default.Log(events.ItemFinished, map[string]interface{}{
  610. "folder": f.folderID,
  611. "item": file.Name,
  612. "error": events.Error(err),
  613. "type": "symlink",
  614. "action": "update",
  615. })
  616. }()
  617. realName, err := rootedJoinedPath(f.dir, file.Name)
  618. if err != nil {
  619. f.newError(file.Name, err)
  620. return
  621. }
  622. if shouldDebug() {
  623. curFile, _ := f.model.CurrentFolderFile(f.folderID, file.Name)
  624. l.Debugf("need symlink\n\t%v\n\t%v", file, curFile)
  625. }
  626. if len(file.SymlinkTarget) == 0 {
  627. // Index entry from a Syncthing predating the support for including
  628. // the link target in the index entry. We log this as an error.
  629. err = errors.New("incompatible symlink entry; rescan with newer Syncthing on source")
  630. l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err)
  631. f.newError(file.Name, err)
  632. return
  633. }
  634. if _, err = f.mtimeFS.Lstat(realName); err == nil {
  635. // There is already something under that name. Remove it to replace
  636. // with the symlink. This also handles the "change symlink type"
  637. // path.
  638. err = osutil.InWritableDir(os.Remove, realName)
  639. if err != nil {
  640. l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err)
  641. f.newError(file.Name, err)
  642. return
  643. }
  644. }
  645. tt := symlinks.TargetFile
  646. if file.IsDirectory() {
  647. tt = symlinks.TargetDirectory
  648. }
  649. // We declare a function that acts on only the path name, so
  650. // we can pass it to InWritableDir.
  651. createLink := func(path string) error {
  652. return symlinks.Create(path, file.SymlinkTarget, tt)
  653. }
  654. if err = osutil.InWritableDir(createLink, realName); err == nil {
  655. f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleSymlink}
  656. } else {
  657. l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err)
  658. f.newError(file.Name, err)
  659. }
  660. }
  661. // deleteDir attempts to delete the given directory
  662. func (f *sendReceiveFolder) deleteDir(file protocol.FileInfo, matcher *ignore.Matcher) {
  663. // Used in the defer closure below, updated by the function body. Take
  664. // care not declare another err.
  665. var err error
  666. events.Default.Log(events.ItemStarted, map[string]string{
  667. "folder": f.folderID,
  668. "item": file.Name,
  669. "type": "dir",
  670. "action": "delete",
  671. })
  672. defer func() {
  673. events.Default.Log(events.ItemFinished, map[string]interface{}{
  674. "folder": f.folderID,
  675. "item": file.Name,
  676. "error": events.Error(err),
  677. "type": "dir",
  678. "action": "delete",
  679. })
  680. }()
  681. realName, err := rootedJoinedPath(f.dir, file.Name)
  682. if err != nil {
  683. f.newError(file.Name, err)
  684. return
  685. }
  686. // Delete any temporary files lying around in the directory
  687. dir, _ := os.Open(realName)
  688. if dir != nil {
  689. files, _ := dir.Readdirnames(-1)
  690. for _, dirFile := range files {
  691. fullDirFile := filepath.Join(file.Name, dirFile)
  692. if defTempNamer.IsTemporary(dirFile) || (matcher != nil && matcher.Match(fullDirFile).IsDeletable()) {
  693. os.RemoveAll(filepath.Join(f.dir, fullDirFile))
  694. }
  695. }
  696. dir.Close()
  697. }
  698. err = osutil.InWritableDir(os.Remove, realName)
  699. if err == nil || os.IsNotExist(err) {
  700. // It was removed or it doesn't exist to start with
  701. f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
  702. } else if _, serr := f.mtimeFS.Lstat(realName); serr != nil && !os.IsPermission(serr) {
  703. // We get an error just looking at the directory, and it's not a
  704. // permission problem. Lets assume the error is in fact some variant
  705. // of "file does not exist" (possibly expressed as some parent being a
  706. // file and not a directory etc) and that the delete is handled.
  707. f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir}
  708. } else {
  709. l.Infof("Puller (folder %q, dir %q): delete: %v", f.folderID, file.Name, err)
  710. f.newError(file.Name, err)
  711. }
  712. }
  713. // deleteFile attempts to delete the given file
  714. func (f *sendReceiveFolder) deleteFile(file protocol.FileInfo) {
  715. // Used in the defer closure below, updated by the function body. Take
  716. // care not declare another err.
  717. var err error
  718. events.Default.Log(events.ItemStarted, map[string]string{
  719. "folder": f.folderID,
  720. "item": file.Name,
  721. "type": "file",
  722. "action": "delete",
  723. })
  724. defer func() {
  725. events.Default.Log(events.ItemFinished, map[string]interface{}{
  726. "folder": f.folderID,
  727. "item": file.Name,
  728. "error": events.Error(err),
  729. "type": "file",
  730. "action": "delete",
  731. })
  732. }()
  733. realName, err := rootedJoinedPath(f.dir, file.Name)
  734. if err != nil {
  735. f.newError(file.Name, err)
  736. return
  737. }
  738. cur, ok := f.model.CurrentFolderFile(f.folderID, file.Name)
  739. if ok && f.inConflict(cur.Version, file.Version) {
  740. // There is a conflict here. Move the file to a conflict copy instead
  741. // of deleting. Also merge with the version vector we had, to indicate
  742. // we have resolved the conflict.
  743. file.Version = file.Version.Merge(cur.Version)
  744. err = osutil.InWritableDir(f.moveForConflict, realName)
  745. } else if f.versioner != nil {
  746. err = osutil.InWritableDir(f.versioner.Archive, realName)
  747. } else {
  748. err = osutil.InWritableDir(os.Remove, realName)
  749. }
  750. if err == nil || os.IsNotExist(err) {
  751. // It was removed or it doesn't exist to start with
  752. f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
  753. } else if _, serr := f.mtimeFS.Lstat(realName); serr != nil && !os.IsPermission(serr) {
  754. // We get an error just looking at the file, and it's not a permission
  755. // problem. Lets assume the error is in fact some variant of "file
  756. // does not exist" (possibly expressed as some parent being a file and
  757. // not a directory etc) and that the delete is handled.
  758. f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile}
  759. } else {
  760. l.Infof("Puller (folder %q, file %q): delete: %v", f.folderID, file.Name, err)
  761. f.newError(file.Name, err)
  762. }
  763. }
  764. // renameFile attempts to rename an existing file to a destination
  765. // and set the right attributes on it.
  766. func (f *sendReceiveFolder) renameFile(source, target protocol.FileInfo) {
  767. // Used in the defer closure below, updated by the function body. Take
  768. // care not declare another err.
  769. var err error
  770. events.Default.Log(events.ItemStarted, map[string]string{
  771. "folder": f.folderID,
  772. "item": source.Name,
  773. "type": "file",
  774. "action": "delete",
  775. })
  776. events.Default.Log(events.ItemStarted, map[string]string{
  777. "folder": f.folderID,
  778. "item": target.Name,
  779. "type": "file",
  780. "action": "update",
  781. })
  782. defer func() {
  783. events.Default.Log(events.ItemFinished, map[string]interface{}{
  784. "folder": f.folderID,
  785. "item": source.Name,
  786. "error": events.Error(err),
  787. "type": "file",
  788. "action": "delete",
  789. })
  790. events.Default.Log(events.ItemFinished, map[string]interface{}{
  791. "folder": f.folderID,
  792. "item": target.Name,
  793. "error": events.Error(err),
  794. "type": "file",
  795. "action": "update",
  796. })
  797. }()
  798. l.Debugln(f, "taking rename shortcut", source.Name, "->", target.Name)
  799. from, err := rootedJoinedPath(f.dir, source.Name)
  800. if err != nil {
  801. f.newError(source.Name, err)
  802. return
  803. }
  804. to, err := rootedJoinedPath(f.dir, target.Name)
  805. if err != nil {
  806. f.newError(target.Name, err)
  807. return
  808. }
  809. if f.versioner != nil {
  810. err = osutil.Copy(from, to)
  811. if err == nil {
  812. err = osutil.InWritableDir(f.versioner.Archive, from)
  813. }
  814. } else {
  815. err = osutil.TryRename(from, to)
  816. }
  817. if err == nil {
  818. // The file was renamed, so we have handled both the necessary delete
  819. // of the source and the creation of the target. Fix-up the metadata,
  820. // and update the local index of the target file.
  821. f.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
  822. err = f.shortcutFile(target)
  823. if err != nil {
  824. l.Infof("Puller (folder %q, file %q): rename from %q metadata: %v", f.folderID, target.Name, source.Name, err)
  825. f.newError(target.Name, err)
  826. return
  827. }
  828. f.dbUpdates <- dbUpdateJob{target, dbUpdateHandleFile}
  829. } else {
  830. // We failed the rename so we have a source file that we still need to
  831. // get rid of. Attempt to delete it instead so that we make *some*
  832. // progress. The target is unhandled.
  833. err = osutil.InWritableDir(os.Remove, from)
  834. if err != nil {
  835. l.Infof("Puller (folder %q, file %q): delete %q after failed rename: %v", f.folderID, target.Name, source.Name, err)
  836. f.newError(target.Name, err)
  837. return
  838. }
  839. f.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile}
  840. }
  841. }
  842. // This is the flow of data and events here, I think...
  843. //
  844. // +-----------------------+
  845. // | | - - - - > ItemStarted
  846. // | handleFile | - - - - > ItemFinished (on shortcuts)
  847. // | |
  848. // +-----------------------+
  849. // |
  850. // | copyChan (copyBlocksState; unless shortcut taken)
  851. // |
  852. // | +-----------------------+
  853. // | | +-----------------------+
  854. // +--->| | |
  855. // | | copierRoutine |
  856. // +-| |
  857. // +-----------------------+
  858. // |
  859. // | pullChan (sharedPullerState)
  860. // |
  861. // | +-----------------------+
  862. // | | +-----------------------+
  863. // +-->| | |
  864. // | | pullerRoutine |
  865. // +-| |
  866. // +-----------------------+
  867. // |
  868. // | finisherChan (sharedPullerState)
  869. // |
  870. // | +-----------------------+
  871. // | | |
  872. // +-->| finisherRoutine | - - - - > ItemFinished
  873. // | |
  874. // +-----------------------+
  875. // handleFile queues the copies and pulls as necessary for a single new or
  876. // changed file.
  877. func (f *sendReceiveFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
  878. curFile, hasCurFile := f.model.CurrentFolderFile(f.folderID, file.Name)
  879. if hasCurFile && len(curFile.Blocks) == len(file.Blocks) && scanner.BlocksEqual(curFile.Blocks, file.Blocks) {
  880. // We are supposed to copy the entire file, and then fetch nothing. We
  881. // are only updating metadata, so we don't actually *need* to make the
  882. // copy.
  883. l.Debugln(f, "taking shortcut on", file.Name)
  884. events.Default.Log(events.ItemStarted, map[string]string{
  885. "folder": f.folderID,
  886. "item": file.Name,
  887. "type": "file",
  888. "action": "metadata",
  889. })
  890. f.queue.Done(file.Name)
  891. err := f.shortcutFile(file)
  892. events.Default.Log(events.ItemFinished, map[string]interface{}{
  893. "folder": f.folderID,
  894. "item": file.Name,
  895. "error": events.Error(err),
  896. "type": "file",
  897. "action": "metadata",
  898. })
  899. if err != nil {
  900. l.Infoln("Puller: shortcut:", err)
  901. f.newError(file.Name, err)
  902. } else {
  903. f.dbUpdates <- dbUpdateJob{file, dbUpdateShortcutFile}
  904. }
  905. return
  906. }
  907. // Figure out the absolute filenames we need once and for all
  908. tempName, err := rootedJoinedPath(f.dir, defTempNamer.TempName(file.Name))
  909. if err != nil {
  910. f.newError(file.Name, err)
  911. return
  912. }
  913. realName, err := rootedJoinedPath(f.dir, file.Name)
  914. if err != nil {
  915. f.newError(file.Name, err)
  916. return
  917. }
  918. if hasCurFile && !curFile.IsDirectory() && !curFile.IsSymlink() {
  919. // Check that the file on disk is what we expect it to be according to
  920. // the database. If there's a mismatch here, there might be local
  921. // changes that we don't know about yet and we should scan before
  922. // touching the file. If we can't stat the file we'll just pull it.
  923. if info, err := f.mtimeFS.Lstat(realName); err == nil {
  924. if !info.ModTime().Equal(curFile.ModTime()) || info.Size() != curFile.Size {
  925. l.Debugln("file modified but not rescanned; not pulling:", realName)
  926. // Scan() is synchronous (i.e. blocks until the scan is
  927. // completed and returns an error), but a scan can't happen
  928. // while we're in the puller routine. Request the scan in the
  929. // background and it'll be handled when the current pulling
  930. // sweep is complete. As we do retries, we'll queue the scan
  931. // for this file up to ten times, but the last nine of those
  932. // scans will be cheap...
  933. go f.scan.Scan([]string{file.Name})
  934. return
  935. }
  936. }
  937. }
  938. scanner.PopulateOffsets(file.Blocks)
  939. var blocks []protocol.BlockInfo
  940. var blocksSize int64
  941. var reused []int32
  942. // Check for an old temporary file which might have some blocks we could
  943. // reuse.
  944. tempBlocks, err := scanner.HashFile(tempName, protocol.BlockSize, nil)
  945. if err == nil {
  946. // Check for any reusable blocks in the temp file
  947. tempCopyBlocks, _ := scanner.BlockDiff(tempBlocks, file.Blocks)
  948. // block.String() returns a string unique to the block
  949. existingBlocks := make(map[string]struct{}, len(tempCopyBlocks))
  950. for _, block := range tempCopyBlocks {
  951. existingBlocks[block.String()] = struct{}{}
  952. }
  953. // Since the blocks are already there, we don't need to get them.
  954. for i, block := range file.Blocks {
  955. _, ok := existingBlocks[block.String()]
  956. if !ok {
  957. blocks = append(blocks, block)
  958. blocksSize += int64(block.Size)
  959. } else {
  960. reused = append(reused, int32(i))
  961. }
  962. }
  963. // The sharedpullerstate will know which flags to use when opening the
  964. // temp file depending if we are reusing any blocks or not.
  965. if len(reused) == 0 {
  966. // Otherwise, discard the file ourselves in order for the
  967. // sharedpuller not to panic when it fails to exclusively create a
  968. // file which already exists
  969. osutil.InWritableDir(os.Remove, tempName)
  970. }
  971. } else {
  972. // Copy the blocks, as we don't want to shuffle them on the FileInfo
  973. blocks = append(blocks, file.Blocks...)
  974. blocksSize = file.Size
  975. }
  976. if f.checkFreeSpace {
  977. if free, err := osutil.DiskFreeBytes(f.dir); err == nil && free < blocksSize {
  978. l.Warnf(`Folder "%s": insufficient disk space in %s for %s: have %.2f MiB, need %.2f MiB`, f.folderID, f.dir, file.Name, float64(free)/1024/1024, float64(blocksSize)/1024/1024)
  979. f.newError(file.Name, errors.New("insufficient space"))
  980. return
  981. }
  982. }
  983. // Shuffle the blocks
  984. for i := range blocks {
  985. j := rand.Intn(i + 1)
  986. blocks[i], blocks[j] = blocks[j], blocks[i]
  987. }
  988. events.Default.Log(events.ItemStarted, map[string]string{
  989. "folder": f.folderID,
  990. "item": file.Name,
  991. "type": "file",
  992. "action": "update",
  993. })
  994. s := sharedPullerState{
  995. file: file,
  996. folder: f.folderID,
  997. tempName: tempName,
  998. realName: realName,
  999. copyTotal: len(blocks),
  1000. copyNeeded: len(blocks),
  1001. reused: len(reused),
  1002. updated: time.Now(),
  1003. available: reused,
  1004. availableUpdated: time.Now(),
  1005. ignorePerms: f.ignorePermissions(file),
  1006. version: curFile.Version,
  1007. mut: sync.NewRWMutex(),
  1008. sparse: f.allowSparse,
  1009. created: time.Now(),
  1010. }
  1011. l.Debugf("%v need file %s; copy %d, reused %v", f, file.Name, len(blocks), len(reused))
  1012. cs := copyBlocksState{
  1013. sharedPullerState: &s,
  1014. blocks: blocks,
  1015. }
  1016. copyChan <- cs
  1017. }
  1018. // shortcutFile sets file mode and modification time, when that's the only
  1019. // thing that has changed.
  1020. func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo) error {
  1021. realName, err := rootedJoinedPath(f.dir, file.Name)
  1022. if err != nil {
  1023. f.newError(file.Name, err)
  1024. return err
  1025. }
  1026. if !f.ignorePermissions(file) {
  1027. if err := os.Chmod(realName, os.FileMode(file.Permissions&0777)); err != nil {
  1028. l.Infof("Puller (folder %q, file %q): shortcut: chmod: %v", f.folderID, file.Name, err)
  1029. f.newError(file.Name, err)
  1030. return err
  1031. }
  1032. }
  1033. f.mtimeFS.Chtimes(realName, file.ModTime(), file.ModTime()) // never fails
  1034. // This may have been a conflict. We should merge the version vectors so
  1035. // that our clock doesn't move backwards.
  1036. if cur, ok := f.model.CurrentFolderFile(f.folderID, file.Name); ok {
  1037. file.Version = file.Version.Merge(cur.Version)
  1038. }
  1039. return nil
  1040. }
  1041. // copierRoutine reads copierStates until the in channel closes and performs
  1042. // the relevant copies when possible, or passes it to the puller routine.
  1043. func (f *sendReceiveFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
  1044. buf := make([]byte, protocol.BlockSize)
  1045. for state := range in {
  1046. dstFd, err := state.tempFile()
  1047. if err != nil {
  1048. // Nothing more to do for this failed file, since we couldn't create a temporary for it.
  1049. out <- state.sharedPullerState
  1050. continue
  1051. }
  1052. if f.model.progressEmitter != nil {
  1053. f.model.progressEmitter.Register(state.sharedPullerState)
  1054. }
  1055. folderRoots := make(map[string]string)
  1056. var folders []string
  1057. f.model.fmut.RLock()
  1058. for folder, cfg := range f.model.folderCfgs {
  1059. folderRoots[folder] = cfg.Path()
  1060. folders = append(folders, folder)
  1061. }
  1062. f.model.fmut.RUnlock()
  1063. var weakHashFinder *weakhash.Finder
  1064. if f.useWeakHash {
  1065. hashesToFind := make([]uint32, 0, len(state.blocks))
  1066. for _, block := range state.blocks {
  1067. if block.WeakHash != 0 {
  1068. hashesToFind = append(hashesToFind, block.WeakHash)
  1069. }
  1070. }
  1071. weakHashFinder, err = weakhash.NewFinder(state.realName, protocol.BlockSize, hashesToFind)
  1072. if err != nil {
  1073. l.Debugln("weak hasher", err)
  1074. }
  1075. }
  1076. for _, block := range state.blocks {
  1077. if f.allowSparse && state.reused == 0 && block.IsEmpty() {
  1078. // The block is a block of all zeroes, and we are not reusing
  1079. // a temp file, so there is no need to do anything with it.
  1080. // If we were reusing a temp file and had this block to copy,
  1081. // it would be because the block in the temp file was *not* a
  1082. // block of all zeroes, so then we should not skip it.
  1083. // Pretend we copied it.
  1084. state.copiedFromOrigin()
  1085. continue
  1086. }
  1087. buf = buf[:int(block.Size)]
  1088. found, err := weakHashFinder.Iterate(block.WeakHash, buf, func(offset int64) bool {
  1089. if _, err := scanner.VerifyBuffer(buf, block); err != nil {
  1090. return true
  1091. }
  1092. _, err = dstFd.WriteAt(buf, block.Offset)
  1093. if err != nil {
  1094. state.fail("dst write", err)
  1095. }
  1096. if offset == block.Offset {
  1097. state.copiedFromOrigin()
  1098. } else {
  1099. state.copiedFromOriginShifted()
  1100. }
  1101. return false
  1102. })
  1103. if err != nil {
  1104. l.Debugln("weak hasher iter", err)
  1105. }
  1106. if !found {
  1107. found = f.model.finder.Iterate(folders, block.Hash, func(folder, file string, index int32) bool {
  1108. inFile, err := rootedJoinedPath(folderRoots[folder], file)
  1109. if err != nil {
  1110. return false
  1111. }
  1112. fd, err := os.Open(inFile)
  1113. if err != nil {
  1114. return false
  1115. }
  1116. _, err = fd.ReadAt(buf, protocol.BlockSize*int64(index))
  1117. fd.Close()
  1118. if err != nil {
  1119. return false
  1120. }
  1121. hash, err := scanner.VerifyBuffer(buf, block)
  1122. if err != nil {
  1123. if hash != nil {
  1124. l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash)
  1125. err = f.model.finder.Fix(folder, file, index, block.Hash, hash)
  1126. if err != nil {
  1127. l.Warnln("finder fix:", err)
  1128. }
  1129. } else {
  1130. l.Debugln("Finder failed to verify buffer", err)
  1131. }
  1132. return false
  1133. }
  1134. _, err = dstFd.WriteAt(buf, block.Offset)
  1135. if err != nil {
  1136. state.fail("dst write", err)
  1137. }
  1138. if file == state.file.Name {
  1139. state.copiedFromOrigin()
  1140. }
  1141. return true
  1142. })
  1143. }
  1144. if state.failed() != nil {
  1145. break
  1146. }
  1147. if !found {
  1148. state.pullStarted()
  1149. ps := pullBlockState{
  1150. sharedPullerState: state.sharedPullerState,
  1151. block: block,
  1152. }
  1153. pullChan <- ps
  1154. } else {
  1155. state.copyDone(block)
  1156. }
  1157. }
  1158. weakHashFinder.Close()
  1159. out <- state.sharedPullerState
  1160. }
  1161. }
  1162. func (f *sendReceiveFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
  1163. for state := range in {
  1164. if state.failed() != nil {
  1165. out <- state.sharedPullerState
  1166. continue
  1167. }
  1168. // Get an fd to the temporary file. Technically we don't need it until
  1169. // after fetching the block, but if we run into an error here there is
  1170. // no point in issuing the request to the network.
  1171. fd, err := state.tempFile()
  1172. if err != nil {
  1173. out <- state.sharedPullerState
  1174. continue
  1175. }
  1176. if f.allowSparse && state.reused == 0 && state.block.IsEmpty() {
  1177. // There is no need to request a block of all zeroes. Pretend we
  1178. // requested it and handled it correctly.
  1179. state.pullDone(state.block)
  1180. out <- state.sharedPullerState
  1181. continue
  1182. }
  1183. var lastError error
  1184. candidates := f.model.Availability(f.folderID, state.file.Name, state.file.Version, state.block)
  1185. for {
  1186. // Select the least busy device to pull the block from. If we found no
  1187. // feasible device at all, fail the block (and in the long run, the
  1188. // file).
  1189. selected, found := activity.leastBusy(candidates)
  1190. if !found {
  1191. if lastError != nil {
  1192. state.fail("pull", lastError)
  1193. } else {
  1194. state.fail("pull", errNoDevice)
  1195. }
  1196. break
  1197. }
  1198. candidates = removeAvailability(candidates, selected)
  1199. // Fetch the block, while marking the selected device as in use so that
  1200. // leastBusy can select another device when someone else asks.
  1201. activity.using(selected)
  1202. buf, lastError := f.model.requestGlobal(selected.ID, f.folderID, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, selected.FromTemporary)
  1203. activity.done(selected)
  1204. if lastError != nil {
  1205. l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError)
  1206. continue
  1207. }
  1208. // Verify that the received block matches the desired hash, if not
  1209. // try pulling it from another device.
  1210. _, lastError = scanner.VerifyBuffer(buf, state.block)
  1211. if lastError != nil {
  1212. l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "hash mismatch")
  1213. continue
  1214. }
  1215. // Save the block data we got from the cluster
  1216. _, err = fd.WriteAt(buf, state.block.Offset)
  1217. if err != nil {
  1218. state.fail("save", err)
  1219. } else {
  1220. state.pullDone(state.block)
  1221. }
  1222. break
  1223. }
  1224. out <- state.sharedPullerState
  1225. }
  1226. }
  1227. func (f *sendReceiveFolder) performFinish(state *sharedPullerState) error {
  1228. // Set the correct permission bits on the new file
  1229. if !f.ignorePermissions(state.file) {
  1230. if err := os.Chmod(state.tempName, os.FileMode(state.file.Permissions&0777)); err != nil {
  1231. return err
  1232. }
  1233. }
  1234. if stat, err := f.mtimeFS.Lstat(state.realName); err == nil {
  1235. // There is an old file or directory already in place. We need to
  1236. // handle that.
  1237. switch {
  1238. case stat.IsDir() || stat.Mode()&os.ModeSymlink != 0:
  1239. // It's a directory or a symlink. These are not versioned or
  1240. // archived for conflicts, only removed (which of course fails for
  1241. // non-empty directories).
  1242. // TODO: This is the place where we want to remove temporary files
  1243. // and future hard ignores before attempting a directory delete.
  1244. // Should share code with f.deletDir().
  1245. if err = osutil.InWritableDir(os.Remove, state.realName); err != nil {
  1246. return err
  1247. }
  1248. case f.inConflict(state.version, state.file.Version):
  1249. // The new file has been changed in conflict with the existing one. We
  1250. // should file it away as a conflict instead of just removing or
  1251. // archiving. Also merge with the version vector we had, to indicate
  1252. // we have resolved the conflict.
  1253. state.file.Version = state.file.Version.Merge(state.version)
  1254. if err = osutil.InWritableDir(f.moveForConflict, state.realName); err != nil {
  1255. return err
  1256. }
  1257. case f.versioner != nil:
  1258. // If we should use versioning, let the versioner archive the old
  1259. // file before we replace it. Archiving a non-existent file is not
  1260. // an error.
  1261. if err = f.versioner.Archive(state.realName); err != nil {
  1262. return err
  1263. }
  1264. }
  1265. }
  1266. // Replace the original content with the new one. If it didn't work,
  1267. // leave the temp file in place for reuse.
  1268. if err := osutil.TryRename(state.tempName, state.realName); err != nil {
  1269. return err
  1270. }
  1271. // Set the correct timestamp on the new file
  1272. f.mtimeFS.Chtimes(state.realName, state.file.ModTime(), state.file.ModTime()) // never fails
  1273. // Record the updated file in the index
  1274. f.dbUpdates <- dbUpdateJob{state.file, dbUpdateHandleFile}
  1275. return nil
  1276. }
  1277. func (f *sendReceiveFolder) finisherRoutine(in <-chan *sharedPullerState) {
  1278. for state := range in {
  1279. if closed, err := state.finalClose(); closed {
  1280. l.Debugln(f, "closing", state.file.Name)
  1281. f.queue.Done(state.file.Name)
  1282. if err == nil {
  1283. err = f.performFinish(state)
  1284. }
  1285. if err != nil {
  1286. l.Infoln("Puller: final:", err)
  1287. f.newError(state.file.Name, err)
  1288. }
  1289. events.Default.Log(events.ItemFinished, map[string]interface{}{
  1290. "folder": f.folderID,
  1291. "item": state.file.Name,
  1292. "error": events.Error(err),
  1293. "type": "file",
  1294. "action": "update",
  1295. })
  1296. if f.model.progressEmitter != nil {
  1297. f.model.progressEmitter.Deregister(state)
  1298. }
  1299. }
  1300. }
  1301. }
  1302. // Moves the given filename to the front of the job queue
  1303. func (f *sendReceiveFolder) BringToFront(filename string) {
  1304. f.queue.BringToFront(filename)
  1305. }
  1306. func (f *sendReceiveFolder) Jobs() ([]string, []string) {
  1307. return f.queue.Jobs()
  1308. }
  1309. // dbUpdaterRoutine aggregates db updates and commits them in batches no
  1310. // larger than 1000 items, and no more delayed than 2 seconds.
  1311. func (f *sendReceiveFolder) dbUpdaterRoutine() {
  1312. const (
  1313. maxBatchSize = 1000
  1314. maxBatchTime = 2 * time.Second
  1315. )
  1316. batch := make([]dbUpdateJob, 0, maxBatchSize)
  1317. files := make([]protocol.FileInfo, 0, maxBatchSize)
  1318. tick := time.NewTicker(maxBatchTime)
  1319. defer tick.Stop()
  1320. var changedFiles []string
  1321. var changedDirs []string
  1322. if f.fsync {
  1323. changedFiles = make([]string, 0, maxBatchSize)
  1324. changedDirs = make([]string, 0, maxBatchSize)
  1325. }
  1326. syncFilesOnce := func(files []string, syncFn func(string) error) {
  1327. sort.Strings(files)
  1328. var lastFile string
  1329. for _, file := range files {
  1330. if lastFile == file {
  1331. continue
  1332. }
  1333. lastFile = file
  1334. if err := syncFn(file); err != nil {
  1335. l.Infof("fsync %q failed: %v", file, err)
  1336. }
  1337. }
  1338. }
  1339. handleBatch := func() {
  1340. found := false
  1341. var lastFile protocol.FileInfo
  1342. for _, job := range batch {
  1343. files = append(files, job.file)
  1344. if f.fsync {
  1345. // collect changed files and dirs
  1346. switch job.jobType {
  1347. case dbUpdateHandleFile, dbUpdateShortcutFile:
  1348. changedFiles = append(changedFiles, filepath.Join(f.dir, job.file.Name))
  1349. case dbUpdateHandleDir:
  1350. changedDirs = append(changedDirs, filepath.Join(f.dir, job.file.Name))
  1351. case dbUpdateHandleSymlink:
  1352. // fsyncing symlinks is only supported by MacOS, ignore
  1353. }
  1354. if job.jobType != dbUpdateShortcutFile {
  1355. changedDirs = append(changedDirs, filepath.Dir(filepath.Join(f.dir, job.file.Name)))
  1356. }
  1357. }
  1358. if job.file.IsInvalid() || (job.file.IsDirectory() && !job.file.IsSymlink()) {
  1359. continue
  1360. }
  1361. if job.jobType&(dbUpdateHandleFile|dbUpdateDeleteFile) == 0 {
  1362. continue
  1363. }
  1364. found = true
  1365. lastFile = job.file
  1366. }
  1367. if f.fsync {
  1368. // sync files and dirs to disk
  1369. syncFilesOnce(changedFiles, osutil.SyncFile)
  1370. changedFiles = changedFiles[:0]
  1371. syncFilesOnce(changedDirs, osutil.SyncDir)
  1372. changedDirs = changedDirs[:0]
  1373. }
  1374. // All updates to file/folder objects that originated remotely
  1375. // (across the network) use this call to updateLocals
  1376. f.model.updateLocalsFromPulling(f.folderID, files)
  1377. if found {
  1378. f.model.receivedFile(f.folderID, lastFile)
  1379. }
  1380. batch = batch[:0]
  1381. files = files[:0]
  1382. }
  1383. loop:
  1384. for {
  1385. select {
  1386. case job, ok := <-f.dbUpdates:
  1387. if !ok {
  1388. break loop
  1389. }
  1390. job.file.Sequence = 0
  1391. batch = append(batch, job)
  1392. if len(batch) == maxBatchSize {
  1393. handleBatch()
  1394. }
  1395. case <-tick.C:
  1396. if len(batch) > 0 {
  1397. handleBatch()
  1398. }
  1399. }
  1400. }
  1401. if len(batch) > 0 {
  1402. handleBatch()
  1403. }
  1404. }
  1405. func (f *sendReceiveFolder) inConflict(current, replacement protocol.Vector) bool {
  1406. if current.Concurrent(replacement) {
  1407. // Obvious case
  1408. return true
  1409. }
  1410. if replacement.Counter(f.model.shortID) > current.Counter(f.model.shortID) {
  1411. // The replacement file contains a higher version for ourselves than
  1412. // what we have. This isn't supposed to be possible, since it's only
  1413. // we who can increment that counter. We take it as a sign that
  1414. // something is wrong (our index may have been corrupted or removed)
  1415. // and flag it as a conflict.
  1416. return true
  1417. }
  1418. return false
  1419. }
  1420. func removeAvailability(availabilities []Availability, availability Availability) []Availability {
  1421. for i := range availabilities {
  1422. if availabilities[i] == availability {
  1423. availabilities[i] = availabilities[len(availabilities)-1]
  1424. return availabilities[:len(availabilities)-1]
  1425. }
  1426. }
  1427. return availabilities
  1428. }
  1429. func (f *sendReceiveFolder) moveForConflict(name string) error {
  1430. if strings.Contains(filepath.Base(name), ".sync-conflict-") {
  1431. l.Infoln("Conflict for", name, "which is already a conflict copy; not copying again.")
  1432. if err := os.Remove(name); err != nil && !os.IsNotExist(err) {
  1433. return err
  1434. }
  1435. return nil
  1436. }
  1437. if f.maxConflicts == 0 {
  1438. if err := os.Remove(name); err != nil && !os.IsNotExist(err) {
  1439. return err
  1440. }
  1441. return nil
  1442. }
  1443. ext := filepath.Ext(name)
  1444. withoutExt := name[:len(name)-len(ext)]
  1445. newName := withoutExt + time.Now().Format(".sync-conflict-20060102-150405") + ext
  1446. err := os.Rename(name, newName)
  1447. if os.IsNotExist(err) {
  1448. // We were supposed to move a file away but it does not exist. Either
  1449. // the user has already moved it away, or the conflict was between a
  1450. // remote modification and a local delete. In either way it does not
  1451. // matter, go ahead as if the move succeeded.
  1452. err = nil
  1453. }
  1454. if f.maxConflicts > -1 {
  1455. matches, gerr := osutil.Glob(withoutExt + ".sync-conflict-????????-??????" + ext)
  1456. if gerr == nil && len(matches) > f.maxConflicts {
  1457. sort.Sort(sort.Reverse(sort.StringSlice(matches)))
  1458. for _, match := range matches[f.maxConflicts:] {
  1459. gerr = os.Remove(match)
  1460. if gerr != nil {
  1461. l.Debugln(f, "removing extra conflict", gerr)
  1462. }
  1463. }
  1464. } else if gerr != nil {
  1465. l.Debugln(f, "globbing for conflicts", gerr)
  1466. }
  1467. }
  1468. return err
  1469. }
  1470. func (f *sendReceiveFolder) newError(path string, err error) {
  1471. f.errorsMut.Lock()
  1472. defer f.errorsMut.Unlock()
  1473. // We might get more than one error report for a file (i.e. error on
  1474. // Write() followed by Close()); we keep the first error as that is
  1475. // probably closer to the root cause.
  1476. if _, ok := f.errors[path]; ok {
  1477. return
  1478. }
  1479. f.errors[path] = err.Error()
  1480. }
  1481. func (f *sendReceiveFolder) clearErrors() {
  1482. f.errorsMut.Lock()
  1483. f.errors = make(map[string]string)
  1484. f.errorsMut.Unlock()
  1485. }
  1486. func (f *sendReceiveFolder) currentErrors() []fileError {
  1487. f.errorsMut.Lock()
  1488. errors := make([]fileError, 0, len(f.errors))
  1489. for path, err := range f.errors {
  1490. errors = append(errors, fileError{path, err})
  1491. }
  1492. sort.Sort(fileErrorList(errors))
  1493. f.errorsMut.Unlock()
  1494. return errors
  1495. }
  1496. // A []fileError is sent as part of an event and will be JSON serialized.
  1497. type fileError struct {
  1498. Path string `json:"path"`
  1499. Err string `json:"error"`
  1500. }
  1501. type fileErrorList []fileError
  1502. func (l fileErrorList) Len() int {
  1503. return len(l)
  1504. }
  1505. func (l fileErrorList) Less(a, b int) bool {
  1506. return l[a].Path < l[b].Path
  1507. }
  1508. func (l fileErrorList) Swap(a, b int) {
  1509. l[a], l[b] = l[b], l[a]
  1510. }
  1511. // fileValid returns nil when the file is valid for processing, or an error if it's not
  1512. func fileValid(file db.FileIntf) error {
  1513. switch {
  1514. case file.IsDeleted():
  1515. // We don't care about file validity if we're not supposed to have it
  1516. return nil
  1517. case !symlinks.Supported && file.IsSymlink():
  1518. return errUnsupportedSymlink
  1519. case runtime.GOOS == "windows" && windowsInvalidFilename(file.FileName()):
  1520. return errInvalidFilename
  1521. }
  1522. return nil
  1523. }
  1524. var windowsDisallowedCharacters = string([]rune{
  1525. '<', '>', ':', '"', '|', '?', '*',
  1526. 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
  1527. 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
  1528. 21, 22, 23, 24, 25, 26, 27, 28, 29, 30,
  1529. 31,
  1530. })
  1531. func windowsInvalidFilename(name string) bool {
  1532. // None of the path components should end in space
  1533. for _, part := range strings.Split(name, `\`) {
  1534. if len(part) == 0 {
  1535. continue
  1536. }
  1537. if part[len(part)-1] == ' ' {
  1538. // Names ending in space are not valid.
  1539. return true
  1540. }
  1541. }
  1542. // The path must not contain any disallowed characters
  1543. return strings.ContainsAny(name, windowsDisallowedCharacters)
  1544. }
  1545. // byComponentCount sorts by the number of path components in Name, that is
  1546. // "x/y" sorts before "foo/bar/baz".
  1547. type byComponentCount []protocol.FileInfo
  1548. func (l byComponentCount) Len() int {
  1549. return len(l)
  1550. }
  1551. func (l byComponentCount) Less(a, b int) bool {
  1552. return componentCount(l[a].Name) < componentCount(l[b].Name)
  1553. }
  1554. func (l byComponentCount) Swap(a, b int) {
  1555. l[a], l[b] = l[b], l[a]
  1556. }
  1557. func componentCount(name string) int {
  1558. count := 0
  1559. for _, codepoint := range name {
  1560. if codepoint == os.PathSeparator {
  1561. count++
  1562. }
  1563. }
  1564. return count
  1565. }