folder_sendrecv.go

  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "bytes"
  9. "context"
  10. "crypto/sha256"
  11. "errors"
  12. "fmt"
  13. "io"
  14. "log/slog"
  15. "path/filepath"
  16. "slices"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "time"
  21. "github.com/syncthing/syncthing/internal/itererr"
  22. "github.com/syncthing/syncthing/internal/slogutil"
  23. "github.com/syncthing/syncthing/lib/build"
  24. "github.com/syncthing/syncthing/lib/config"
  25. "github.com/syncthing/syncthing/lib/events"
  26. "github.com/syncthing/syncthing/lib/fs"
  27. "github.com/syncthing/syncthing/lib/ignore"
  28. "github.com/syncthing/syncthing/lib/osutil"
  29. "github.com/syncthing/syncthing/lib/protocol"
  30. "github.com/syncthing/syncthing/lib/scanner"
  31. "github.com/syncthing/syncthing/lib/semaphore"
  32. "github.com/syncthing/syncthing/lib/versioner"
  33. )
  34. var (
  35. blockStats = make(map[string]int)
  36. blockStatsMut sync.Mutex
  37. )
  38. func init() {
  39. folderFactories[config.FolderTypeSendReceive] = newSendReceiveFolder
  40. }
  41. // A pullBlockState is passed to the puller routine for each block that needs
  42. // to be fetched.
  43. type pullBlockState struct {
  44. *sharedPullerState
  45. block protocol.BlockInfo
  46. }
  47. // A copyBlocksState is passed to copy routine if the file has blocks to be
  48. // copied.
  49. type copyBlocksState struct {
  50. *sharedPullerState
  51. blocks []protocol.BlockInfo
  52. have int
  53. }
  54. // Which filemode bits to preserve
  55. const retainBits = fs.ModeSetgid | fs.ModeSetuid | fs.ModeSticky
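// These mode bits aren't part of the synced permissions (which are masked to
// 0o777); instead, any setuid/setgid/sticky bit already present on disk is kept
// when permissions are applied below via Chmod(path, mode|(info.Mode()&retainBits)).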
  56. var (
  57. activity = newDeviceActivity()
  58. errNoDevice = errors.New("peers who had this file went away, or the file has changed while syncing. will retry later")
  59. errDirPrefix = "directory has been deleted on a remote device but "
  60. errDirHasToBeScanned = errors.New(errDirPrefix + "contains changed files, scheduling scan")
  61. errDirHasIgnored = errors.New(errDirPrefix + "contains ignored files (see ignore documentation for (?d) prefix)")
  62. errDirNotEmpty = errors.New(errDirPrefix + "is not empty; the contents are probably ignored on that remote device, but not locally")
  63. errNotAvailable = errors.New("no connected device has the required version of this file")
  64. errModified = errors.New("file modified but not rescanned; will try again later")
  65. errUnexpectedDirOnFileDel = errors.New("encountered directory when trying to remove file/symlink")
  66. errIncompatibleSymlink = errors.New("incompatible symlink entry; rescan with newer Syncthing on source")
  67. contextRemovingOldItem = "removing item to be replaced"
  68. )
  69. type dbUpdateType int
  70. func (d dbUpdateType) String() string {
  71. switch d {
  72. case dbUpdateHandleDir:
  73. return "dbUpdateHandleDir"
  74. case dbUpdateDeleteDir:
  75. return "dbUpdateDeleteDir"
  76. case dbUpdateHandleFile:
  77. return "dbUpdateHandleFile"
  78. case dbUpdateDeleteFile:
  79. return "dbUpdateDeleteFile"
  80. case dbUpdateShortcutFile:
  81. return "dbUpdateShortcutFile"
  82. case dbUpdateHandleSymlink:
  83. return "dbUpdateHandleSymlink"
  84. case dbUpdateInvalidate:
85. return "dbUpdateInvalidate"
  86. }
  87. panic(fmt.Sprintf("unknown dbUpdateType %d", d))
  88. }
  89. const (
  90. dbUpdateHandleDir dbUpdateType = iota
  91. dbUpdateDeleteDir
  92. dbUpdateHandleFile
  93. dbUpdateDeleteFile
  94. dbUpdateShortcutFile
  95. dbUpdateHandleSymlink
  96. dbUpdateInvalidate
  97. )
  98. const (
  99. defaultCopiers = 2
  100. defaultPullerPause = 60 * time.Second
  101. defaultPullerPendingKiB = 2 * protocol.MaxBlockSize / 1024
  102. maxPullerIterations = 3
  103. )
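// For reference: with the protocol's 16 MiB maximum block size,
// defaultPullerPendingKiB evaluates to 2*16*1024 = 32768 KiB (32 MiB) of
// requested-but-not-yet-written data that the puller allows to be outstanding.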
  104. type dbUpdateJob struct {
  105. file protocol.FileInfo
  106. jobType dbUpdateType
  107. }
  108. type sendReceiveFolder struct {
  109. *folder
  110. queue *jobQueue
  111. blockPullReorderer blockPullReorderer
  112. writeLimiter *semaphore.Semaphore
  113. tempPullErrors map[string]string // pull errors that might be just transient
  114. }
  115. func newSendReceiveFolder(model *model, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, evLogger events.Logger, ioLimiter *semaphore.Semaphore) service {
  116. f := &sendReceiveFolder{
  117. folder: newFolder(model, ignores, cfg, evLogger, ioLimiter, ver),
  118. queue: newJobQueue(),
  119. blockPullReorderer: newBlockPullReorderer(cfg.BlockPullOrder, model.id, cfg.DeviceIDs()),
  120. writeLimiter: semaphore.New(cfg.MaxConcurrentWrites),
  121. }
  122. f.puller = f
  123. if f.Copiers == 0 {
  124. f.Copiers = defaultCopiers
  125. }
  126. // If the configured max amount of pending data is zero, we use the
  127. // default. If it's configured to something non-zero but less than the
  128. // protocol block size we adjust it upwards accordingly.
  129. if f.PullerMaxPendingKiB == 0 {
  130. f.PullerMaxPendingKiB = defaultPullerPendingKiB
  131. }
  132. if blockSizeKiB := protocol.MaxBlockSize / 1024; f.PullerMaxPendingKiB < blockSizeKiB {
  133. f.PullerMaxPendingKiB = blockSizeKiB
  134. }
  135. return f
  136. }
  137. // pull returns true if it manages to get all needed items from peers, i.e. get
  138. // the device in sync with the global state.
  139. func (f *sendReceiveFolder) pull(ctx context.Context) (bool, error) {
  140. f.sl.DebugContext(ctx, "Pulling")
  141. scanChan := make(chan string)
  142. go f.pullScannerRoutine(ctx, scanChan)
  143. defer func() {
  144. close(scanChan)
  145. f.setState(FolderIdle)
  146. }()
  147. metricFolderPulls.WithLabelValues(f.ID).Inc()
  148. pullCtx, cancel := context.WithCancel(ctx)
  149. defer cancel()
  150. go addTimeUntilCancelled(pullCtx, metricFolderPullSeconds.WithLabelValues(f.ID))
  151. changed := 0
  152. f.errorsMut.Lock()
  153. f.pullErrors = nil
  154. f.errorsMut.Unlock()
  155. var err error
  156. for tries := range maxPullerIterations {
  157. select {
  158. case <-ctx.Done():
  159. return false, ctx.Err()
  160. default:
  161. }
  162. // Needs to be set on every loop, as the puller might have set
  163. // it to FolderSyncing during the last iteration.
  164. f.setState(FolderSyncPreparing)
  165. changed, err = f.pullerIteration(ctx, scanChan)
  166. if err != nil {
  167. return false, err
  168. }
  169. f.sl.DebugContext(ctx, "Pull iteration completed", "changed", changed, "try", tries+1)
  170. if changed == 0 {
  171. // No files were changed by the puller, so we are in sync, or we
  172. // are unable to make further progress for the moment.
  173. break
  174. }
  175. }
  176. f.errorsMut.Lock()
  177. pullErrNum := len(f.tempPullErrors)
  178. if pullErrNum > 0 {
  179. f.pullErrors = make([]FileError, 0, len(f.tempPullErrors))
  180. for path, err := range f.tempPullErrors {
  181. f.sl.WarnContext(ctx, "Failed to sync", slogutil.FilePath(path), slogutil.Error(err))
  182. f.pullErrors = append(f.pullErrors, FileError{
  183. Err: err,
  184. Path: path,
  185. })
  186. }
  187. f.tempPullErrors = nil
  188. }
  189. f.errorsMut.Unlock()
  190. if pullErrNum > 0 {
  191. f.evLogger.Log(events.FolderErrors, map[string]interface{}{
  192. "folder": f.folderID,
  193. "errors": f.Errors(),
  194. })
  195. }
  196. // We're done if we didn't change anything and didn't fail to change
  197. // anything
  198. return changed == 0 && pullErrNum == 0, nil
  199. }
  200. // pullerIteration runs a single puller iteration for the given folder and
201. // returns the number of items that should have been synced (even those that
  202. // might have failed). One puller iteration handles all files currently
  203. // flagged as needed in the folder.
  204. func (f *sendReceiveFolder) pullerIteration(ctx context.Context, scanChan chan<- string) (int, error) {
  205. f.errorsMut.Lock()
  206. f.tempPullErrors = make(map[string]string)
  207. f.errorsMut.Unlock()
  208. pullChan := make(chan pullBlockState)
  209. copyChan := make(chan copyBlocksState)
  210. finisherChan := make(chan *sharedPullerState)
  211. dbUpdateChan := make(chan dbUpdateJob)
  212. var pullWg sync.WaitGroup
  213. var copyWg sync.WaitGroup
  214. var doneWg sync.WaitGroup
  215. var updateWg sync.WaitGroup
  216. f.sl.DebugContext(ctx, "Starting puller iteration", "copiers", f.Copiers, "pullerPendingKiB", f.PullerMaxPendingKiB)
217. var changed int // only read after updateWg.Wait() returns
  218. updateWg.Go(func() {
  219. // dbUpdaterRoutine finishes when dbUpdateChan is closed
  220. changed = f.dbUpdaterRoutine(dbUpdateChan)
  221. })
  222. for range f.Copiers {
  223. copyWg.Go(func() {
  224. // copierRoutine finishes when copyChan is closed
  225. f.copierRoutine(ctx, copyChan, pullChan, finisherChan)
  226. })
  227. }
  228. pullWg.Go(func() {
  229. // pullerRoutine finishes when pullChan is closed
  230. f.pullerRoutine(ctx, pullChan, finisherChan)
  231. })
  232. // finisherRoutine finishes when finisherChan is closed
  233. doneWg.Go(func() {
  234. f.finisherRoutine(ctx, finisherChan, dbUpdateChan, scanChan)
  235. })
  236. fileDeletions, dirDeletions, err := f.processNeeded(ctx, dbUpdateChan, copyChan, scanChan)
237. // Signal copy and puller routines that we are done with the input data for
238. // this iteration. Wait for them to finish.
  239. close(copyChan)
  240. copyWg.Wait()
  241. close(pullChan)
  242. pullWg.Wait()
  243. // Signal the finisher chan that there will be no more input and wait
  244. // for it to finish.
  245. close(finisherChan)
  246. doneWg.Wait()
  247. if err == nil {
  248. f.processDeletions(ctx, fileDeletions, dirDeletions, dbUpdateChan, scanChan)
  249. }
  250. // Wait for db updates and scan scheduling to complete
  251. close(dbUpdateChan)
  252. updateWg.Wait()
  253. f.queue.Reset()
  254. return changed, err
  255. }
  256. func (f *sendReceiveFolder) processNeeded(ctx context.Context, dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, scanChan chan<- string) (map[string]protocol.FileInfo, []protocol.FileInfo, error) {
  257. var dirDeletions []protocol.FileInfo
  258. fileDeletions := map[string]protocol.FileInfo{}
  259. buckets := map[string][]protocol.FileInfo{}
  260. // Iterate the list of items that we need and sort them into piles.
261. // Regular files to pull go into the file queue, everything else
  262. // (directories, symlinks and deletes) goes into the "process directly"
  263. // pile.
  264. loop:
  265. for file, err := range itererr.Zip(f.model.sdb.AllNeededGlobalFiles(f.folderID, protocol.LocalDeviceID, f.Order, 0, 0)) {
  266. if err != nil {
  267. return nil, nil, err
  268. }
  269. select {
  270. case <-ctx.Done():
  271. break loop
  272. default:
  273. }
  274. if f.IgnoreDelete && file.IsDeleted() {
  275. f.sl.DebugContext(ctx, "Ignoring file deletion per config", slogutil.FilePath(file.FileName()))
  276. continue
  277. }
  278. switch {
  279. case f.ignores.Match(file.Name).IsIgnored():
  280. file.SetIgnored()
  281. f.sl.DebugContext(ctx, "Handling ignored file", file.LogAttr())
  282. dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
  283. case build.IsWindows && fs.WindowsInvalidFilename(file.Name) != nil:
  284. if file.IsDeleted() {
  285. // Just pretend we deleted it, no reason to create an error
286. // about a deleted file that we can't have anyway.
287. // The reason we need it in the first place is that it was
288. // ignored at some point.
  289. dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteFile}
  290. } else {
  291. // We can't pull an invalid file. Grab the error again since
292. // we couldn't assign it directly in the case clause.
  293. f.newPullError(file.Name, fs.WindowsInvalidFilename(file.Name))
  294. }
  295. case file.IsDeleted():
  296. switch {
  297. case file.IsDirectory():
  298. // Perform directory deletions at the end, as we may have
299. // files to delete inside them before we get to that point.
  300. dirDeletions = append(dirDeletions, file)
  301. case file.IsSymlink():
  302. f.deleteFile(file, dbUpdateChan, scanChan)
  303. default:
  304. df, ok, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  305. if err != nil {
  306. return nil, nil, err
  307. }
308. // The local file may already be deleted, but with a lower version
309. // number, hence the deletion coming in again as part of WithNeed.
310. // Furthermore, the file can simply be of the wrong type if we
311. // haven't yet managed to pull it.
  312. if ok && !df.IsDeleted() && !df.IsSymlink() && !df.IsDirectory() && !df.IsInvalid() {
  313. fileDeletions[file.Name] = file
314. // Put files into buckets keyed by their blocks hash
  315. key := string(df.BlocksHash)
  316. buckets[key] = append(buckets[key], df)
  317. } else {
  318. f.deleteFileWithCurrent(file, df, ok, dbUpdateChan, scanChan)
  319. }
  320. }
  321. case file.Type == protocol.FileInfoTypeFile:
  322. curFile, hasCurFile, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  323. if err != nil {
  324. return nil, nil, err
  325. }
  326. if hasCurFile && file.BlocksEqual(curFile) {
  327. // We are supposed to copy the entire file, and then fetch nothing. We
328. // are only updating metadata, so we don't actually *need* to make the
329. // copy.
  330. f.shortcutFile(file, dbUpdateChan)
  331. } else {
  332. // Queue files for processing after directories and symlinks.
  333. f.queue.Push(file.Name, file.Size, file.ModTime())
  334. }
  335. case (build.IsWindows || build.IsAndroid) && file.IsSymlink():
  336. if err := f.handleSymlinkCheckExisting(file, scanChan); err != nil {
  337. f.newPullError(file.Name, fmt.Errorf("handling unsupported symlink: %w", err))
  338. break
  339. }
  340. file.SetUnsupported()
  341. f.sl.DebugContext(ctx, "Invalidating unsupported symlink", slogutil.FilePath(file.Name))
  342. dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
  343. case file.IsDirectory() && !file.IsSymlink():
  344. f.sl.DebugContext(ctx, "Handling directory", slogutil.FilePath(file.Name))
  345. if f.checkParent(file.Name, scanChan) {
  346. f.handleDir(file, dbUpdateChan, scanChan)
  347. }
  348. case file.IsSymlink():
  349. f.sl.DebugContext(ctx, "Handling symlink", slogutil.FilePath(file.Name))
  350. if f.checkParent(file.Name, scanChan) {
  351. f.handleSymlink(file, dbUpdateChan, scanChan)
  352. }
  353. default:
  354. panic("unhandleable item type, can't happen")
  355. }
  356. }
  357. select {
  358. case <-ctx.Done():
  359. return nil, nil, ctx.Err()
  360. default:
  361. }
  362. // Process the file queue.
  363. nextFile:
  364. for {
  365. select {
  366. case <-ctx.Done():
  367. return fileDeletions, dirDeletions, ctx.Err()
  368. default:
  369. }
  370. fileName, ok := f.queue.Pop()
  371. if !ok {
  372. break
  373. }
  374. fi, ok, err := f.model.sdb.GetGlobalFile(f.folderID, fileName)
  375. if err != nil {
  376. return nil, nil, err
  377. }
  378. if !ok {
  379. // File is no longer in the index. Mark it as done and drop it.
  380. f.queue.Done(fileName)
  381. continue
  382. }
  383. if fi.IsDeleted() || fi.IsInvalid() || fi.Type != protocol.FileInfoTypeFile {
  384. // The item has changed type or status in the index while we
385. // were processing directories above.
  386. f.queue.Done(fileName)
  387. continue
  388. }
  389. if !f.checkParent(fi.Name, scanChan) {
  390. f.queue.Done(fileName)
  391. continue
  392. }
  393. // Check our list of files to be removed for a match, in which case
394. // we can just do a rename instead.
  395. key := string(fi.BlocksHash)
  396. for candidate, ok := popCandidate(buckets, key); ok; candidate, ok = popCandidate(buckets, key) {
397. // candidate is our current state of the file, whereas the
398. // desired state with the delete bit set is in the deletion
399. // map.
  400. desired := fileDeletions[candidate.Name]
  401. if err := f.renameFile(candidate, desired, fi, dbUpdateChan, scanChan); err != nil {
  402. f.sl.DebugContext(ctx, "Rename shortcut failed", slogutil.FilePath(fi.Name), slogutil.Error(err))
  403. // Failed to rename, try next one.
  404. continue
  405. }
  406. // Remove the pending deletion (as we performed it by renaming)
  407. delete(fileDeletions, candidate.Name)
  408. f.queue.Done(fileName)
  409. continue nextFile
  410. }
  411. // Verify there is some availability for the file before we start
412. // processing it
  413. devices := f.model.fileAvailability(f.FolderConfiguration, fi)
  414. if len(devices) == 0 {
  415. f.newPullError(fileName, errNotAvailable)
  416. f.queue.Done(fileName)
  417. continue
  418. }
  419. // Verify we have space to handle the file before we start
420. // creating temp files etc.
  421. if err := f.CheckAvailableSpace(uint64(fi.Size)); err != nil { //nolint:gosec
  422. f.newPullError(fileName, err)
  423. f.queue.Done(fileName)
  424. continue
  425. }
  426. if err := f.handleFile(ctx, fi, copyChan); err != nil {
  427. f.newPullError(fileName, err)
  428. }
  429. }
  430. return fileDeletions, dirDeletions, nil
  431. }
  432. func popCandidate(buckets map[string][]protocol.FileInfo, key string) (protocol.FileInfo, bool) {
  433. cands := buckets[key]
  434. if len(cands) == 0 {
  435. return protocol.FileInfo{}, false
  436. }
  437. buckets[key] = cands[1:]
  438. return cands[0], true
  439. }
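// Hypothetical usage sketch (for illustration only, not part of the original
// code): processNeeded groups locally deleted files by their BlocksHash, and
// popCandidate later hands out a deleted file with identical content that a
// needed file can simply be renamed from instead of being downloaded again.
func exampleRenameBuckets(deleted []protocol.FileInfo, needed protocol.FileInfo) (protocol.FileInfo, bool) {
	buckets := map[string][]protocol.FileInfo{}
	for _, df := range deleted {
		key := string(df.BlocksHash)
		buckets[key] = append(buckets[key], df)
	}
	// Returns false if no deleted file shares the needed file's content hash.
	return popCandidate(buckets, string(needed.BlocksHash))
}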
  440. func (f *sendReceiveFolder) processDeletions(ctx context.Context, fileDeletions map[string]protocol.FileInfo, dirDeletions []protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
  441. for _, file := range fileDeletions {
  442. select {
  443. case <-ctx.Done():
  444. return
  445. default:
  446. }
  447. f.deleteFile(file, dbUpdateChan, scanChan)
  448. }
  449. // Process in reverse order to delete depth first
  450. for i := range dirDeletions {
  451. select {
  452. case <-ctx.Done():
  453. return
  454. default:
  455. }
  456. dir := dirDeletions[len(dirDeletions)-i-1]
  457. f.sl.DebugContext(ctx, "Deleting directory", slogutil.FilePath(dir.Name))
  458. f.deleteDir(dir, dbUpdateChan, scanChan)
  459. }
  460. }
  461. // handleDir creates or updates the given directory
  462. func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
463. // Used in the defer closure below, updated by the function body. Take
464. // care not to declare another err.
  465. var err error
  466. f.evLogger.Log(events.ItemStarted, map[string]string{
  467. "folder": f.folderID,
  468. "item": file.Name,
  469. "type": "dir",
  470. "action": "update",
  471. })
  472. defer func() {
  473. slog.Info("Created or updated directory", f.LogAttr(), file.LogAttr())
  474. f.evLogger.Log(events.ItemFinished, map[string]interface{}{
  475. "folder": f.folderID,
  476. "item": file.Name,
  477. "error": events.Error(err),
  478. "type": "dir",
  479. "action": "update",
  480. })
  481. }()
  482. mode := fs.FileMode(file.Permissions & 0o777)
  483. if f.IgnorePerms || file.NoPermissions {
  484. mode = 0o777
  485. }
  486. f.sl.Debug("Need dir", "file", file, "cur", slogutil.Expensive(func() any {
  487. curFile, _, _ := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  488. return curFile
  489. }))
  490. info, err := f.mtimefs.Lstat(file.Name)
  491. switch {
  492. // There is already something under that name, we need to handle that.
493. // Unless it already is a directory, as we only track permissions,
494. // which don't result in a conflict.
  495. case err == nil && !info.IsDir():
  496. // Check that it is what we have in the database.
  497. curFile, hasCurFile, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  498. if err != nil {
  499. f.newPullError(file.Name, fmt.Errorf("handling dir: %w", err))
  500. return
  501. }
  502. if err := f.scanIfItemChanged(file.Name, info, curFile, hasCurFile, false, scanChan); err != nil {
  503. f.newPullError(file.Name, fmt.Errorf("handling dir: %w", err))
  504. return
  505. }
  506. // Remove it to replace with the dir.
  507. if !curFile.IsSymlink() && file.InConflictWith(curFile) {
  508. // The new file has been changed in conflict with the existing one. We
509. // should file it away as a conflict instead of just removing or
510. // archiving.
  511. // Symlinks aren't checked for conflicts.
  512. err = f.inWritableDir(func(name string) error {
  513. return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
  514. }, curFile.Name)
  515. } else {
  516. err = f.deleteItemOnDisk(curFile, scanChan)
  517. }
  518. if err != nil {
  519. f.newPullError(file.Name, err)
  520. return
  521. }
  522. fallthrough
  523. // The directory doesn't exist, so we create it with the right
524. // mode bits from the start.
  525. case err != nil && fs.IsNotExist(err):
  526. // We declare a function that acts on only the path name, so
527. // we can pass it to InWritableDir. We use a regular Mkdir and
528. // not MkdirAll because the parent should already exist.
  529. mkdir := func(path string) error {
  530. err = f.mtimefs.Mkdir(path, mode)
  531. if err != nil {
  532. return err
  533. }
  534. // Set the platform data (ownership, xattrs, etc).
  535. if err := f.setPlatformData(&file, path); err != nil {
  536. return err
  537. }
  538. if f.IgnorePerms || file.NoPermissions {
  539. return nil
  540. }
  541. // Stat the directory so we can check its permissions.
  542. info, err := f.mtimefs.Lstat(path)
  543. if err != nil {
  544. return err
  545. }
546. // Mask for the bits we want to preserve and add them to the
547. // directory's permissions.
  548. return f.mtimefs.Chmod(path, mode|(info.Mode()&retainBits))
  549. }
  550. if err = f.inWritableDir(mkdir, file.Name); err == nil {
  551. dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleDir}
  552. } else {
  553. f.newPullError(file.Name, fmt.Errorf("creating directory: %w", err))
  554. }
  555. return
  556. // Weird error when stat()'ing the dir. Probably won't work to do
557. // anything else with it if we can't even stat() it.
  558. case err != nil:
  559. f.newPullError(file.Name, fmt.Errorf("checking file to be replaced: %w", err))
  560. return
  561. }
  562. // The directory already exists, so we just correct the metadata. (We
563. // don't handle modification times on directories, because that sucks...)
564. // It's OK to change mode bits on stuff within non-writable directories.
  565. if !f.IgnorePerms && !file.NoPermissions {
  566. if err := f.mtimefs.Chmod(file.Name, mode|(info.Mode()&retainBits)); err != nil {
  567. f.newPullError(file.Name, fmt.Errorf("handling dir (setting permissions): %w", err))
  568. return
  569. }
  570. if err := f.setPlatformData(&file, file.Name); err != nil {
  571. f.newPullError(file.Name, fmt.Errorf("handling dir (setting metadata): %w", err))
  572. return
  573. }
  574. }
  575. dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleDir}
  576. }
  577. // checkParent verifies that the thing we are handling lives inside a directory,
578. // and not a symlink or regular file. It also resurrects missing parent dirs.
  579. func (f *sendReceiveFolder) checkParent(file string, scanChan chan<- string) bool {
  580. parent := filepath.Dir(file)
  581. if err := osutil.TraversesSymlink(f.mtimefs, parent); err != nil {
  582. f.newPullError(file, fmt.Errorf("checking parent dirs: %w", err))
  583. return false
  584. }
  585. // issues #114 and #4475: This works around a race condition
586. // between two devices, when one device removes a directory and the
587. // other creates a file in it. However that happens, we end up with
588. // a directory for "foo" with the delete bit, but a file "foo/bar"
589. // that we want to sync. We never create the directory, and hence
590. // fail to create the file and end up looping forever on it. This
591. // breaks that by creating the directory and scheduling a scan,
592. // where it will be found and the delete bit on it removed. The
593. // user can then clean up as they like...
594. // This can also occur if an entire tree structure was deleted, but only
595. // a leaf has been scanned.
  596. //
  597. // And if this is an encrypted folder:
  598. // Encrypted files have made-up filenames with two synthetic parent
599. // directories which don't have any meaning. Create those if necessary.
  600. if _, err := f.mtimefs.Lstat(parent); !fs.IsNotExist(err) {
  601. f.sl.Debug("Parent directory exists", slogutil.FilePath(file))
  602. return true
  603. }
  604. f.sl.Debug("Creating parent directory", slogutil.FilePath(file))
  605. if err := f.mtimefs.MkdirAll(parent, 0o755); err != nil {
  606. f.newPullError(file, fmt.Errorf("creating parent dir: %w", err))
  607. return false
  608. }
  609. if f.Type != config.FolderTypeReceiveEncrypted {
  610. scanChan <- parent
  611. }
  612. return true
  613. }
  614. // handleSymlink creates or updates the given symlink
  615. func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
616. // Used in the defer closure below, updated by the function body. Take
617. // care not to declare another err.
  618. var err error
  619. f.evLogger.Log(events.ItemStarted, map[string]string{
  620. "folder": f.folderID,
  621. "item": file.Name,
  622. "type": "symlink",
  623. "action": "update",
  624. })
  625. defer func() {
  626. if err != nil {
  627. slog.Warn("Failed to handle symlink", f.LogAttr(), file.LogAttr(), slogutil.Error(err))
  628. } else {
  629. slog.Info("Created or updated symlink", f.LogAttr(), file.LogAttr())
  630. }
  631. f.evLogger.Log(events.ItemFinished, map[string]interface{}{
  632. "folder": f.folderID,
  633. "item": file.Name,
  634. "error": events.Error(err),
  635. "type": "symlink",
  636. "action": "update",
  637. })
  638. }()
  639. f.sl.Debug("Need symlink", slogutil.FilePath(file.Name), slog.Any("cur", slogutil.Expensive(func() any {
  640. curFile, _, _ := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  641. return curFile
  642. })))
  643. if len(file.SymlinkTarget) == 0 {
  644. // Index entry from a Syncthing predating the support for including
645. // the link target in the index entry. We log this as an error.
  646. f.newPullError(file.Name, errIncompatibleSymlink)
  647. return
  648. }
  649. if err = f.handleSymlinkCheckExisting(file, scanChan); err != nil {
  650. f.newPullError(file.Name, fmt.Errorf("handling symlink: %w", err))
  651. return
  652. }
  653. // We declare a function that acts on only the path name, so
654. // we can pass it to InWritableDir.
  655. createLink := func(path string) error {
  656. if err := f.mtimefs.CreateSymlink(string(file.SymlinkTarget), path); err != nil {
  657. return err
  658. }
  659. return f.setPlatformData(&file, path)
  660. }
  661. if err = f.inWritableDir(createLink, file.Name); err == nil {
  662. dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleSymlink}
  663. } else {
  664. f.newPullError(file.Name, fmt.Errorf("symlink create: %w", err))
  665. }
  666. }
  667. func (f *sendReceiveFolder) handleSymlinkCheckExisting(file protocol.FileInfo, scanChan chan<- string) error {
  668. // If there is already something under that name, we need to handle that.
  669. info, err := f.mtimefs.Lstat(file.Name)
  670. if err != nil {
  671. if fs.IsNotExist(err) {
  672. return nil
  673. }
  674. return err
  675. }
  676. // Check that it is what we have in the database.
  677. curFile, hasCurFile, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  678. if err != nil {
  679. return err
  680. }
  681. if err := f.scanIfItemChanged(file.Name, info, curFile, hasCurFile, false, scanChan); err != nil {
  682. return err
  683. }
  684. // Remove it to replace with the symlink. This also handles the
685. // "change symlink type" path.
  686. if !curFile.IsDirectory() && !curFile.IsSymlink() && file.InConflictWith(curFile) {
  687. // The new file has been changed in conflict with the existing one. We
688. // should file it away as a conflict instead of just removing or
689. // archiving.
  690. // Directories and symlinks aren't checked for conflicts.
  691. return f.inWritableDir(func(name string) error {
  692. return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
  693. }, curFile.Name)
  694. } else {
  695. return f.deleteItemOnDisk(curFile, scanChan)
  696. }
  697. }
  698. // deleteDir attempts to remove a directory that was deleted on a remote
  699. func (f *sendReceiveFolder) deleteDir(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
701. // Used in the defer closure below, updated by the function body. Take
702. // care not to declare another err.
  702. var err error
  703. f.evLogger.Log(events.ItemStarted, map[string]string{
  704. "folder": f.folderID,
  705. "item": file.Name,
  706. "type": "dir",
  707. "action": "delete",
  708. })
  709. defer func() {
  710. if err != nil {
  711. f.newPullError(file.Name, fmt.Errorf("delete dir: %w", err))
  712. slog.Info("Failed to delete directory", f.LogAttr(), file.LogAttr(), slogutil.Error(err))
  713. } else {
  714. slog.Info("Deleted directory", f.LogAttr(), file.LogAttr())
  715. }
  716. f.evLogger.Log(events.ItemFinished, map[string]interface{}{
  717. "folder": f.folderID,
  718. "item": file.Name,
  719. "error": events.Error(err),
  720. "type": "dir",
  721. "action": "delete",
  722. })
  723. }()
  724. cur, hasCur, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  725. if err != nil {
  726. return
  727. }
  728. if err = f.checkToBeDeleted(file, cur, hasCur, scanChan); err != nil {
  729. if fs.IsNotExist(err) || fs.IsErrCaseConflict(err) {
  730. err = nil
  731. dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteDir}
  732. }
  733. return
  734. }
  735. if err = f.deleteDirOnDisk(file.Name, scanChan); err != nil {
  736. return
  737. }
  738. dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteDir}
  739. }
  740. // deleteFile attempts to delete the given file
  741. func (f *sendReceiveFolder) deleteFile(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
  742. cur, hasCur, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  743. if err != nil {
  744. f.newPullError(file.Name, fmt.Errorf("delete file: %w", err))
  745. return
  746. }
  747. f.deleteFileWithCurrent(file, cur, hasCur, dbUpdateChan, scanChan)
  748. }
  749. func (f *sendReceiveFolder) deleteFileWithCurrent(file, cur protocol.FileInfo, hasCur bool, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
751. // Used in the defer closure below, updated by the function body. Take
752. // care not to declare another err.
  752. var err error
  753. f.sl.Debug("Deleting file or symlink", slogutil.FilePath(file.Name))
  754. f.evLogger.Log(events.ItemStarted, map[string]string{
  755. "folder": f.folderID,
  756. "item": file.Name,
  757. "type": "file",
  758. "action": "delete",
  759. })
  760. defer func() {
  761. kind := "file"
  762. if file.IsSymlink() {
  763. kind = "symlink"
  764. }
  765. if err != nil {
  766. f.newPullError(file.Name, fmt.Errorf("delete file: %w", err))
  767. slog.Info("Failed to delete "+kind, f.LogAttr(), file.LogAttr(), slogutil.Error(err))
  768. } else {
  769. slog.Info("Deleted "+kind, f.LogAttr(), file.LogAttr())
  770. }
  771. f.evLogger.Log(events.ItemFinished, map[string]interface{}{
  772. "folder": f.folderID,
  773. "item": file.Name,
  774. "error": events.Error(err),
  775. "type": "file",
  776. "action": "delete",
  777. })
  778. }()
  779. if err = f.checkToBeDeleted(file, cur, hasCur, scanChan); err != nil {
  780. if fs.IsNotExist(err) || fs.IsErrCaseConflict(err) {
  781. err = nil
  782. dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteFile}
  783. }
  784. return
  785. }
  786. // We are asked to delete a file, but what we have on disk and in db
787. // is a directory. Something is wrong here, should probably not happen.
  788. if cur.IsDirectory() {
  789. err = errUnexpectedDirOnFileDel
  790. return
  791. }
  792. switch {
  793. case file.InConflictWith(cur) && !cur.IsSymlink():
  794. // If the delete constitutes winning a conflict, we move the file to
795. // a conflict copy instead of doing the delete
  796. err = f.inWritableDir(func(name string) error {
  797. return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
  798. }, cur.Name)
  799. case f.versioner != nil && !cur.IsSymlink():
  800. // If we have a versioner, use that to move the file away
  801. err = f.inWritableDir(f.versioner.Archive, file.Name)
  802. default:
  803. // Delete the file
  804. err = f.inWritableDir(f.mtimefs.Remove, file.Name)
  805. }
  806. if err == nil || fs.IsNotExist(err) {
  807. // It was removed or it doesn't exist to start with
  808. dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteFile}
  809. return
  810. }
  811. if _, serr := f.mtimefs.Lstat(file.Name); serr != nil && !fs.IsPermission(serr) {
  812. // We get an error just looking at the file, and it's not a permission
813. // problem. Let's assume the error is in fact some variant of "file
814. // does not exist" (possibly expressed as some parent being a file and
815. // not a directory etc.) and that the delete is handled.
  816. err = nil
  817. dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteFile}
  818. }
  819. slog.Info("Deleted file", f.LogAttr(), file.LogAttr())
  820. }
  821. // renameFile attempts to rename an existing file to a destination
822. // and set the right attributes on it.
  823. func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
825. // Used in the defer closure below, updated by the function body. Take
826. // care not to declare another err.
  826. var err error
  827. f.evLogger.Log(events.ItemStarted, map[string]string{
  828. "folder": f.folderID,
  829. "item": source.Name,
  830. "type": "file",
  831. "action": "delete",
  832. })
  833. f.evLogger.Log(events.ItemStarted, map[string]string{
  834. "folder": f.folderID,
  835. "item": target.Name,
  836. "type": "file",
  837. "action": "update",
  838. })
  839. defer func() {
  840. if err != nil {
  841. slog.Info("Failed to rename file", f.LogAttr(), target.LogAttr(), slog.String("from", source.Name), slogutil.Error(err))
  842. } else {
  843. slog.Info("Renamed file", f.LogAttr(), target.LogAttr(), slog.String("from", source.Name))
  844. }
  845. f.evLogger.Log(events.ItemFinished, map[string]interface{}{
  846. "folder": f.folderID,
  847. "item": source.Name,
  848. "error": events.Error(err),
  849. "type": "file",
  850. "action": "delete",
  851. })
  852. f.evLogger.Log(events.ItemFinished, map[string]interface{}{
  853. "folder": f.folderID,
  854. "item": target.Name,
  855. "error": events.Error(err),
  856. "type": "file",
  857. "action": "update",
  858. })
  859. }()
  860. f.sl.Debug("Taking rename shortcut", "from", source.Name, "to", target.Name)
  861. // Check that source is compatible with what we have in the DB
  862. if err = f.checkToBeDeleted(source, cur, true, scanChan); err != nil {
  863. return err
  864. }
  865. // Check that the target corresponds to what we have in the DB
  866. curTarget, ok, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, target.Name)
  867. if err != nil {
  868. return err
  869. }
  870. switch stat, serr := f.mtimefs.Lstat(target.Name); {
  871. case serr != nil:
  872. var caseErr *fs.CaseConflictError
  873. switch {
  874. case errors.As(serr, &caseErr):
  875. if caseErr.Real != source.Name {
  876. err = serr
  877. break
  878. }
  879. fallthrough // This is a case only rename
  880. case fs.IsNotExist(serr):
  881. if !ok || curTarget.IsDeleted() {
  882. break
  883. }
  884. scanChan <- target.Name
  885. err = errModified
  886. default:
  887. // We can't check whether the file changed as compared to the db,
888. // do not delete.
  889. err = serr
  890. }
  891. case !ok:
  892. // Target appeared from nowhere
  893. scanChan <- target.Name
  894. err = errModified
  895. default:
  896. var fi protocol.FileInfo
  897. if fi, err = scanner.CreateFileInfo(stat, target.Name, f.mtimefs, f.SyncOwnership, f.SyncXattrs, f.XattrFilter); err == nil {
  898. if !fi.IsEquivalentOptional(curTarget, protocol.FileInfoComparison{
  899. ModTimeWindow: f.modTimeWindow,
  900. IgnorePerms: f.IgnorePerms,
  901. IgnoreBlocks: true,
  902. IgnoreFlags: protocol.LocalAllFlags,
  903. IgnoreOwnership: !f.SyncOwnership,
  904. IgnoreXattrs: !f.SyncXattrs,
  905. }) {
  906. // Target changed
  907. scanChan <- target.Name
  908. err = errModified
  909. }
  910. }
  911. }
  912. if err != nil {
  913. return err
  914. }
  915. tempName := fs.TempName(target.Name)
  916. if f.versioner != nil {
  917. err = f.CheckAvailableSpace(uint64(source.Size)) //nolint:gosec
  918. if err == nil {
  919. err = osutil.Copy(f.CopyRangeMethod.ToFS(), f.mtimefs, f.mtimefs, source.Name, tempName)
  920. if err == nil {
  921. err = f.inWritableDir(f.versioner.Archive, source.Name)
  922. }
  923. }
  924. } else {
  925. err = osutil.RenameOrCopy(f.CopyRangeMethod.ToFS(), f.mtimefs, f.mtimefs, source.Name, tempName)
  926. }
  927. if err != nil {
  928. return err
  929. }
  930. blockStatsMut.Lock()
  931. minBlocksPerBlock := target.BlockSize() / protocol.MinBlockSize
  932. blockStats["total"] += len(target.Blocks) * minBlocksPerBlock
  933. blockStats["renamed"] += len(target.Blocks) * minBlocksPerBlock
  934. blockStatsMut.Unlock()
  935. // The file was renamed, so we have handled both the necessary delete
936. // of the source and the creation of the target temp file. Fix up the metadata,
937. // update the local index of the target file and rename from temp to real name.
  938. if err = f.performFinish(target, curTarget, true, tempName, dbUpdateChan, scanChan); err != nil {
  939. return err
  940. }
  941. dbUpdateChan <- dbUpdateJob{source, dbUpdateDeleteFile}
  942. return nil
  943. }
944. // This is the flow of data and events here, I think...
945. //
946. // +-----------------------+
947. // |                       | - - - - > ItemStarted
948. // |      handleFile       | - - - - > ItemFinished (on shortcuts)
949. // |                       |
950. // +-----------------------+
951. //             |
952. //             | copyChan (copyBlocksState; unless shortcut taken)
953. //             |
954. //             |    +-----------------------+
955. //             |    | +-----------------------+
956. //             +--->| |                       |
957. //                  | |     copierRoutine     |
958. //                  +-|                       |
959. //                    +-----------------------+
960. //                                |
961. //                                | pullChan (sharedPullerState)
962. //                                |
963. //                                |   +-----------------------+
964. //                                |   | +-----------------------+
965. //                                +-->| |                       |
966. //                                    | |     pullerRoutine     |
967. //                                    +-|                       |
968. //                                      +-----------------------+
969. //                                                  |
970. //                                                  | finisherChan (sharedPullerState)
971. //                                                  |
972. //                                                  |   +-----------------------+
973. //                                                  |   |                       |
974. //                                                  +-->|    finisherRoutine    | - - - - > ItemFinished
975. //                                                      |                       |
976. //                                                      +-----------------------+
  977. // handleFile queues the copies and pulls as necessary for a single new or
978. // changed file.
  979. func (f *sendReceiveFolder) handleFile(ctx context.Context, file protocol.FileInfo, copyChan chan<- copyBlocksState) error {
  980. curFile, hasCurFile, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  981. if err != nil {
  982. return err
  983. }
  984. have, _ := blockDiff(curFile.Blocks, file.Blocks)
  985. tempName := fs.TempName(file.Name)
  986. populateOffsets(file.Blocks)
  987. blocks := append([]protocol.BlockInfo{}, file.Blocks...)
  988. reused := make([]int, 0, len(file.Blocks))
  989. if f.Type != config.FolderTypeReceiveEncrypted {
  990. blocks, reused = f.reuseBlocks(ctx, blocks, reused, file, tempName)
  991. }
  992. // The sharedpullerstate will know which flags to use when opening the
993. // temp file depending on whether we are reusing any blocks or not.
  994. if len(reused) == 0 {
  995. // Otherwise, discard the file ourselves in order for the
996. // sharedpuller not to panic when it fails to exclusively create a
997. // file which already exists
  998. f.inWritableDir(f.mtimefs.Remove, tempName)
  999. }
  1000. // Reorder blocks
  1001. blocks = f.blockPullReorderer.Reorder(blocks)
  1002. f.evLogger.Log(events.ItemStarted, map[string]string{
  1003. "folder": f.folderID,
  1004. "item": file.Name,
  1005. "type": "file",
  1006. "action": "update",
  1007. })
  1008. s := newSharedPullerState(file, f.mtimefs, f.folderID, tempName, blocks, reused, f.IgnorePerms || file.NoPermissions, hasCurFile, curFile, !f.DisableSparseFiles, !f.DisableFsync)
  1009. f.sl.DebugContext(ctx, "Handling file", slogutil.FilePath(file.Name), "blocksToCopy", len(blocks), "reused", len(reused))
  1010. cs := copyBlocksState{
  1011. sharedPullerState: s,
  1012. blocks: blocks,
  1013. have: len(have),
  1014. }
  1015. copyChan <- cs
  1016. return nil
  1017. }
  1018. func (f *sendReceiveFolder) reuseBlocks(ctx context.Context, blocks []protocol.BlockInfo, reused []int, file protocol.FileInfo, tempName string) ([]protocol.BlockInfo, []int) {
  1019. // Check for an old temporary file which might have some blocks we could
1020. // reuse.
  1021. tempBlocks, err := scanner.HashFile(ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil)
  1022. if err != nil {
  1023. var caseErr *fs.CaseConflictError
  1024. if errors.As(err, &caseErr) {
  1025. if rerr := f.mtimefs.Rename(caseErr.Real, tempName); rerr == nil {
  1026. tempBlocks, err = scanner.HashFile(ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil)
  1027. }
  1028. }
  1029. }
  1030. if err != nil {
  1031. return blocks, reused
  1032. }
  1033. // Check for any reusable blocks in the temp file
  1034. tempCopyBlocks, _ := blockDiff(tempBlocks, file.Blocks)
  1035. // block.String() returns a string unique to the block
  1036. existingBlocks := make(map[string]struct{}, len(tempCopyBlocks))
  1037. for _, block := range tempCopyBlocks {
  1038. existingBlocks[block.String()] = struct{}{}
  1039. }
  1040. // Since the blocks are already there, we don't need to get them.
  1041. blocks = blocks[:0]
  1042. for i, block := range file.Blocks {
  1043. _, ok := existingBlocks[block.String()]
  1044. if !ok {
  1045. blocks = append(blocks, block)
  1046. } else {
  1047. reused = append(reused, i)
  1048. }
  1049. }
  1050. return blocks, reused
  1051. }
  1052. // blockDiff returns lists of common and missing (to transform src into tgt)
1053. // blocks. Both block lists must have been created with the same block size.
  1054. func blockDiff(src, tgt []protocol.BlockInfo) ([]protocol.BlockInfo, []protocol.BlockInfo) {
  1055. if len(tgt) == 0 {
  1056. return nil, nil
  1057. }
  1058. if len(src) == 0 {
  1059. // Copy the entire file
  1060. return nil, tgt
  1061. }
  1062. have := make([]protocol.BlockInfo, 0, len(src))
  1063. need := make([]protocol.BlockInfo, 0, len(tgt))
  1064. for i := range tgt {
  1065. if i >= len(src) {
  1066. return have, append(need, tgt[i:]...)
  1067. }
  1068. if !bytes.Equal(tgt[i].Hash, src[i].Hash) {
  1069. // Copy differing block
  1070. need = append(need, tgt[i])
  1071. } else {
  1072. have = append(have, tgt[i])
  1073. }
  1074. }
  1075. return have, need
  1076. }
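// Hypothetical example (for illustration only, not part of the original code):
// blockDiff compares blocks positionally, so for src = [a b c] and
// tgt = [a x c d] it reports have = [a c] and need = [x d]. The hashes below
// are fabricated.
func exampleBlockDiff() {
	mk := func(b byte) protocol.BlockInfo { return protocol.BlockInfo{Hash: []byte{b}} }
	src := []protocol.BlockInfo{mk(1), mk(2), mk(3)}
	tgt := []protocol.BlockInfo{mk(1), mk(9), mk(3), mk(4)}
	have, need := blockDiff(src, tgt)
	fmt.Println(len(have), len(need)) // prints: 2 2
}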
  1077. // populateOffsets sets the Offset field on each block
  1078. func populateOffsets(blocks []protocol.BlockInfo) {
  1079. var offset int64
  1080. for i := range blocks {
  1081. blocks[i].Offset = offset
  1082. offset += int64(blocks[i].Size)
  1083. }
  1084. }
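// Hypothetical example (for illustration only, not part of the original code):
// each block's Offset becomes the running sum of the preceding block sizes.
func examplePopulateOffsets() {
	blocks := []protocol.BlockInfo{{Size: 128 << 10}, {Size: 128 << 10}, {Size: 64 << 10}}
	populateOffsets(blocks)
	fmt.Println(blocks[0].Offset, blocks[1].Offset, blocks[2].Offset) // prints: 0 131072 262144
}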
  1085. // shortcutFile sets file metadata, when that's the only thing that has
1086. // changed.
  1087. func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob) {
  1088. f.sl.Debug("Taking metadata shortcut", slogutil.FilePath(file.Name))
  1089. f.evLogger.Log(events.ItemStarted, map[string]string{
  1090. "folder": f.folderID,
  1091. "item": file.Name,
  1092. "type": "file",
  1093. "action": "metadata",
  1094. })
  1095. var err error
  1096. defer func() {
  1097. if err != nil {
  1098. slog.Info("Failed to update file metadata", f.LogAttr(), file.LogAttr(), slogutil.Error(err))
  1099. } else {
  1100. slog.Info("Updated file metadata", f.LogAttr(), file.LogAttr())
  1101. }
  1102. f.evLogger.Log(events.ItemFinished, map[string]interface{}{
  1103. "folder": f.folderID,
  1104. "item": file.Name,
  1105. "error": events.Error(err),
  1106. "type": "file",
  1107. "action": "metadata",
  1108. })
  1109. }()
  1110. f.queue.Done(file.Name)
  1111. if !f.IgnorePerms && !file.NoPermissions {
  1112. if err = f.mtimefs.Chmod(file.Name, fs.FileMode(file.Permissions&0o777)); err != nil {
  1113. f.newPullError(file.Name, fmt.Errorf("shortcut file (setting permissions): %w", err))
  1114. return
  1115. }
  1116. }
  1117. if err := f.setPlatformData(&file, file.Name); err != nil {
  1118. f.newPullError(file.Name, fmt.Errorf("shortcut file (setting metadata): %w", err))
  1119. return
  1120. }
  1121. // Still need to re-write the trailer with the new encrypted fileinfo.
  1122. if f.Type == config.FolderTypeReceiveEncrypted {
  1123. err = inWritableDir(func(path string) error {
  1124. fd, err := f.mtimefs.OpenFile(path, fs.OptReadWrite, 0o666)
  1125. if err != nil {
  1126. return err
  1127. }
  1128. defer fd.Close()
  1129. trailerSize, err := writeEncryptionTrailer(file, fd)
  1130. if err != nil {
  1131. return err
  1132. }
  1133. file.EncryptionTrailerSize = int(trailerSize)
  1134. file.Size += trailerSize
  1135. return fd.Truncate(file.Size)
  1136. }, f.mtimefs, file.Name, true)
  1137. if err != nil {
  1138. f.newPullError(file.Name, fmt.Errorf("writing encrypted file trailer: %w", err))
  1139. return
  1140. }
  1141. }
  1142. f.mtimefs.Chtimes(file.Name, file.ModTime(), file.ModTime()) // never fails
  1143. dbUpdateChan <- dbUpdateJob{file, dbUpdateShortcutFile}
  1144. }
  1145. // copierRoutine reads copierStates until the in channel closes and performs
1146. // the relevant copies when possible, or passes it to the puller routine.
  1147. func (f *sendReceiveFolder) copierRoutine(ctx context.Context, in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
  1148. otherFolderFilesystems := make(map[string]fs.Filesystem)
  1149. for folder, cfg := range f.model.cfg.Folders() {
  1150. if folder == f.ID {
  1151. continue
  1152. }
  1153. otherFolderFilesystems[folder] = cfg.Filesystem()
  1154. }
  1155. for state := range in {
  1156. if f.Type != config.FolderTypeReceiveEncrypted {
  1157. f.model.progressEmitter.Register(state.sharedPullerState)
  1158. }
  1159. f.setState(FolderSyncing) // Does nothing if already FolderSyncing
  1160. blocks:
  1161. for _, block := range state.blocks {
  1162. select {
  1163. case <-ctx.Done():
  1164. state.fail(fmt.Errorf("folder stopped: %w", ctx.Err()))
  1165. break blocks
  1166. default:
  1167. }
  1168. if !f.DisableSparseFiles && state.reused == 0 && block.IsEmpty() {
  1169. // The block is a block of all zeroes, and we are not reusing
1170. // a temp file, so there is no need to do anything with it.
1171. // If we were reusing a temp file and had this block to copy,
1172. // it would be because the block in the temp file was *not* a
1173. // block of all zeroes, so then we should not skip it.
1174. // Pretend we copied it.
  1175. state.skippedSparseBlock(block.Size)
  1176. state.copyDone(block)
  1177. continue
  1178. }
  1179. if f.copyBlock(ctx, block, state, otherFolderFilesystems) {
  1180. state.copyDone(block)
  1181. continue
  1182. }
  1183. if state.failed() != nil {
  1184. break
  1185. }
  1186. state.pullStarted()
  1187. ps := pullBlockState{
  1188. sharedPullerState: state.sharedPullerState,
  1189. block: block,
  1190. }
  1191. pullChan <- ps
  1192. }
  1193. // If there are no blocks to pull/copy, we still need the temporary file in place.
  1194. if len(state.blocks) == 0 {
  1195. _, err := state.tempFile()
  1196. if err != nil {
  1197. state.fail(err)
  1198. }
  1199. }
  1200. out <- state.sharedPullerState
  1201. }
  1202. }
// Returns true when the block was successfully copied.
func (f *sendReceiveFolder) copyBlock(ctx context.Context, block protocol.BlockInfo, state copyBlocksState, otherFolderFilesystems map[string]fs.Filesystem) bool {
	buf := protocol.BufferPool.Get(block.Size)
	defer protocol.BufferPool.Put(buf)

	// Hope that it's usually in the same folder, so start with that
	// one. Also possibly more efficient copy (same filesystem).
	if f.copyBlockFromFolder(ctx, f.ID, block, state, f.mtimefs, buf) {
		return true
	}
	if state.failed() != nil {
		return false
	}

	for folderID, ffs := range otherFolderFilesystems {
		if f.copyBlockFromFolder(ctx, folderID, block, state, ffs, buf) {
			return true
		}
		if state.failed() != nil {
			return false
		}
	}
	return false
}

// Returns true when the block was successfully copied.
// The passed buffer must be large enough to accommodate the block.
func (f *sendReceiveFolder) copyBlockFromFolder(ctx context.Context, folderID string, block protocol.BlockInfo, state copyBlocksState, ffs fs.Filesystem, buf []byte) bool {
	for e, err := range itererr.Zip(f.model.sdb.AllLocalBlocksWithHash(folderID, block.Hash)) {
		if err != nil {
			// We just ignore this and continue pulling instead (though
			// there's a good chance that will fail too, if the DB is
			// unhealthy).
			f.sl.DebugContext(ctx, "Failed to get block information from database", "blockHash", block.Hash, slogutil.FilePath(state.file.Name), slogutil.Error(err))
			return false
		}

		if !f.copyBlockFromFile(ctx, e.FileName, e.Offset, state, ffs, block, buf) {
			if state.failed() != nil {
				return false
			}
			continue
		}

		if e.FileName == state.file.Name {
			state.copiedFromOrigin(block.Size)
		} else {
			state.copiedFromElsewhere(block.Size)
		}
		return true
	}
	return false
}

// Returns true when the block was successfully copied.
// The passed buffer must be large enough to accommodate the block.
func (f *sendReceiveFolder) copyBlockFromFile(ctx context.Context, srcName string, srcOffset int64, state copyBlocksState, ffs fs.Filesystem, block protocol.BlockInfo, buf []byte) bool {
	fd, err := ffs.Open(srcName)
	if err != nil {
		f.sl.DebugContext(ctx, "Failed to open source file for block copy", slogutil.FilePath(srcName), "blockHash", block.Hash, slogutil.Error(err))
		return false
	}
	defer fd.Close()

	_, err = fd.ReadAt(buf, srcOffset)
	if err != nil {
		f.sl.DebugContext(ctx, "Failed to read block from file", slogutil.FilePath(srcName), "blockHash", block.Hash, slogutil.Error(err))
		return false
	}

	// For receive-encrypted folders, the hash is not SHA256 as it's an
	// encrypted hash token. In that case we can't verify the block
	// integrity so we'll take it on trust. (The other side can and
	// will verify.)
	if f.Type != config.FolderTypeReceiveEncrypted {
		if err := f.verifyBuffer(buf, block); err != nil {
			f.sl.DebugContext(ctx, "Failed to verify block buffer", slogutil.Error(err))
			return false
		}
	}

	dstFd, err := state.tempFile()
	if err != nil {
		// State is already marked as failed when an error is returned here.
		return false
	}

	if f.CopyRangeMethod != config.CopyRangeMethodStandard {
		err = f.withLimiter(ctx, func() error {
			dstFd.mut.Lock()
			defer dstFd.mut.Unlock()
			return fs.CopyRange(f.CopyRangeMethod.ToFS(), fd, dstFd.fd, srcOffset, block.Offset, int64(block.Size))
		})
	} else {
		err = f.limitedWriteAt(ctx, dstFd, buf, block.Offset)
	}
	if err != nil {
		state.fail(fmt.Errorf("dst write: %w", err))
		return false
	}
	return true
}

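// verifyBuffer checks that the buffer has the expected length and that its
// SHA256 hash matches the block's expected hash.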
func (*sendReceiveFolder) verifyBuffer(buf []byte, block protocol.BlockInfo) error {
	if len(buf) != block.Size {
		return fmt.Errorf("length mismatch %d != %d", len(buf), block.Size)
	}

	hash := sha256.Sum256(buf)
	if !bytes.Equal(hash[:], block.Hash) {
		return fmt.Errorf("hash mismatch %x != %x", hash, block.Hash)
	}

	return nil
}

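// pullerRoutine reads pullBlockStates until the in channel closes and
// requests the corresponding blocks from the network, limiting the amount of
// outstanding request data to PullerMaxPendingKiB.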
func (f *sendReceiveFolder) pullerRoutine(ctx context.Context, in <-chan pullBlockState, out chan<- *sharedPullerState) {
	requestLimiter := semaphore.New(f.PullerMaxPendingKiB * 1024)
	var wg sync.WaitGroup

	for state := range in {
		if state.failed() != nil {
			out <- state.sharedPullerState
			continue
		}

		f.setState(FolderSyncing) // Does nothing if already FolderSyncing

		// The requestLimiter limits how many pending block requests we have
		// ongoing at any given time, based on the size of the blocks
		// themselves.
		bytes := state.block.Size
		if err := requestLimiter.TakeWithContext(ctx, bytes); err != nil {
			state.fail(err)
			out <- state.sharedPullerState
			continue
		}

		wg.Go(func() {
			defer requestLimiter.Give(bytes)
			f.pullBlock(ctx, state, out)
		})
	}
	wg.Wait()
}

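// pullBlock requests a single block from the least busy device that has it,
// verifies it (except for receive-encrypted folders) and writes it to the
// temporary file.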
func (f *sendReceiveFolder) pullBlock(ctx context.Context, state pullBlockState, out chan<- *sharedPullerState) {
	// Get an fd to the temporary file. Technically we don't need it until
	// after fetching the block, but if we run into an error here there is
	// no point in issuing the request to the network.
	fd, err := state.tempFile()
	if err != nil {
		out <- state.sharedPullerState
		return
	}

	if !f.DisableSparseFiles && state.reused == 0 && state.block.IsEmpty() {
		// There is no need to request a block of all zeroes. Pretend we
		// requested it and handled it correctly.
		state.pullDone(state.block)
		out <- state.sharedPullerState
		return
	}

	var lastError error
	candidates := f.model.blockAvailability(f.FolderConfiguration, state.file, state.block)
loop:
	for {
		select {
		case <-ctx.Done():
			state.fail(fmt.Errorf("folder stopped: %w", ctx.Err()))
			break loop
		default:
		}

		// Select the least busy device to pull the block from. If we found no
		// feasible device at all, fail the block (and in the long run, the
		// file).
		found := activity.leastBusy(candidates)
		if found == -1 {
			if lastError != nil {
				state.fail(fmt.Errorf("pull: %w", lastError))
			} else {
				state.fail(fmt.Errorf("pull: %w", errNoDevice))
			}
			break
		}

		selected := candidates[found]
		candidates[found] = candidates[len(candidates)-1]
		candidates = candidates[:len(candidates)-1]

		// Fetch the block, while marking the selected device as in use so that
		// leastBusy can select another device when someone else asks.
		activity.using(selected)
		var buf []byte
		blockNo := int(state.block.Offset / int64(state.file.BlockSize()))
		buf, lastError = f.model.RequestGlobal(ctx, selected.ID, f.folderID, state.file.Name, blockNo, state.block.Offset, state.block.Size, state.block.Hash, selected.FromTemporary)
		activity.done(selected)
		if lastError != nil {
			f.sl.DebugContext(ctx, "Block request returned error", slogutil.FilePath(state.file.Name), "offset", state.block.Offset, "size", state.block.Size, "device", selected.ID.Short(), slogutil.Error(lastError))
			continue
		}

		// Verify that the received block matches the desired hash, if not
		// try pulling it from another device.
		// For receive-encrypted folders, the hash is not SHA256 as it's an
		// encrypted hash token. In that case we can't verify the block
		// integrity so we'll take it on trust. (The other side can and
		// will verify.)
		if f.Type != config.FolderTypeReceiveEncrypted {
			lastError = f.verifyBuffer(buf, state.block)
		}
		if lastError != nil {
			f.sl.DebugContext(ctx, "Block hash mismatch", slogutil.FilePath(state.file.Name), "offset", state.block.Offset, "size", state.block.Size)
			continue
		}

		// Save the block data we got from the cluster
		err = f.limitedWriteAt(ctx, fd, buf, state.block.Offset)
		if err != nil {
			state.fail(fmt.Errorf("save: %w", err))
		} else {
			state.pullDone(state.block)
		}
		break
	}
	out <- state.sharedPullerState
}

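// performFinish sets permissions and metadata on the completed temp file,
// handles any existing item at the target name (conflict copy or deletion),
// moves the temp file into place and records the update in the database.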
func (f *sendReceiveFolder) performFinish(file, curFile protocol.FileInfo, hasCurFile bool, tempName string, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
	// Set the correct permission bits on the new file
	if !f.IgnorePerms && !file.NoPermissions {
		if err := f.mtimefs.Chmod(tempName, fs.FileMode(file.Permissions&0o777)); err != nil {
			return fmt.Errorf("setting permissions: %w", err)
		}
	}

	// Set file xattrs and ownership.
	if err := f.setPlatformData(&file, tempName); err != nil {
		return fmt.Errorf("setting metadata: %w", err)
	}

	if stat, err := f.mtimefs.Lstat(file.Name); err == nil {
		// There is an old file or directory already in place. We need to
		// handle that.
		if err := f.scanIfItemChanged(file.Name, stat, curFile, hasCurFile, false, scanChan); err != nil {
			return fmt.Errorf("checking existing file: %w", err)
		}

		if !curFile.IsDirectory() && !curFile.IsSymlink() && file.InConflictWith(curFile) {
			// The new file has been changed in conflict with the existing one. We
			// should file it away as a conflict instead of just removing or
			// archiving.
			// Directories and symlinks aren't checked for conflicts.
			err = f.inWritableDir(func(name string) error {
				return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
			}, curFile.Name)
		} else {
			err = f.deleteItemOnDisk(curFile, scanChan)
		}
		if err != nil {
			return fmt.Errorf("moving for conflict: %w", err)
		}
	} else if !fs.IsNotExist(err) {
		return fmt.Errorf("checking existing file: %w", err)
	}

	// Replace the original content with the new one. If it didn't work,
	// leave the temp file in place for reuse.
	if err := osutil.RenameOrCopy(f.CopyRangeMethod.ToFS(), f.mtimefs, f.mtimefs, tempName, file.Name); err != nil {
		return fmt.Errorf("replacing file: %w", err)
	}

	// Set the correct timestamp on the new file
	f.mtimefs.Chtimes(file.Name, file.ModTime(), file.ModTime()) // never fails

	// Record the updated file in the index
	dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleFile}
	return nil
}

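// finisherRoutine closes completed temporary files, finalizes them on disk
// via performFinish, and emits the corresponding events and block statistics.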
func (f *sendReceiveFolder) finisherRoutine(ctx context.Context, in <-chan *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
	for state := range in {
		if closed, err := state.finalClose(); closed {
			f.sl.DebugContext(ctx, "Closing temp file", slogutil.FilePath(state.file.Name))

			f.queue.Done(state.file.Name)

			if err == nil {
				err = f.performFinish(state.file, state.curFile, state.hasCurFile, state.tempName, dbUpdateChan, scanChan)
			}

			if err != nil {
				f.newPullError(state.file.Name, fmt.Errorf("finishing: %w", err))
			} else {
				slog.InfoContext(ctx, "Synced file", f.LogAttr(), state.file.LogAttr(), slog.Group("blocks", slog.Int("local", state.reused+state.copyTotal), slog.Int("download", state.pullTotal)))
				minBlocksPerBlock := state.file.BlockSize() / protocol.MinBlockSize
				blockStatsMut.Lock()
				blockStats["total"] += (state.reused + state.copyTotal + state.pullTotal) * minBlocksPerBlock
				blockStats["reused"] += state.reused * minBlocksPerBlock
				blockStats["pulled"] += state.pullTotal * minBlocksPerBlock
				// copyOriginShifted is counted towards copyOrigin for progress
				// bar reasons, but for reporting we want to separate these.
				blockStats["copyOrigin"] += state.copyOrigin * minBlocksPerBlock
				blockStats["copyElsewhere"] += (state.copyTotal - state.copyOrigin) * minBlocksPerBlock
				blockStatsMut.Unlock()
			}

			if f.Type != config.FolderTypeReceiveEncrypted {
				f.model.progressEmitter.Deregister(state)
			}

			f.evLogger.Log(events.ItemFinished, map[string]interface{}{
				"folder": f.folderID,
				"item":   state.file.Name,
				"error":  events.Error(err),
				"type":   "file",
				"action": "update",
			})
		}
	}
}

// Moves the given filename to the front of the job queue
func (f *sendReceiveFolder) BringToFront(filename string) {
	f.queue.BringToFront(filename)
}

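// Jobs returns the paginated contents of the job queue.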
func (f *sendReceiveFolder) Jobs(page, perpage int) ([]string, []string, int) {
	return f.queue.Jobs(page, perpage)
}

// dbUpdaterRoutine aggregates db updates and commits them in batches no
// larger than 1000 items, and no more delayed than 2 seconds.
func (f *sendReceiveFolder) dbUpdaterRoutine(dbUpdateChan <-chan dbUpdateJob) int {
	const maxBatchTime = 2 * time.Second

	changed := 0
	changedDirs := make(map[string]struct{})
	found := false
	var lastFile protocol.FileInfo
	tick := time.NewTicker(maxBatchTime)
	defer tick.Stop()

	batch := NewFileInfoBatch(func(files []protocol.FileInfo) error {
		// sync directories
		for dir := range changedDirs {
			delete(changedDirs, dir)
			if !f.DisableFsync {
				fd, err := f.mtimefs.Open(dir)
				if err != nil {
					f.sl.Debug("Fsync failed", slogutil.FilePath(dir), slogutil.Error(err))
					continue
				}
				if err := fd.Sync(); err != nil {
					f.sl.Debug("Fsync failed", slogutil.FilePath(dir), slogutil.Error(err))
				}
				fd.Close()
			}
		}

		// All updates to file/folder objects that originated remotely
		// (across the network) use this call to updateLocals
		f.updateLocalsFromPulling(files)

		if found {
			f.ReceivedFile(lastFile.Name, lastFile.IsDeleted())
			found = false
		}

		return nil
	})

loop:
	for {
		select {
		case job, ok := <-dbUpdateChan:
			if !ok {
				break loop
			}

			switch job.jobType {
			case dbUpdateHandleFile, dbUpdateShortcutFile:
				changedDirs[filepath.Dir(job.file.Name)] = struct{}{}
			case dbUpdateHandleDir:
				changedDirs[job.file.Name] = struct{}{}
			case dbUpdateHandleSymlink, dbUpdateInvalidate:
				// fsyncing symlinks is only supported by macOS,
				// and invalidated files are db-only changes -> no sync
			}

			// For some reason we seem to care about file deletions and
			// content modification, but not about metadata and dirs/symlinks.
			if !job.file.IsInvalid() && job.jobType&(dbUpdateHandleFile|dbUpdateDeleteFile) != 0 {
				found = true
				lastFile = job.file
			}

			if !job.file.IsDeleted() && !job.file.IsInvalid() {
				// Now that the file is finalized, grab possibly updated
				// inode change time from disk into the local FileInfo. We
				// use this change time to check for changes to xattrs etc
				// on next scan.
				if err := f.updateFileInfoChangeTime(&job.file); err != nil {
					// This means that on the next scan the likely incorrect
					// change time (resp. whatever caused the error) will make
					// this file show up as changed. Log it to leave a trace
					// if a user notices; the pull proceeds regardless.
					f.sl.Warn("Failed to update metadata at database commit", slogutil.FilePath(job.file.Name), slogutil.Error(err))
				}
			}
			job.file.Sequence = 0

			batch.Append(job.file)
			batch.FlushIfFull()
			changed++

		case <-tick.C:
			batch.Flush()
		}
	}

	batch.Flush()

	return changed
}

// pullScannerRoutine aggregates paths to be scanned after pulling. The scan is
// scheduled once when scanChan is closed (scanning can not happen during pulling).
func (f *sendReceiveFolder) pullScannerRoutine(ctx context.Context, scanChan <-chan string) {
	toBeScanned := make(map[string]struct{})

	for path := range scanChan {
		toBeScanned[path] = struct{}{}
	}

	if len(toBeScanned) != 0 {
		scanList := make([]string, 0, len(toBeScanned))
		for path := range toBeScanned {
			slog.DebugContext(ctx, "Scheduling scan after pulling", slogutil.FilePath(path))
			scanList = append(scanList, path)
		}
		f.Scan(scanList)
	}
}

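// moveForConflict renames the given file to a conflict copy, unless it
// already is one or conflict copies are disabled, and prunes older conflict
// copies beyond MaxConflicts.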
func (f *sendReceiveFolder) moveForConflict(name, lastModBy string, scanChan chan<- string) error {
	if isConflict(name) {
		f.sl.Info("Conflict on existing conflict copy; not copying again", slogutil.FilePath(name))
		if err := f.mtimefs.Remove(name); err != nil && !fs.IsNotExist(err) {
			return fmt.Errorf("%s: %w", contextRemovingOldItem, err)
		}
		return nil
	}

	if f.MaxConflicts == 0 {
		if err := f.mtimefs.Remove(name); err != nil && !fs.IsNotExist(err) {
			return fmt.Errorf("%s: %w", contextRemovingOldItem, err)
		}
		return nil
	}

	metricFolderConflictsTotal.WithLabelValues(f.ID).Inc()

	newName := conflictName(name, lastModBy)
	err := f.mtimefs.Rename(name, newName)
	if fs.IsNotExist(err) {
		// We were supposed to move a file away but it does not exist. Either
		// the user has already moved it away, or the conflict was between a
		// remote modification and a local delete. Either way it does not
		// matter, go ahead as if the move succeeded.
		err = nil
	}
	if f.MaxConflicts > -1 {
		matches := existingConflicts(name, f.mtimefs)
		if len(matches) > f.MaxConflicts {
			slices.SortFunc(matches, func(a, b string) int {
				return strings.Compare(b, a)
			})
			for _, match := range matches[f.MaxConflicts:] {
				if gerr := f.mtimefs.Remove(match); gerr != nil {
					f.sl.Debug("Failed to remove extra conflict copy", slogutil.Error(gerr))
				}
			}
		}
	}
	if err == nil {
		scanChan <- newName
	}
	return err
}

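// newPullError records the first error encountered while syncing the given
// path, unless the error was caused by the folder stopping.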
func (f *sendReceiveFolder) newPullError(path string, err error) {
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		// Error because the folder stopped - no point logging/tracking
		return
	}

	f.errorsMut.Lock()
	defer f.errorsMut.Unlock()

	// We might get more than one error report for a file (i.e. error on
	// Write() followed by Close()); we keep the first error as that is
	// probably closer to the root cause.
	if _, ok := f.tempPullErrors[path]; ok {
		return
	}

	// Establish context to differentiate from errors while scanning.
	// Use "syncing" as opposed to "pulling" as the latter might be used
	// for errors occurring specifically in the puller routine.
	errStr := fmt.Sprintf("syncing: %s", err)
	f.tempPullErrors[path] = errStr

	f.sl.Debug("New pull error", slogutil.FilePath(path), slogutil.Error(err))
}

// deleteItemOnDisk deletes the existing item on disk that is about to be
// replaced by a newly pulled version.
func (f *sendReceiveFolder) deleteItemOnDisk(item protocol.FileInfo, scanChan chan<- string) (err error) {
	defer func() {
		if err != nil {
			err = fmt.Errorf("%s: %w", contextRemovingOldItem, err)
		}
	}()

	switch {
	case item.IsDirectory():
		// Directories aren't archived and need special treatment due
		// to potential children.
		return f.deleteDirOnDisk(item.Name, scanChan)

	case !item.IsSymlink() && f.versioner != nil:
		// If we should use versioning, let the versioner archive the
		// file before we replace it. Archiving a non-existent file is not
		// an error.
		// Symlinks aren't archived.
		return f.inWritableDir(f.versioner.Archive, item.Name)
	}

	return f.inWritableDir(f.mtimefs.Remove, item.Name)
}

// deleteDirOnDisk attempts to delete a directory. It checks for files/dirs inside
// the directory and removes them if possible, or returns an error if it fails.
func (f *sendReceiveFolder) deleteDirOnDisk(dir string, scanChan chan<- string) error {
	if err := osutil.TraversesSymlink(f.mtimefs, filepath.Dir(dir)); err != nil {
		return err
	}

	if err := f.deleteDirOnDiskHandleChildren(dir, scanChan); err != nil {
		return err
	}

	err := f.inWritableDir(f.mtimefs.Remove, dir)
	if err == nil || fs.IsNotExist(err) {
		// It was removed or it doesn't exist to start with
		return nil
	}
	if _, serr := f.mtimefs.Lstat(dir); serr != nil && !fs.IsPermission(serr) {
		// We get an error just looking at the directory, and it's not a
		// permission problem. Let's assume the error is in fact some variant
		// of "file does not exist" (possibly expressed as some parent being a
		// file and not a directory etc) and that the delete is handled.
		return nil
	}

	return err
}

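// deleteDirOnDiskHandleChildren removes deletable (ignored-and-deletable) and
// temporary children of the directory, and reports via the returned error
// whether the remaining contents prevent deleting the directory itself.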
func (f *sendReceiveFolder) deleteDirOnDiskHandleChildren(dir string, scanChan chan<- string) error {
	var dirsToDelete []string
	var hasIgnored, hasKnown, hasToBeScanned, hasReceiveOnlyChanged bool
	var delErr error

	err := f.mtimefs.Walk(dir, func(path string, info fs.FileInfo, err error) error {
		if path == dir {
			return nil
		}
		if err != nil {
			return err
		}

		switch match := f.ignores.Match(path); {
		case match.IsDeletable():
			if info.IsDir() {
				dirsToDelete = append(dirsToDelete, path)
				return nil
			}
			fallthrough
		case fs.IsTemporary(path):
			if err := f.mtimefs.Remove(path); err != nil && delErr == nil {
				delErr = err
			}
			return nil
		case match.IsIgnored():
			hasIgnored = true
			return nil
		}

		cf, ok, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, path)
		if err != nil {
			return err
		}
		switch {
		case !ok || cf.IsDeleted():
			// Something appeared in the dir that we either are not
			// aware of at all or that we think should be deleted
			// -> schedule scan.
			scanChan <- path
			hasToBeScanned = true
			return nil
		case ok && f.Type == config.FolderTypeReceiveOnly && cf.IsReceiveOnlyChanged():
			hasReceiveOnlyChanged = true
			return nil
		}

		diskFile, err := scanner.CreateFileInfo(info, path, f.mtimefs, f.SyncOwnership, f.SyncXattrs, f.XattrFilter)
		if err != nil {
			// Let's just assume the file has changed.
			scanChan <- path
			hasToBeScanned = true
			return nil //nolint:nilerr
		}
		if !cf.IsEquivalentOptional(diskFile, protocol.FileInfoComparison{
			ModTimeWindow:   f.modTimeWindow,
			IgnorePerms:     f.IgnorePerms,
			IgnoreBlocks:    true,
			IgnoreFlags:     protocol.LocalAllFlags,
			IgnoreOwnership: !f.SyncOwnership,
			IgnoreXattrs:    !f.SyncXattrs,
		}) {
			// File on disk changed compared to what we have in db
			// -> schedule scan.
			scanChan <- path
			hasToBeScanned = true
			return nil
		}

		// Dir contains file that is valid according to db and
		// not ignored -> something weird is going on
		hasKnown = true
		return nil
	})
	if err != nil {
		return err
	}

	for i := range dirsToDelete {
		if err := f.mtimefs.Remove(dirsToDelete[len(dirsToDelete)-1-i]); err != nil && delErr == nil {
			delErr = err
		}
	}

	// "Error precedence":
	// Something changed on disk, check that and maybe all else gets resolved
	if hasToBeScanned {
		return errDirHasToBeScanned
	}
	// Ignored files will never be touched, i.e. this will keep failing until
	// user acts.
	if hasIgnored {
		return errDirHasIgnored
	}
	if hasReceiveOnlyChanged {
		// Pretend we deleted the directory. It will be resurrected as a
		// receive-only changed item on scan.
		scanChan <- dir
		return nil
	}
	if hasKnown {
		return errDirNotEmpty
	}
	// All good, except maybe failing to remove a (?d) ignored item
	return delErr
}

// scanIfItemChanged schedules the given file for scanning and returns errModified
// if it differs from the information in the database. Returns nil if the file has
// not changed.
func (f *sendReceiveFolder) scanIfItemChanged(name string, stat fs.FileInfo, item protocol.FileInfo, hasItem bool, fromDelete bool, scanChan chan<- string) (err error) {
	defer func() {
		if errors.Is(err, errModified) {
			scanChan <- name
		}
	}()

	if !hasItem || item.Deleted {
		// The item appeared from nowhere
		return errModified
	}

	// Check that the item on disk is what we expect it to be according
	// to the database. If there's a mismatch here, there might be local
	// changes that we don't know about yet and we should scan before
	// touching the item.
	statItem, err := scanner.CreateFileInfo(stat, item.Name, f.mtimefs, f.SyncOwnership, f.SyncXattrs, f.XattrFilter)
	if err != nil {
		return fmt.Errorf("comparing item on disk to db: %w", err)
	}

	if !statItem.IsEquivalentOptional(item, protocol.FileInfoComparison{
		ModTimeWindow:   f.modTimeWindow,
		IgnorePerms:     f.IgnorePerms,
		IgnoreBlocks:    true,
		IgnoreFlags:     protocol.LocalAllFlags,
		IgnoreOwnership: fromDelete || !f.SyncOwnership,
		IgnoreXattrs:    fromDelete || !f.SyncXattrs,
	}) {
		return errModified
	}

	return nil
}

// checkToBeDeleted makes sure the file on disk is compatible with what there is
// in the DB before the caller proceeds with actually deleting it.
// I.e. non-nil error status means "Do not delete!" or "is already deleted".
func (f *sendReceiveFolder) checkToBeDeleted(file, cur protocol.FileInfo, hasCur bool, scanChan chan<- string) error {
	if err := osutil.TraversesSymlink(f.mtimefs, filepath.Dir(file.Name)); err != nil {
		f.sl.Debug("Not deleting item behind symlink on disk, but updating database", slogutil.FilePath(file.Name))
		return fs.ErrNotExist
	}

	stat, err := f.mtimefs.Lstat(file.Name)
	deleted := fs.IsNotExist(err) || fs.IsErrCaseConflict(err)
	if !deleted && err != nil {
		return err
	}
	if deleted {
		if hasCur && !cur.Deleted && !cur.IsUnsupported() {
			scanChan <- file.Name
			return errModified
		}
		f.sl.Debug("Not deleting item we don't have, but updating database", slogutil.FilePath(file.Name))
		return err
	}

	return f.scanIfItemChanged(file.Name, stat, cur, hasCur, true, scanChan)
}

// setPlatformData makes adjustments to the metadata that should happen for
// all types (files, directories, symlinks). This should be one of the last
// things we do to a file when syncing changes to it.
func (f *sendReceiveFolder) setPlatformData(file *protocol.FileInfo, name string) error {
	if f.SyncXattrs {
		// Set extended attributes.
		if err := f.mtimefs.SetXattr(name, file.Platform.Xattrs(), f.XattrFilter); errors.Is(err, fs.ErrXattrsNotSupported) {
			f.sl.Debug("Cannot set xattrs (not supported)", slogutil.FilePath(file.Name), slogutil.Error(err))
		} else if err != nil {
			return err
		}
	}

	if f.SyncOwnership {
		// Set ownership based on file metadata.
		if err := f.syncOwnership(file, name); err != nil {
			return err
		}
	} else if f.CopyOwnershipFromParent {
		// Copy the parent owner and group.
		if err := f.copyOwnershipFromParent(name); err != nil {
			return err
		}
	}

	return nil
}

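// copyOwnershipFromParent sets the owner and group of path to those of its
// parent directory. It does nothing on Windows.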
func (f *sendReceiveFolder) copyOwnershipFromParent(path string) error {
	if build.IsWindows {
		// Can't do anything.
		return nil
	}

	info, err := f.mtimefs.Lstat(filepath.Dir(path))
	if err != nil {
		return fmt.Errorf("copy owner from parent: %w", err)
	}
	if err := f.mtimefs.Lchown(path, strconv.Itoa(info.Owner()), strconv.Itoa(info.Group())); err != nil {
		return fmt.Errorf("copy owner from parent: %w", err)
	}
	return nil
}

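// inWritableDir calls fn(path), making sure the containing directory is
// writable for the duration if needed, respecting the folder's IgnorePerms
// setting.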
func (f *sendReceiveFolder) inWritableDir(fn func(string) error, path string) error {
	return inWritableDir(fn, f.mtimefs, path, f.IgnorePerms)
}

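// limitedWriteAt performs fd.WriteAt under the folder's write limiter.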
func (f *sendReceiveFolder) limitedWriteAt(ctx context.Context, fd io.WriterAt, data []byte, offset int64) error {
	return f.withLimiter(ctx, func() error {
		_, err := fd.WriteAt(data, offset)
		return err
	})
}

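// withLimiter runs fn while holding a slot in the folder's write limiter,
// respecting context cancellation while waiting for the slot.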
func (f *sendReceiveFolder) withLimiter(ctx context.Context, fn func() error) error {
	if err := f.writeLimiter.TakeWithContext(ctx, 1); err != nil {
		return err
	}
	defer f.writeLimiter.Give(1)
	return fn()
}

// updateFileInfoChangeTime updates the inode change time in the FileInfo,
// because that depends on the current, new, state of the file on disk.
func (f *sendReceiveFolder) updateFileInfoChangeTime(file *protocol.FileInfo) error {
	info, err := f.mtimefs.Lstat(file.Name)
	if err != nil {
		return err
	}

	if ct := info.InodeChangeTime(); !ct.IsZero() {
		file.InodeChangeNs = ct.UnixNano()
	} else {
		file.InodeChangeNs = 0
	}

	return nil
}

// A []FileError is sent as part of an event and will be JSON serialized.
type FileError struct {
	Path string `json:"path"`
	Err  string `json:"error"`
}

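// conflictName returns the conflict-copy name for the given file: the original
// name with a timestamped ".sync-conflict-" marker and the modifier's ID
// inserted before the extension.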
func conflictName(name, lastModBy string) string {
	ext := filepath.Ext(name)
	return name[:len(name)-len(ext)] + time.Now().Format(".sync-conflict-20060102-150405-") + lastModBy + ext
}

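// isConflict reports whether the given name is already a conflict copy.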
func isConflict(name string) bool {
	return strings.Contains(filepath.Base(name), ".sync-conflict-")
}

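// existingConflicts returns the names of existing conflict copies of the
// given file.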
func existingConflicts(name string, fs fs.Filesystem) []string {
	ext := filepath.Ext(name)
	matches, err := fs.Glob(name[:len(name)-len(ext)] + ".sync-conflict-????????-??????*" + ext)
	if err != nil {
		slog.Debug("Globbing for conflicts failed", slogutil.Error(err))
	}
	return matches
}