folder_sendrecv.go 70 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package model
  7. import (
  8. "bytes"
  9. "context"
  10. "crypto/sha256"
  11. "errors"
  12. "fmt"
  13. "io"
  14. "log/slog"
  15. "path/filepath"
  16. "slices"
  17. "strconv"
  18. "strings"
  19. "sync"
  20. "time"
  21. "github.com/syncthing/syncthing/internal/itererr"
  22. "github.com/syncthing/syncthing/internal/slogutil"
  23. "github.com/syncthing/syncthing/lib/build"
  24. "github.com/syncthing/syncthing/lib/config"
  25. "github.com/syncthing/syncthing/lib/events"
  26. "github.com/syncthing/syncthing/lib/fs"
  27. "github.com/syncthing/syncthing/lib/ignore"
  28. "github.com/syncthing/syncthing/lib/osutil"
  29. "github.com/syncthing/syncthing/lib/protocol"
  30. "github.com/syncthing/syncthing/lib/scanner"
  31. "github.com/syncthing/syncthing/lib/semaphore"
  32. "github.com/syncthing/syncthing/lib/versioner"
  33. )
  34. var (
  35. blockStats = make(map[string]int)
  36. blockStatsMut sync.Mutex
  37. )
  38. func init() {
  39. folderFactories[config.FolderTypeSendReceive] = newSendReceiveFolder
  40. }
  41. // A pullBlockState is passed to the puller routine for each block that needs
  42. // to be fetched.
  43. type pullBlockState struct {
  44. *sharedPullerState
  45. block protocol.BlockInfo
  46. }
  47. // A copyBlocksState is passed to copy routine if the file has blocks to be
  48. // copied.
  49. type copyBlocksState struct {
  50. *sharedPullerState
  51. blocks []protocol.BlockInfo
  52. have int
  53. }
  54. // Which filemode bits to preserve
  55. const retainBits = fs.ModeSetgid | fs.ModeSetuid | fs.ModeSticky
  56. var (
  57. activity = newDeviceActivity()
  58. errNoDevice = errors.New("peers who had this file went away, or the file has changed while syncing. will retry later")
  59. errDirPrefix = "directory has been deleted on a remote device but "
  60. errDirHasToBeScanned = errors.New(errDirPrefix + "contains changed files, scheduling scan")
  61. errDirHasIgnored = errors.New(errDirPrefix + "contains ignored files (see ignore documentation for (?d) prefix)")
  62. errDirNotEmpty = errors.New(errDirPrefix + "is not empty; the contents are probably ignored on that remote device, but not locally")
  63. errNotAvailable = errors.New("no connected device has the required version of this file")
  64. errModified = errors.New("file modified but not rescanned; will try again later")
  65. errUnexpectedDirOnFileDel = errors.New("encountered directory when trying to remove file/symlink")
  66. errIncompatibleSymlink = errors.New("incompatible symlink entry; rescan with newer Syncthing on source")
  67. contextRemovingOldItem = "removing item to be replaced"
  68. )
  69. type dbUpdateType int
  70. func (d dbUpdateType) String() string {
  71. switch d {
  72. case dbUpdateHandleDir:
  73. return "dbUpdateHandleDir"
  74. case dbUpdateDeleteDir:
  75. return "dbUpdateDeleteDir"
  76. case dbUpdateHandleFile:
  77. return "dbUpdateHandleFile"
  78. case dbUpdateDeleteFile:
  79. return "dbUpdateDeleteFile"
  80. case dbUpdateShortcutFile:
  81. return "dbUpdateShortcutFile"
  82. case dbUpdateHandleSymlink:
  83. return "dbUpdateHandleSymlink"
  84. case dbUpdateInvalidate:
  85. return "dbUpdateHandleInvalidate"
  86. }
  87. panic(fmt.Sprintf("unknown dbUpdateType %d", d))
  88. }
  89. const (
  90. dbUpdateHandleDir dbUpdateType = iota
  91. dbUpdateDeleteDir
  92. dbUpdateHandleFile
  93. dbUpdateDeleteFile
  94. dbUpdateShortcutFile
  95. dbUpdateHandleSymlink
  96. dbUpdateInvalidate
  97. )
  98. const (
  99. defaultCopiers = 2
  100. defaultPullerPause = 60 * time.Second
  101. defaultPullerPendingKiB = 2 * protocol.MaxBlockSize / 1024
  102. maxPullerIterations = 3
  103. )
  104. type dbUpdateJob struct {
  105. file protocol.FileInfo
  106. jobType dbUpdateType
  107. }
  108. type sendReceiveFolder struct {
  109. *folder
  110. queue *jobQueue
  111. blockPullReorderer blockPullReorderer
  112. writeLimiter *semaphore.Semaphore
  113. tempPullErrors map[string]string // pull errors that might be just transient
  114. }
  115. func newSendReceiveFolder(model *model, ignores *ignore.Matcher, cfg config.FolderConfiguration, ver versioner.Versioner, evLogger events.Logger, ioLimiter *semaphore.Semaphore) service {
  116. f := &sendReceiveFolder{
  117. folder: newFolder(model, ignores, cfg, evLogger, ioLimiter, ver),
  118. queue: newJobQueue(),
  119. blockPullReorderer: newBlockPullReorderer(cfg.BlockPullOrder, model.id, cfg.DeviceIDs()),
  120. writeLimiter: semaphore.New(cfg.MaxConcurrentWrites),
  121. }
  122. f.puller = f
  123. if f.Copiers == 0 {
  124. f.Copiers = defaultCopiers
  125. }
  126. // If the configured max amount of pending data is zero, we use the
  127. // default. If it's configured to something non-zero but less than the
  128. // protocol block size we adjust it upwards accordingly.
  129. if f.PullerMaxPendingKiB == 0 {
  130. f.PullerMaxPendingKiB = defaultPullerPendingKiB
  131. }
  132. if blockSizeKiB := protocol.MaxBlockSize / 1024; f.PullerMaxPendingKiB < blockSizeKiB {
  133. f.PullerMaxPendingKiB = blockSizeKiB
  134. }
  135. return f
  136. }
  137. // pull returns true if it manages to get all needed items from peers, i.e. get
  138. // the device in sync with the global state.
  139. func (f *sendReceiveFolder) pull(ctx context.Context) (bool, error) {
  140. f.sl.DebugContext(ctx, "Pulling")
  141. scanChan := make(chan string)
  142. go f.pullScannerRoutine(ctx, scanChan)
  143. defer func() {
  144. close(scanChan)
  145. f.setState(FolderIdle)
  146. }()
  147. metricFolderPulls.WithLabelValues(f.ID).Inc()
  148. pullCtx, cancel := context.WithCancel(ctx)
  149. defer cancel()
  150. go addTimeUntilCancelled(pullCtx, metricFolderPullSeconds.WithLabelValues(f.ID))
  151. changed := 0
  152. f.errorsMut.Lock()
  153. f.pullErrors = nil
  154. f.errorsMut.Unlock()
  155. var err error
  156. for tries := range maxPullerIterations {
  157. select {
  158. case <-ctx.Done():
  159. return false, ctx.Err()
  160. default:
  161. }
  162. // Needs to be set on every loop, as the puller might have set
  163. // it to FolderSyncing during the last iteration.
  164. f.setState(FolderSyncPreparing)
  165. changed, err = f.pullerIteration(ctx, scanChan)
  166. if err != nil {
  167. return false, err
  168. }
  169. f.sl.DebugContext(ctx, "Pull iteration completed", "changed", changed, "try", tries+1)
  170. if changed == 0 {
  171. // No files were changed by the puller, so we are in sync, or we
  172. // are unable to make further progress for the moment.
  173. break
  174. }
  175. }
  176. f.errorsMut.Lock()
  177. pullErrNum := len(f.tempPullErrors)
  178. if pullErrNum > 0 {
  179. f.pullErrors = make([]FileError, 0, len(f.tempPullErrors))
  180. for path, err := range f.tempPullErrors {
  181. f.sl.WarnContext(ctx, "Failed to sync", slogutil.FilePath(path), slogutil.Error(err))
  182. f.pullErrors = append(f.pullErrors, FileError{
  183. Err: err,
  184. Path: path,
  185. })
  186. }
  187. f.tempPullErrors = nil
  188. }
  189. f.errorsMut.Unlock()
  190. if pullErrNum > 0 {
  191. f.evLogger.Log(events.FolderErrors, map[string]interface{}{
  192. "folder": f.folderID,
  193. "errors": f.Errors(),
  194. })
  195. }
  196. // We're done if we didn't change anything and didn't fail to change
  197. // anything
  198. return changed == 0 && pullErrNum == 0, nil
  199. }
  200. // pullerIteration runs a single puller iteration for the given folder and
  201. // returns the number items that should have been synced (even those that
  202. // might have failed). One puller iteration handles all files currently
  203. // flagged as needed in the folder.
  204. func (f *sendReceiveFolder) pullerIteration(ctx context.Context, scanChan chan<- string) (int, error) {
  205. f.errorsMut.Lock()
  206. f.tempPullErrors = make(map[string]string)
  207. f.errorsMut.Unlock()
  208. pullChan := make(chan pullBlockState)
  209. copyChan := make(chan copyBlocksState)
  210. finisherChan := make(chan *sharedPullerState)
  211. dbUpdateChan := make(chan dbUpdateJob)
  212. var pullWg sync.WaitGroup
  213. var copyWg sync.WaitGroup
  214. var doneWg sync.WaitGroup
  215. var updateWg sync.WaitGroup
  216. f.sl.DebugContext(ctx, "Starting puller iteration", "copiers", f.Copiers, "pullerPendingKiB", f.PullerMaxPendingKiB)
  217. updateWg.Add(1)
  218. var changed int // only read after updateWg closes
  219. go func() {
  220. // dbUpdaterRoutine finishes when dbUpdateChan is closed
  221. changed = f.dbUpdaterRoutine(dbUpdateChan)
  222. updateWg.Done()
  223. }()
  224. for range f.Copiers {
  225. copyWg.Add(1)
  226. go func() {
  227. // copierRoutine finishes when copyChan is closed
  228. f.copierRoutine(ctx, copyChan, pullChan, finisherChan)
  229. copyWg.Done()
  230. }()
  231. }
  232. pullWg.Add(1)
  233. go func() {
  234. // pullerRoutine finishes when pullChan is closed
  235. f.pullerRoutine(ctx, pullChan, finisherChan)
  236. pullWg.Done()
  237. }()
  238. doneWg.Add(1)
  239. // finisherRoutine finishes when finisherChan is closed
  240. go func() {
  241. f.finisherRoutine(ctx, finisherChan, dbUpdateChan, scanChan)
  242. doneWg.Done()
  243. }()
  244. fileDeletions, dirDeletions, err := f.processNeeded(ctx, dbUpdateChan, copyChan, scanChan)
  245. // Signal copy and puller routines that we are done with the in data for
  246. // this iteration. Wait for them to finish.
  247. close(copyChan)
  248. copyWg.Wait()
  249. close(pullChan)
  250. pullWg.Wait()
  251. // Signal the finisher chan that there will be no more input and wait
  252. // for it to finish.
  253. close(finisherChan)
  254. doneWg.Wait()
  255. if err == nil {
  256. f.processDeletions(ctx, fileDeletions, dirDeletions, dbUpdateChan, scanChan)
  257. }
  258. // Wait for db updates and scan scheduling to complete
  259. close(dbUpdateChan)
  260. updateWg.Wait()
  261. f.queue.Reset()
  262. return changed, err
  263. }
  264. func (f *sendReceiveFolder) processNeeded(ctx context.Context, dbUpdateChan chan<- dbUpdateJob, copyChan chan<- copyBlocksState, scanChan chan<- string) (map[string]protocol.FileInfo, []protocol.FileInfo, error) {
  265. var dirDeletions []protocol.FileInfo
  266. fileDeletions := map[string]protocol.FileInfo{}
  267. buckets := map[string][]protocol.FileInfo{}
  268. // Iterate the list of items that we need and sort them into piles.
  269. // Regular files to pull goes into the file queue, everything else
  270. // (directories, symlinks and deletes) goes into the "process directly"
  271. // pile.
  272. loop:
  273. for file, err := range itererr.Zip(f.model.sdb.AllNeededGlobalFiles(f.folderID, protocol.LocalDeviceID, f.Order, 0, 0)) {
  274. if err != nil {
  275. return nil, nil, err
  276. }
  277. select {
  278. case <-ctx.Done():
  279. break loop
  280. default:
  281. }
  282. if f.IgnoreDelete && file.IsDeleted() {
  283. f.sl.DebugContext(ctx, "Ignoring file deletion per config", slogutil.FilePath(file.FileName()))
  284. continue
  285. }
  286. switch {
  287. case f.ignores.Match(file.Name).IsIgnored():
  288. file.SetIgnored()
  289. f.sl.DebugContext(ctx, "Handling ignored file", file.LogAttr())
  290. dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
  291. case build.IsWindows && fs.WindowsInvalidFilename(file.Name) != nil:
  292. if file.IsDeleted() {
  293. // Just pretend we deleted it, no reason to create an error
  294. // about a deleted file that we can't have anyway.
  295. // Reason we need it in the first place is, that it was
  296. // ignored at some point.
  297. dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteFile}
  298. } else {
  299. // We can't pull an invalid file. Grab the error again since
  300. // we couldn't assign it directly in the case clause.
  301. f.newPullError(file.Name, fs.WindowsInvalidFilename(file.Name))
  302. }
  303. case file.IsDeleted():
  304. switch {
  305. case file.IsDirectory():
  306. // Perform directory deletions at the end, as we may have
  307. // files to delete inside them before we get to that point.
  308. dirDeletions = append(dirDeletions, file)
  309. case file.IsSymlink():
  310. f.deleteFile(file, dbUpdateChan, scanChan)
  311. default:
  312. df, ok, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  313. if err != nil {
  314. return nil, nil, err
  315. }
  316. // Local file can be already deleted, but with a lower version
  317. // number, hence the deletion coming in again as part of
  318. // WithNeed, furthermore, the file can simply be of the wrong
  319. // type if we haven't yet managed to pull it.
  320. if ok && !df.IsDeleted() && !df.IsSymlink() && !df.IsDirectory() && !df.IsInvalid() {
  321. fileDeletions[file.Name] = file
  322. // Put files into buckets per first hash
  323. key := string(df.BlocksHash)
  324. buckets[key] = append(buckets[key], df)
  325. } else {
  326. f.deleteFileWithCurrent(file, df, ok, dbUpdateChan, scanChan)
  327. }
  328. }
  329. case file.Type == protocol.FileInfoTypeFile:
  330. curFile, hasCurFile, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  331. if err != nil {
  332. return nil, nil, err
  333. }
  334. if hasCurFile && file.BlocksEqual(curFile) {
  335. // We are supposed to copy the entire file, and then fetch nothing. We
  336. // are only updating metadata, so we don't actually *need* to make the
  337. // copy.
  338. f.shortcutFile(file, dbUpdateChan)
  339. } else {
  340. // Queue files for processing after directories and symlinks.
  341. f.queue.Push(file.Name, file.Size, file.ModTime())
  342. }
  343. case (build.IsWindows || build.IsAndroid) && file.IsSymlink():
  344. if err := f.handleSymlinkCheckExisting(file, scanChan); err != nil {
  345. f.newPullError(file.Name, fmt.Errorf("handling unsupported symlink: %w", err))
  346. break
  347. }
  348. file.SetUnsupported()
  349. f.sl.DebugContext(ctx, "Invalidating unsupported symlink", slogutil.FilePath(file.Name))
  350. dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
  351. case file.IsDirectory() && !file.IsSymlink():
  352. f.sl.DebugContext(ctx, "Handling directory", slogutil.FilePath(file.Name))
  353. if f.checkParent(file.Name, scanChan) {
  354. f.handleDir(file, dbUpdateChan, scanChan)
  355. }
  356. case file.IsSymlink():
  357. f.sl.DebugContext(ctx, "Handling symlink", slogutil.FilePath(file.Name))
  358. if f.checkParent(file.Name, scanChan) {
  359. f.handleSymlink(file, dbUpdateChan, scanChan)
  360. }
  361. default:
  362. panic("unhandleable item type, can't happen")
  363. }
  364. }
  365. select {
  366. case <-ctx.Done():
  367. return nil, nil, ctx.Err()
  368. default:
  369. }
  370. // Process the file queue.
  371. nextFile:
  372. for {
  373. select {
  374. case <-ctx.Done():
  375. return fileDeletions, dirDeletions, ctx.Err()
  376. default:
  377. }
  378. fileName, ok := f.queue.Pop()
  379. if !ok {
  380. break
  381. }
  382. fi, ok, err := f.model.sdb.GetGlobalFile(f.folderID, fileName)
  383. if err != nil {
  384. return nil, nil, err
  385. }
  386. if !ok {
  387. // File is no longer in the index. Mark it as done and drop it.
  388. f.queue.Done(fileName)
  389. continue
  390. }
  391. if fi.IsDeleted() || fi.IsInvalid() || fi.Type != protocol.FileInfoTypeFile {
  392. // The item has changed type or status in the index while we
  393. // were processing directories above.
  394. f.queue.Done(fileName)
  395. continue
  396. }
  397. if !f.checkParent(fi.Name, scanChan) {
  398. f.queue.Done(fileName)
  399. continue
  400. }
  401. // Check our list of files to be removed for a match, in which case
  402. // we can just do a rename instead.
  403. key := string(fi.BlocksHash)
  404. for candidate, ok := popCandidate(buckets, key); ok; candidate, ok = popCandidate(buckets, key) {
  405. // candidate is our current state of the file, where as the
  406. // desired state with the delete bit set is in the deletion
  407. // map.
  408. desired := fileDeletions[candidate.Name]
  409. if err := f.renameFile(candidate, desired, fi, dbUpdateChan, scanChan); err != nil {
  410. f.sl.DebugContext(ctx, "Rename shortcut failed", slogutil.FilePath(fi.Name), slogutil.Error(err))
  411. // Failed to rename, try next one.
  412. continue
  413. }
  414. // Remove the pending deletion (as we performed it by renaming)
  415. delete(fileDeletions, candidate.Name)
  416. f.queue.Done(fileName)
  417. continue nextFile
  418. }
  419. // Verify there is some availability for the file before we start
  420. // processing it
  421. devices := f.model.fileAvailability(f.FolderConfiguration, fi)
  422. if len(devices) == 0 {
  423. f.newPullError(fileName, errNotAvailable)
  424. f.queue.Done(fileName)
  425. continue
  426. }
  427. // Verify we have space to handle the file before we start
  428. // creating temp files etc.
  429. if err := f.CheckAvailableSpace(uint64(fi.Size)); err != nil { //nolint:gosec
  430. f.newPullError(fileName, err)
  431. f.queue.Done(fileName)
  432. continue
  433. }
  434. if err := f.handleFile(ctx, fi, copyChan); err != nil {
  435. f.newPullError(fileName, err)
  436. }
  437. }
  438. return fileDeletions, dirDeletions, nil
  439. }
  440. func popCandidate(buckets map[string][]protocol.FileInfo, key string) (protocol.FileInfo, bool) {
  441. cands := buckets[key]
  442. if len(cands) == 0 {
  443. return protocol.FileInfo{}, false
  444. }
  445. buckets[key] = cands[1:]
  446. return cands[0], true
  447. }
  448. func (f *sendReceiveFolder) processDeletions(ctx context.Context, fileDeletions map[string]protocol.FileInfo, dirDeletions []protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
  449. for _, file := range fileDeletions {
  450. select {
  451. case <-ctx.Done():
  452. return
  453. default:
  454. }
  455. f.deleteFile(file, dbUpdateChan, scanChan)
  456. }
  457. // Process in reverse order to delete depth first
  458. for i := range dirDeletions {
  459. select {
  460. case <-ctx.Done():
  461. return
  462. default:
  463. }
  464. dir := dirDeletions[len(dirDeletions)-i-1]
  465. f.sl.DebugContext(ctx, "Deleting directory", slogutil.FilePath(dir.Name))
  466. f.deleteDir(dir, dbUpdateChan, scanChan)
  467. }
  468. }
  469. // handleDir creates or updates the given directory
  470. func (f *sendReceiveFolder) handleDir(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
  471. // Used in the defer closure below, updated by the function body. Take
  472. // care not declare another err.
  473. var err error
  474. f.evLogger.Log(events.ItemStarted, map[string]string{
  475. "folder": f.folderID,
  476. "item": file.Name,
  477. "type": "dir",
  478. "action": "update",
  479. })
  480. defer func() {
  481. slog.Info("Created or updated directory", f.LogAttr(), file.LogAttr())
  482. f.evLogger.Log(events.ItemFinished, map[string]interface{}{
  483. "folder": f.folderID,
  484. "item": file.Name,
  485. "error": events.Error(err),
  486. "type": "dir",
  487. "action": "update",
  488. })
  489. }()
  490. mode := fs.FileMode(file.Permissions & 0o777)
  491. if f.IgnorePerms || file.NoPermissions {
  492. mode = 0o777
  493. }
  494. f.sl.Debug("Need dir", "file", file, "cur", slogutil.Expensive(func() any {
  495. curFile, _, _ := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  496. return curFile
  497. }))
  498. info, err := f.mtimefs.Lstat(file.Name)
  499. switch {
  500. // There is already something under that name, we need to handle that.
  501. // Unless it already is a directory, as we only track permissions,
  502. // that don't result in a conflict.
  503. case err == nil && !info.IsDir():
  504. // Check that it is what we have in the database.
  505. curFile, hasCurFile, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  506. if err != nil {
  507. f.newPullError(file.Name, fmt.Errorf("handling dir: %w", err))
  508. return
  509. }
  510. if err := f.scanIfItemChanged(file.Name, info, curFile, hasCurFile, false, scanChan); err != nil {
  511. f.newPullError(file.Name, fmt.Errorf("handling dir: %w", err))
  512. return
  513. }
  514. // Remove it to replace with the dir.
  515. if !curFile.IsSymlink() && file.InConflictWith(curFile) {
  516. // The new file has been changed in conflict with the existing one. We
  517. // should file it away as a conflict instead of just removing or
  518. // archiving.
  519. // Symlinks aren't checked for conflicts.
  520. err = f.inWritableDir(func(name string) error {
  521. return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
  522. }, curFile.Name)
  523. } else {
  524. err = f.deleteItemOnDisk(curFile, scanChan)
  525. }
  526. if err != nil {
  527. f.newPullError(file.Name, err)
  528. return
  529. }
  530. fallthrough
  531. // The directory doesn't exist, so we create it with the right
  532. // mode bits from the start.
  533. case err != nil && fs.IsNotExist(err):
  534. // We declare a function that acts on only the path name, so
  535. // we can pass it to InWritableDir. We use a regular Mkdir and
  536. // not MkdirAll because the parent should already exist.
  537. mkdir := func(path string) error {
  538. err = f.mtimefs.Mkdir(path, mode)
  539. if err != nil {
  540. return err
  541. }
  542. // Set the platform data (ownership, xattrs, etc).
  543. if err := f.setPlatformData(&file, path); err != nil {
  544. return err
  545. }
  546. if f.IgnorePerms || file.NoPermissions {
  547. return nil
  548. }
  549. // Stat the directory so we can check its permissions.
  550. info, err := f.mtimefs.Lstat(path)
  551. if err != nil {
  552. return err
  553. }
  554. // Mask for the bits we want to preserve and add them in to the
  555. // directories permissions.
  556. return f.mtimefs.Chmod(path, mode|(info.Mode()&retainBits))
  557. }
  558. if err = f.inWritableDir(mkdir, file.Name); err == nil {
  559. dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleDir}
  560. } else {
  561. f.newPullError(file.Name, fmt.Errorf("creating directory: %w", err))
  562. }
  563. return
  564. // Weird error when stat()'ing the dir. Probably won't work to do
  565. // anything else with it if we can't even stat() it.
  566. case err != nil:
  567. f.newPullError(file.Name, fmt.Errorf("checking file to be replaced: %w", err))
  568. return
  569. }
  570. // The directory already exists, so we just correct the metadata. (We
  571. // don't handle modification times on directories, because that sucks...)
  572. // It's OK to change mode bits on stuff within non-writable directories.
  573. if !f.IgnorePerms && !file.NoPermissions {
  574. if err := f.mtimefs.Chmod(file.Name, mode|(info.Mode()&retainBits)); err != nil {
  575. f.newPullError(file.Name, fmt.Errorf("handling dir (setting permissions): %w", err))
  576. return
  577. }
  578. if err := f.setPlatformData(&file, file.Name); err != nil {
  579. f.newPullError(file.Name, fmt.Errorf("handling dir (setting metadata): %w", err))
  580. return
  581. }
  582. }
  583. dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleDir}
  584. }
  585. // checkParent verifies that the thing we are handling lives inside a directory,
  586. // and not a symlink or regular file. It also resurrects missing parent dirs.
  587. func (f *sendReceiveFolder) checkParent(file string, scanChan chan<- string) bool {
  588. parent := filepath.Dir(file)
  589. if err := osutil.TraversesSymlink(f.mtimefs, parent); err != nil {
  590. f.newPullError(file, fmt.Errorf("checking parent dirs: %w", err))
  591. return false
  592. }
  593. // issues #114 and #4475: This works around a race condition
  594. // between two devices, when one device removes a directory and the
  595. // other creates a file in it. However that happens, we end up with
  596. // a directory for "foo" with the delete bit, but a file "foo/bar"
  597. // that we want to sync. We never create the directory, and hence
  598. // fail to create the file and end up looping forever on it. This
  599. // breaks that by creating the directory and scheduling a scan,
  600. // where it will be found and the delete bit on it removed. The
  601. // user can then clean up as they like...
  602. // This can also occur if an entire tree structure was deleted, but only
  603. // a leave has been scanned.
  604. //
  605. // And if this is an encrypted folder:
  606. // Encrypted files have made-up filenames with two synthetic parent
  607. // directories which don't have any meaning. Create those if necessary.
  608. if _, err := f.mtimefs.Lstat(parent); !fs.IsNotExist(err) {
  609. f.sl.Debug("Parent directory exists", slogutil.FilePath(file))
  610. return true
  611. }
  612. f.sl.Debug("Creating parent directory", slogutil.FilePath(file))
  613. if err := f.mtimefs.MkdirAll(parent, 0o755); err != nil {
  614. f.newPullError(file, fmt.Errorf("creating parent dir: %w", err))
  615. return false
  616. }
  617. if f.Type != config.FolderTypeReceiveEncrypted {
  618. scanChan <- parent
  619. }
  620. return true
  621. }
  622. // handleSymlink creates or updates the given symlink
  623. func (f *sendReceiveFolder) handleSymlink(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
  624. // Used in the defer closure below, updated by the function body. Take
  625. // care not declare another err.
  626. var err error
  627. f.evLogger.Log(events.ItemStarted, map[string]string{
  628. "folder": f.folderID,
  629. "item": file.Name,
  630. "type": "symlink",
  631. "action": "update",
  632. })
  633. defer func() {
  634. if err != nil {
  635. slog.Warn("Failed to handle symlink", f.LogAttr(), file.LogAttr(), slogutil.Error(err))
  636. } else {
  637. slog.Info("Created or updated symlink", f.LogAttr(), file.LogAttr())
  638. }
  639. f.evLogger.Log(events.ItemFinished, map[string]interface{}{
  640. "folder": f.folderID,
  641. "item": file.Name,
  642. "error": events.Error(err),
  643. "type": "symlink",
  644. "action": "update",
  645. })
  646. }()
  647. f.sl.Debug("Need symlink", slogutil.FilePath(file.Name), slog.Any("cur", slogutil.Expensive(func() any {
  648. curFile, _, _ := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  649. return curFile
  650. })))
  651. if len(file.SymlinkTarget) == 0 {
  652. // Index entry from a Syncthing predating the support for including
  653. // the link target in the index entry. We log this as an error.
  654. f.newPullError(file.Name, errIncompatibleSymlink)
  655. return
  656. }
  657. if err = f.handleSymlinkCheckExisting(file, scanChan); err != nil {
  658. f.newPullError(file.Name, fmt.Errorf("handling symlink: %w", err))
  659. return
  660. }
  661. // We declare a function that acts on only the path name, so
  662. // we can pass it to InWritableDir.
  663. createLink := func(path string) error {
  664. if err := f.mtimefs.CreateSymlink(string(file.SymlinkTarget), path); err != nil {
  665. return err
  666. }
  667. return f.setPlatformData(&file, path)
  668. }
  669. if err = f.inWritableDir(createLink, file.Name); err == nil {
  670. dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleSymlink}
  671. } else {
  672. f.newPullError(file.Name, fmt.Errorf("symlink create: %w", err))
  673. }
  674. }
  675. func (f *sendReceiveFolder) handleSymlinkCheckExisting(file protocol.FileInfo, scanChan chan<- string) error {
  676. // If there is already something under that name, we need to handle that.
  677. info, err := f.mtimefs.Lstat(file.Name)
  678. if err != nil {
  679. if fs.IsNotExist(err) {
  680. return nil
  681. }
  682. return err
  683. }
  684. // Check that it is what we have in the database.
  685. curFile, hasCurFile, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  686. if err != nil {
  687. return err
  688. }
  689. if err := f.scanIfItemChanged(file.Name, info, curFile, hasCurFile, false, scanChan); err != nil {
  690. return err
  691. }
  692. // Remove it to replace with the symlink. This also handles the
  693. // "change symlink type" path.
  694. if !curFile.IsDirectory() && !curFile.IsSymlink() && file.InConflictWith(curFile) {
  695. // The new file has been changed in conflict with the existing one. We
  696. // should file it away as a conflict instead of just removing or
  697. // archiving.
  698. // Directories and symlinks aren't checked for conflicts.
  699. return f.inWritableDir(func(name string) error {
  700. return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
  701. }, curFile.Name)
  702. } else {
  703. return f.deleteItemOnDisk(curFile, scanChan)
  704. }
  705. }
  706. // deleteDir attempts to remove a directory that was deleted on a remote
  707. func (f *sendReceiveFolder) deleteDir(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
  708. // Used in the defer closure below, updated by the function body. Take
  709. // care not declare another err.
  710. var err error
  711. f.evLogger.Log(events.ItemStarted, map[string]string{
  712. "folder": f.folderID,
  713. "item": file.Name,
  714. "type": "dir",
  715. "action": "delete",
  716. })
  717. defer func() {
  718. if err != nil {
  719. f.newPullError(file.Name, fmt.Errorf("delete dir: %w", err))
  720. slog.Info("Failed to delete directory", f.LogAttr(), file.LogAttr(), slogutil.Error(err))
  721. } else {
  722. slog.Info("Deleted directory", f.LogAttr(), file.LogAttr())
  723. }
  724. f.evLogger.Log(events.ItemFinished, map[string]interface{}{
  725. "folder": f.folderID,
  726. "item": file.Name,
  727. "error": events.Error(err),
  728. "type": "dir",
  729. "action": "delete",
  730. })
  731. }()
  732. cur, hasCur, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  733. if err != nil {
  734. return
  735. }
  736. if err = f.checkToBeDeleted(file, cur, hasCur, scanChan); err != nil {
  737. if fs.IsNotExist(err) || fs.IsErrCaseConflict(err) {
  738. err = nil
  739. dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteDir}
  740. }
  741. return
  742. }
  743. if err = f.deleteDirOnDisk(file.Name, scanChan); err != nil {
  744. return
  745. }
  746. dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteDir}
  747. }
  748. // deleteFile attempts to delete the given file
  749. func (f *sendReceiveFolder) deleteFile(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
  750. cur, hasCur, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  751. if err != nil {
  752. f.newPullError(file.Name, fmt.Errorf("delete file: %w", err))
  753. return
  754. }
  755. f.deleteFileWithCurrent(file, cur, hasCur, dbUpdateChan, scanChan)
  756. }
  757. func (f *sendReceiveFolder) deleteFileWithCurrent(file, cur protocol.FileInfo, hasCur bool, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
  758. // Used in the defer closure below, updated by the function body. Take
  759. // care not declare another err.
  760. var err error
  761. f.sl.Debug("Deleting file or symlink", slogutil.FilePath(file.Name))
  762. f.evLogger.Log(events.ItemStarted, map[string]string{
  763. "folder": f.folderID,
  764. "item": file.Name,
  765. "type": "file",
  766. "action": "delete",
  767. })
  768. defer func() {
  769. kind := "file"
  770. if file.IsSymlink() {
  771. kind = "symlink"
  772. }
  773. if err != nil {
  774. f.newPullError(file.Name, fmt.Errorf("delete file: %w", err))
  775. slog.Info("Failed to delete "+kind, f.LogAttr(), file.LogAttr(), slogutil.Error(err))
  776. } else {
  777. slog.Info("Deleted "+kind, f.LogAttr(), file.LogAttr())
  778. }
  779. f.evLogger.Log(events.ItemFinished, map[string]interface{}{
  780. "folder": f.folderID,
  781. "item": file.Name,
  782. "error": events.Error(err),
  783. "type": "file",
  784. "action": "delete",
  785. })
  786. }()
  787. if err = f.checkToBeDeleted(file, cur, hasCur, scanChan); err != nil {
  788. if fs.IsNotExist(err) || fs.IsErrCaseConflict(err) {
  789. err = nil
  790. dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteFile}
  791. }
  792. return
  793. }
  794. // We are asked to delete a file, but what we have on disk and in db
  795. // is a directory. Something is wrong here, should probably not happen.
  796. if cur.IsDirectory() {
  797. err = errUnexpectedDirOnFileDel
  798. return
  799. }
  800. switch {
  801. case file.InConflictWith(cur) && !cur.IsSymlink():
  802. // If the delete constitutes winning a conflict, we move the file to
  803. // a conflict copy instead of doing the delete
  804. err = f.inWritableDir(func(name string) error {
  805. return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
  806. }, cur.Name)
  807. case f.versioner != nil && !cur.IsSymlink():
  808. // If we have a versioner, use that to move the file away
  809. err = f.inWritableDir(f.versioner.Archive, file.Name)
  810. default:
  811. // Delete the file
  812. err = f.inWritableDir(f.mtimefs.Remove, file.Name)
  813. }
  814. if err == nil || fs.IsNotExist(err) {
  815. // It was removed or it doesn't exist to start with
  816. dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteFile}
  817. return
  818. }
  819. if _, serr := f.mtimefs.Lstat(file.Name); serr != nil && !fs.IsPermission(serr) {
  820. // We get an error just looking at the file, and it's not a permission
  821. // problem. Lets assume the error is in fact some variant of "file
  822. // does not exist" (possibly expressed as some parent being a file and
  823. // not a directory etc) and that the delete is handled.
  824. err = nil
  825. dbUpdateChan <- dbUpdateJob{file, dbUpdateDeleteFile}
  826. }
  827. slog.Info("Deleted file", f.LogAttr(), file.LogAttr())
  828. }
// renameFile attempts to rename an existing file to a destination
// and set the right attributes on it.
//
// cur is the local database entry for source, and target is the new file
// info. Started/finished events are emitted for both names: the source as a
// "delete" and the target as an "update". The rename is refused (with an
// error returned) when the source or the target on disk does not match what
// the database says; in the target case the target name is also sent on
// scanChan. On success the temp file is finalized through performFinish and
// a dbUpdateDeleteFile job for the source is queued.
func (f *sendReceiveFolder) renameFile(cur, source, target protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
	// Used in the defer closure below, updated by the function body. Take
	// care not declare another err.
	var err error

	f.evLogger.Log(events.ItemStarted, map[string]string{
		"folder": f.folderID,
		"item":   source.Name,
		"type":   "file",
		"action": "delete",
	})
	f.evLogger.Log(events.ItemStarted, map[string]string{
		"folder": f.folderID,
		"item":   target.Name,
		"type":   "file",
		"action": "update",
	})

	// Log the outcome once and emit the two matching ItemFinished events,
	// both carrying the final value of err.
	defer func() {
		if err != nil {
			slog.Info("Failed to rename file", f.LogAttr(), target.LogAttr(), slog.String("from", source.Name), slogutil.Error(err))
		} else {
			slog.Info("Renamed file", f.LogAttr(), target.LogAttr(), slog.String("from", source.Name))
		}
		f.evLogger.Log(events.ItemFinished, map[string]interface{}{
			"folder": f.folderID,
			"item":   source.Name,
			"error":  events.Error(err),
			"type":   "file",
			"action": "delete",
		})
		f.evLogger.Log(events.ItemFinished, map[string]interface{}{
			"folder": f.folderID,
			"item":   target.Name,
			"error":  events.Error(err),
			"type":   "file",
			"action": "update",
		})
	}()

	f.sl.Debug("Taking rename shortcut", "from", source.Name, "to", target.Name)

	// Check that source is compatible with what we have in the DB
	if err = f.checkToBeDeleted(source, cur, true, scanChan); err != nil {
		return err
	}
	// Check that the target corresponds to what we have in the DB
	// (curTarget and ok are new here; err is the variable declared above,
	// since this := is in the same scope.)
	curTarget, ok, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, target.Name)
	if err != nil {
		return err
	}
	switch stat, serr := f.mtimefs.Lstat(target.Name); {
	case serr != nil:
		var caseErr *fs.CaseConflictError
		switch {
		case errors.As(serr, &caseErr):
			// A case conflict with anything other than the source itself
			// is a real error.
			if caseErr.Real != source.Name {
				err = serr
				break
			}
			fallthrough // This is a case only rename
		case fs.IsNotExist(serr):
			// Nothing on disk under the target name. That is fine as long
			// as the database agrees (no entry, or a deleted one).
			if !ok || curTarget.IsDeleted() {
				break
			}
			scanChan <- target.Name
			err = errModified
		default:
			// We can't check whether the file changed as compared to the db,
			// do not delete.
			err = serr
		}
	case !ok:
		// Target appeared from nowhere
		scanChan <- target.Name
		err = errModified
	default:
		// Target exists both on disk and in the database; make sure the
		// two are equivalent before replacing it.
		var fi protocol.FileInfo
		if fi, err = scanner.CreateFileInfo(stat, target.Name, f.mtimefs, f.SyncOwnership, f.SyncXattrs, f.XattrFilter); err == nil {
			if !fi.IsEquivalentOptional(curTarget, protocol.FileInfoComparison{
				ModTimeWindow:   f.modTimeWindow,
				IgnorePerms:     f.IgnorePerms,
				IgnoreBlocks:    true,
				IgnoreFlags:     protocol.LocalAllFlags,
				IgnoreOwnership: !f.SyncOwnership,
				IgnoreXattrs:    !f.SyncXattrs,
			}) {
				// Target changed
				scanChan <- target.Name
				err = errModified
			}
		}
	}
	if err != nil {
		return err
	}

	tempName := fs.TempName(target.Name)

	if f.versioner != nil {
		// With a versioner the source is copied to the temp name and then
		// archived, so check for free space for the copy first.
		err = f.CheckAvailableSpace(uint64(source.Size)) //nolint:gosec
		if err == nil {
			err = osutil.Copy(f.CopyRangeMethod.ToFS(), f.mtimefs, f.mtimefs, source.Name, tempName)
			if err == nil {
				err = f.inWritableDir(f.versioner.Archive, source.Name)
			}
		}
	} else {
		err = osutil.RenameOrCopy(f.CopyRangeMethod.ToFS(), f.mtimefs, f.mtimefs, source.Name, tempName)
	}
	if err != nil {
		return err
	}

	// Account for the renamed blocks in the block statistics, expressed in
	// units of protocol.MinBlockSize.
	blockStatsMut.Lock()
	minBlocksPerBlock := target.BlockSize() / protocol.MinBlockSize
	blockStats["total"] += len(target.Blocks) * minBlocksPerBlock
	blockStats["renamed"] += len(target.Blocks) * minBlocksPerBlock
	blockStatsMut.Unlock()

	// The file was renamed, so we have handled both the necessary delete
	// of the source and the creation of the target temp file. Fix-up the metadata,
	// update the local index of the target file and rename from temp to real name.

	if err = f.performFinish(target, curTarget, true, tempName, dbUpdateChan, scanChan); err != nil {
		return err
	}

	dbUpdateChan <- dbUpdateJob{source, dbUpdateDeleteFile}

	return nil
}
// This is the flow of data and events here, I think...
//
// +-----------------------+
// |                       | - - - - > ItemStarted
// |      handleFile       | - - - - > ItemFinished (on shortcuts)
// |                       |
// +-----------------------+
//             |
//             | copyChan (copyBlocksState; unless shortcut taken)
//             |
//             |    +-----------------------+
//             |    | +-----------------------+
//             +--->| |                       |
//                  | |     copierRoutine     |
//                  +-|                       |
//                    +-----------------------+
//                                |
//                                | pullChan (sharedPullerState)
//                                |
//                                |   +-----------------------+
//                                |   | +-----------------------+
//                                +-->| |                       |
//                                    | |     pullerRoutine     |
//                                    +-|                       |
//                                      +-----------------------+
//                                                  |
//                                                  | finisherChan (sharedPullerState)
//                                                  |
//                                                  |   +-----------------------+
//                                                  |   |                       |
//                                                  +-->|    finisherRoutine    | - - - - > ItemFinished
//                                                      |                       |
//                                                      +-----------------------+

  985. // handleFile queues the copies and pulls as necessary for a single new or
  986. // changed file.
  987. func (f *sendReceiveFolder) handleFile(ctx context.Context, file protocol.FileInfo, copyChan chan<- copyBlocksState) error {
  988. curFile, hasCurFile, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, file.Name)
  989. if err != nil {
  990. return err
  991. }
  992. have, _ := blockDiff(curFile.Blocks, file.Blocks)
  993. tempName := fs.TempName(file.Name)
  994. populateOffsets(file.Blocks)
  995. blocks := append([]protocol.BlockInfo{}, file.Blocks...)
  996. reused := make([]int, 0, len(file.Blocks))
  997. if f.Type != config.FolderTypeReceiveEncrypted {
  998. blocks, reused = f.reuseBlocks(ctx, blocks, reused, file, tempName)
  999. }
  1000. // The sharedpullerstate will know which flags to use when opening the
  1001. // temp file depending if we are reusing any blocks or not.
  1002. if len(reused) == 0 {
  1003. // Otherwise, discard the file ourselves in order for the
  1004. // sharedpuller not to panic when it fails to exclusively create a
  1005. // file which already exists
  1006. f.inWritableDir(f.mtimefs.Remove, tempName)
  1007. }
  1008. // Reorder blocks
  1009. blocks = f.blockPullReorderer.Reorder(blocks)
  1010. f.evLogger.Log(events.ItemStarted, map[string]string{
  1011. "folder": f.folderID,
  1012. "item": file.Name,
  1013. "type": "file",
  1014. "action": "update",
  1015. })
  1016. s := newSharedPullerState(file, f.mtimefs, f.folderID, tempName, blocks, reused, f.IgnorePerms || file.NoPermissions, hasCurFile, curFile, !f.DisableSparseFiles, !f.DisableFsync)
  1017. f.sl.DebugContext(ctx, "Handling file", slogutil.FilePath(file.Name), "blocksToCopy", len(blocks), "reused", len(reused))
  1018. cs := copyBlocksState{
  1019. sharedPullerState: s,
  1020. blocks: blocks,
  1021. have: len(have),
  1022. }
  1023. copyChan <- cs
  1024. return nil
  1025. }
  1026. func (f *sendReceiveFolder) reuseBlocks(ctx context.Context, blocks []protocol.BlockInfo, reused []int, file protocol.FileInfo, tempName string) ([]protocol.BlockInfo, []int) {
  1027. // Check for an old temporary file which might have some blocks we could
  1028. // reuse.
  1029. tempBlocks, err := scanner.HashFile(ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil)
  1030. if err != nil {
  1031. var caseErr *fs.CaseConflictError
  1032. if errors.As(err, &caseErr) {
  1033. if rerr := f.mtimefs.Rename(caseErr.Real, tempName); rerr == nil {
  1034. tempBlocks, err = scanner.HashFile(ctx, f.ID, f.mtimefs, tempName, file.BlockSize(), nil)
  1035. }
  1036. }
  1037. }
  1038. if err != nil {
  1039. return blocks, reused
  1040. }
  1041. // Check for any reusable blocks in the temp file
  1042. tempCopyBlocks, _ := blockDiff(tempBlocks, file.Blocks)
  1043. // block.String() returns a string unique to the block
  1044. existingBlocks := make(map[string]struct{}, len(tempCopyBlocks))
  1045. for _, block := range tempCopyBlocks {
  1046. existingBlocks[block.String()] = struct{}{}
  1047. }
  1048. // Since the blocks are already there, we don't need to get them.
  1049. blocks = blocks[:0]
  1050. for i, block := range file.Blocks {
  1051. _, ok := existingBlocks[block.String()]
  1052. if !ok {
  1053. blocks = append(blocks, block)
  1054. } else {
  1055. reused = append(reused, i)
  1056. }
  1057. }
  1058. return blocks, reused
  1059. }
  1060. // blockDiff returns lists of common and missing (to transform src into tgt)
  1061. // blocks. Both block lists must have been created with the same block size.
  1062. func blockDiff(src, tgt []protocol.BlockInfo) ([]protocol.BlockInfo, []protocol.BlockInfo) {
  1063. if len(tgt) == 0 {
  1064. return nil, nil
  1065. }
  1066. if len(src) == 0 {
  1067. // Copy the entire file
  1068. return nil, tgt
  1069. }
  1070. have := make([]protocol.BlockInfo, 0, len(src))
  1071. need := make([]protocol.BlockInfo, 0, len(tgt))
  1072. for i := range tgt {
  1073. if i >= len(src) {
  1074. return have, append(need, tgt[i:]...)
  1075. }
  1076. if !bytes.Equal(tgt[i].Hash, src[i].Hash) {
  1077. // Copy differing block
  1078. need = append(need, tgt[i])
  1079. } else {
  1080. have = append(have, tgt[i])
  1081. }
  1082. }
  1083. return have, need
  1084. }
  1085. // populateOffsets sets the Offset field on each block
  1086. func populateOffsets(blocks []protocol.BlockInfo) {
  1087. var offset int64
  1088. for i := range blocks {
  1089. blocks[i].Offset = offset
  1090. offset += int64(blocks[i].Size)
  1091. }
  1092. }
  1093. // shortcutFile sets file metadata, when that's the only thing that has
  1094. // changed.
  1095. func (f *sendReceiveFolder) shortcutFile(file protocol.FileInfo, dbUpdateChan chan<- dbUpdateJob) {
  1096. f.sl.Debug("Taking metadata shortcut", slogutil.FilePath(file.Name))
  1097. f.evLogger.Log(events.ItemStarted, map[string]string{
  1098. "folder": f.folderID,
  1099. "item": file.Name,
  1100. "type": "file",
  1101. "action": "metadata",
  1102. })
  1103. var err error
  1104. defer func() {
  1105. if err != nil {
  1106. slog.Info("Failed to update file metadata", f.LogAttr(), file.LogAttr(), slogutil.Error(err))
  1107. } else {
  1108. slog.Info("Updated file metadata", f.LogAttr(), file.LogAttr())
  1109. }
  1110. f.evLogger.Log(events.ItemFinished, map[string]interface{}{
  1111. "folder": f.folderID,
  1112. "item": file.Name,
  1113. "error": events.Error(err),
  1114. "type": "file",
  1115. "action": "metadata",
  1116. })
  1117. }()
  1118. f.queue.Done(file.Name)
  1119. if !f.IgnorePerms && !file.NoPermissions {
  1120. if err = f.mtimefs.Chmod(file.Name, fs.FileMode(file.Permissions&0o777)); err != nil {
  1121. f.newPullError(file.Name, fmt.Errorf("shortcut file (setting permissions): %w", err))
  1122. return
  1123. }
  1124. }
  1125. if err := f.setPlatformData(&file, file.Name); err != nil {
  1126. f.newPullError(file.Name, fmt.Errorf("shortcut file (setting metadata): %w", err))
  1127. return
  1128. }
  1129. // Still need to re-write the trailer with the new encrypted fileinfo.
  1130. if f.Type == config.FolderTypeReceiveEncrypted {
  1131. err = inWritableDir(func(path string) error {
  1132. fd, err := f.mtimefs.OpenFile(path, fs.OptReadWrite, 0o666)
  1133. if err != nil {
  1134. return err
  1135. }
  1136. defer fd.Close()
  1137. trailerSize, err := writeEncryptionTrailer(file, fd)
  1138. if err != nil {
  1139. return err
  1140. }
  1141. file.EncryptionTrailerSize = int(trailerSize)
  1142. file.Size += trailerSize
  1143. return fd.Truncate(file.Size)
  1144. }, f.mtimefs, file.Name, true)
  1145. if err != nil {
  1146. f.newPullError(file.Name, fmt.Errorf("writing encrypted file trailer: %w", err))
  1147. return
  1148. }
  1149. }
  1150. f.mtimefs.Chtimes(file.Name, file.ModTime(), file.ModTime()) // never fails
  1151. dbUpdateChan <- dbUpdateJob{file, dbUpdateShortcutFile}
  1152. }
  1153. // copierRoutine reads copierStates until the in channel closes and performs
  1154. // the relevant copies when possible, or passes it to the puller routine.
  1155. func (f *sendReceiveFolder) copierRoutine(ctx context.Context, in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
  1156. otherFolderFilesystems := make(map[string]fs.Filesystem)
  1157. for folder, cfg := range f.model.cfg.Folders() {
  1158. if folder == f.ID {
  1159. continue
  1160. }
  1161. otherFolderFilesystems[folder] = cfg.Filesystem()
  1162. }
  1163. for state := range in {
  1164. if f.Type != config.FolderTypeReceiveEncrypted {
  1165. f.model.progressEmitter.Register(state.sharedPullerState)
  1166. }
  1167. f.setState(FolderSyncing) // Does nothing if already FolderSyncing
  1168. blocks:
  1169. for _, block := range state.blocks {
  1170. select {
  1171. case <-ctx.Done():
  1172. state.fail(fmt.Errorf("folder stopped: %w", ctx.Err()))
  1173. break blocks
  1174. default:
  1175. }
  1176. if !f.DisableSparseFiles && state.reused == 0 && block.IsEmpty() {
  1177. // The block is a block of all zeroes, and we are not reusing
  1178. // a temp file, so there is no need to do anything with it.
  1179. // If we were reusing a temp file and had this block to copy,
  1180. // it would be because the block in the temp file was *not* a
  1181. // block of all zeroes, so then we should not skip it.
  1182. // Pretend we copied it.
  1183. state.skippedSparseBlock(block.Size)
  1184. state.copyDone(block)
  1185. continue
  1186. }
  1187. if f.copyBlock(ctx, block, state, otherFolderFilesystems) {
  1188. state.copyDone(block)
  1189. continue
  1190. }
  1191. if state.failed() != nil {
  1192. break
  1193. }
  1194. state.pullStarted()
  1195. ps := pullBlockState{
  1196. sharedPullerState: state.sharedPullerState,
  1197. block: block,
  1198. }
  1199. pullChan <- ps
  1200. }
  1201. // If there are no blocks to pull/copy, we still need the temporary file in place.
  1202. if len(state.blocks) == 0 {
  1203. _, err := state.tempFile()
  1204. if err != nil {
  1205. state.fail(err)
  1206. }
  1207. }
  1208. out <- state.sharedPullerState
  1209. }
  1210. }
  1211. // Returns true when the block was successfully copied.
  1212. func (f *sendReceiveFolder) copyBlock(ctx context.Context, block protocol.BlockInfo, state copyBlocksState, otherFolderFilesystems map[string]fs.Filesystem) bool {
  1213. buf := protocol.BufferPool.Get(block.Size)
  1214. defer protocol.BufferPool.Put(buf)
  1215. // Hope that it's usually in the same folder, so start with that
  1216. // one. Also possibly more efficient copy (same filesystem).
  1217. if f.copyBlockFromFolder(ctx, f.ID, block, state, f.mtimefs, buf) {
  1218. return true
  1219. }
  1220. if state.failed() != nil {
  1221. return false
  1222. }
  1223. for folderID, ffs := range otherFolderFilesystems {
  1224. if f.copyBlockFromFolder(ctx, folderID, block, state, ffs, buf) {
  1225. return true
  1226. }
  1227. if state.failed() != nil {
  1228. return false
  1229. }
  1230. }
  1231. return false
  1232. }
  1233. // Returns true when the block was successfully copied.
  1234. // The passed buffer must be large enough to accommodate the block.
  1235. func (f *sendReceiveFolder) copyBlockFromFolder(ctx context.Context, folderID string, block protocol.BlockInfo, state copyBlocksState, ffs fs.Filesystem, buf []byte) bool {
  1236. for e, err := range itererr.Zip(f.model.sdb.AllLocalBlocksWithHash(folderID, block.Hash)) {
  1237. if err != nil {
  1238. // We just ignore this and continue pulling instead (though
  1239. // there's a good chance that will fail too, if the DB is
  1240. // unhealthy).
  1241. f.sl.DebugContext(ctx, "Failed to get block information from database", "blockHash", block.Hash, slogutil.FilePath(state.file.Name), slogutil.Error(err))
  1242. return false
  1243. }
  1244. if !f.copyBlockFromFile(ctx, e.FileName, e.Offset, state, ffs, block, buf) {
  1245. if state.failed() != nil {
  1246. return false
  1247. }
  1248. continue
  1249. }
  1250. if e.FileName == state.file.Name {
  1251. state.copiedFromOrigin(block.Size)
  1252. } else {
  1253. state.copiedFromElsewhere(block.Size)
  1254. }
  1255. return true
  1256. }
  1257. return false
  1258. }
  1259. // Returns true when the block was successfully copied.
  1260. // The passed buffer must be large enough to accommodate the block.
  1261. func (f *sendReceiveFolder) copyBlockFromFile(ctx context.Context, srcName string, srcOffset int64, state copyBlocksState, ffs fs.Filesystem, block protocol.BlockInfo, buf []byte) bool {
  1262. fd, err := ffs.Open(srcName)
  1263. if err != nil {
  1264. f.sl.DebugContext(ctx, "Failed to open source file for block copy", slogutil.FilePath(srcName), "blockHash", block.Hash, slogutil.Error(err))
  1265. return false
  1266. }
  1267. defer fd.Close()
  1268. _, err = fd.ReadAt(buf, srcOffset)
  1269. if err != nil {
  1270. f.sl.DebugContext(ctx, "Failed to read block from file", slogutil.FilePath(srcName), "blockHash", block.Hash, slogutil.Error(err))
  1271. return false
  1272. }
  1273. // Hash is not SHA256 as it's an encrypted hash token. In that
  1274. // case we can't verify the block integrity so we'll take it on
  1275. // trust. (The other side can and will verify.)
  1276. if f.Type != config.FolderTypeReceiveEncrypted {
  1277. if err := f.verifyBuffer(buf, block); err != nil {
  1278. f.sl.DebugContext(ctx, "Failed to verify block buffer", slogutil.Error(err))
  1279. return false
  1280. }
  1281. }
  1282. dstFd, err := state.tempFile()
  1283. if err != nil {
  1284. // State is already marked as failed when an error is returned here.
  1285. return false
  1286. }
  1287. if f.CopyRangeMethod != config.CopyRangeMethodStandard {
  1288. err = f.withLimiter(ctx, func() error {
  1289. dstFd.mut.Lock()
  1290. defer dstFd.mut.Unlock()
  1291. return fs.CopyRange(f.CopyRangeMethod.ToFS(), fd, dstFd.fd, srcOffset, block.Offset, int64(block.Size))
  1292. })
  1293. } else {
  1294. err = f.limitedWriteAt(ctx, dstFd, buf, block.Offset)
  1295. }
  1296. if err != nil {
  1297. state.fail(fmt.Errorf("dst write: %w", err))
  1298. return false
  1299. }
  1300. return true
  1301. }
  1302. func (*sendReceiveFolder) verifyBuffer(buf []byte, block protocol.BlockInfo) error {
  1303. if len(buf) != block.Size {
  1304. return fmt.Errorf("length mismatch %d != %d", len(buf), block.Size)
  1305. }
  1306. hash := sha256.Sum256(buf)
  1307. if !bytes.Equal(hash[:], block.Hash) {
  1308. return fmt.Errorf("hash mismatch %x != %x", hash, block.Hash)
  1309. }
  1310. return nil
  1311. }
  1312. func (f *sendReceiveFolder) pullerRoutine(ctx context.Context, in <-chan pullBlockState, out chan<- *sharedPullerState) {
  1313. requestLimiter := semaphore.New(f.PullerMaxPendingKiB * 1024)
  1314. var wg sync.WaitGroup
  1315. for state := range in {
  1316. if state.failed() != nil {
  1317. out <- state.sharedPullerState
  1318. continue
  1319. }
  1320. f.setState(FolderSyncing) // Does nothing if already FolderSyncing
  1321. // The requestLimiter limits how many pending block requests we have
  1322. // ongoing at any given time, based on the size of the blocks
  1323. // themselves.
  1324. bytes := state.block.Size
  1325. if err := requestLimiter.TakeWithContext(ctx, bytes); err != nil {
  1326. state.fail(err)
  1327. out <- state.sharedPullerState
  1328. continue
  1329. }
  1330. wg.Add(1)
  1331. go func() {
  1332. defer wg.Done()
  1333. defer requestLimiter.Give(bytes)
  1334. f.pullBlock(ctx, state, out)
  1335. }()
  1336. }
  1337. wg.Wait()
  1338. }
// pullBlock fetches a single block from the cluster and writes it to the
// temporary file of the given state. Candidate devices are tried one at a
// time, least busy first, until one delivers a block that verifies; if none
// does, the state is marked as failed. In every case the shared puller state
// is sent on out exactly once before returning.
func (f *sendReceiveFolder) pullBlock(ctx context.Context, state pullBlockState, out chan<- *sharedPullerState) {
	// Get an fd to the temporary file. Technically we don't need it until
	// after fetching the block, but if we run into an error here there is
	// no point in issuing the request to the network.
	fd, err := state.tempFile()
	if err != nil {
		out <- state.sharedPullerState
		return
	}

	if !f.DisableSparseFiles && state.reused == 0 && state.block.IsEmpty() {
		// There is no need to request a block of all zeroes. Pretend we
		// requested it and handled it correctly.
		state.pullDone(state.block)
		out <- state.sharedPullerState
		return
	}

	// lastError remembers why the most recent candidate was rejected, so
	// that it can be reported if we run out of candidates.
	var lastError error
	candidates := f.model.blockAvailability(f.FolderConfiguration, state.file, state.block)
loop:
	for {
		select {
		case <-ctx.Done():
			state.fail(fmt.Errorf("folder stopped: %w", ctx.Err()))
			break loop
		default:
		}

		// Select the least busy device to pull the block from. If we found no
		// feasible device at all, fail the block (and in the long run, the
		// file).
		found := activity.leastBusy(candidates)
		if found == -1 {
			if lastError != nil {
				state.fail(fmt.Errorf("pull: %w", lastError))
			} else {
				state.fail(fmt.Errorf("pull: %w", errNoDevice))
			}
			break
		}

		// Remove the selected candidate from the list (swap with the last
		// element and shrink) so it is not tried again.
		selected := candidates[found]
		candidates[found] = candidates[len(candidates)-1]
		candidates = candidates[:len(candidates)-1]

		// Fetch the block, while marking the selected device as in use so that
		// leastBusy can select another device when someone else asks.
		activity.using(selected)
		var buf []byte
		blockNo := int(state.block.Offset / int64(state.file.BlockSize()))
		buf, lastError = f.model.RequestGlobal(ctx, selected.ID, f.folderID, state.file.Name, blockNo, state.block.Offset, state.block.Size, state.block.Hash, selected.FromTemporary)
		activity.done(selected)
		if lastError != nil {
			f.sl.DebugContext(ctx, "Block request returned error", slogutil.FilePath(state.file.Name), "offset", state.block.Offset, "size", state.block.Size, "device", selected.ID.Short(), slogutil.Error(lastError))
			continue
		}

		// Verify that the received block matches the desired hash, if not
		// try pulling it from another device.
		// For receive-encrypted folders, the hash is not SHA256 as it's an
		// encrypted hash token. In that case we can't verify the block
		// integrity so we'll take it on trust. (The other side can and
		// will verify.)
		if f.Type != config.FolderTypeReceiveEncrypted {
			lastError = f.verifyBuffer(buf, state.block)
		}
		if lastError != nil {
			f.sl.DebugContext(ctx, "Block hash mismatch", slogutil.FilePath(state.file.Name), "offset", state.block.Offset, "size", state.block.Size)
			continue
		}

		// Save the block data we got from the cluster
		err = f.limitedWriteAt(ctx, fd, buf, state.block.Offset)
		if err != nil {
			state.fail(fmt.Errorf("save: %w", err))
		} else {
			state.pullDone(state.block)
		}
		break
	}
	out <- state.sharedPullerState
}
  1415. func (f *sendReceiveFolder) performFinish(file, curFile protocol.FileInfo, hasCurFile bool, tempName string, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) error {
  1416. // Set the correct permission bits on the new file
  1417. if !f.IgnorePerms && !file.NoPermissions {
  1418. if err := f.mtimefs.Chmod(tempName, fs.FileMode(file.Permissions&0o777)); err != nil {
  1419. return fmt.Errorf("setting permissions: %w", err)
  1420. }
  1421. }
  1422. // Set file xattrs and ownership.
  1423. if err := f.setPlatformData(&file, tempName); err != nil {
  1424. return fmt.Errorf("setting metadata: %w", err)
  1425. }
  1426. if stat, err := f.mtimefs.Lstat(file.Name); err == nil {
  1427. // There is an old file or directory already in place. We need to
  1428. // handle that.
  1429. if err := f.scanIfItemChanged(file.Name, stat, curFile, hasCurFile, false, scanChan); err != nil {
  1430. return fmt.Errorf("checking existing file: %w", err)
  1431. }
  1432. if !curFile.IsDirectory() && !curFile.IsSymlink() && file.InConflictWith(curFile) {
  1433. // The new file has been changed in conflict with the existing one. We
  1434. // should file it away as a conflict instead of just removing or
  1435. // archiving.
  1436. // Directories and symlinks aren't checked for conflicts.
  1437. err = f.inWritableDir(func(name string) error {
  1438. return f.moveForConflict(name, file.ModifiedBy.String(), scanChan)
  1439. }, curFile.Name)
  1440. } else {
  1441. err = f.deleteItemOnDisk(curFile, scanChan)
  1442. }
  1443. if err != nil {
  1444. return fmt.Errorf("moving for conflict: %w", err)
  1445. }
  1446. } else if !fs.IsNotExist(err) {
  1447. return fmt.Errorf("checking existing file: %w", err)
  1448. }
  1449. // Replace the original content with the new one. If it didn't work,
  1450. // leave the temp file in place for reuse.
  1451. if err := osutil.RenameOrCopy(f.CopyRangeMethod.ToFS(), f.mtimefs, f.mtimefs, tempName, file.Name); err != nil {
  1452. return fmt.Errorf("replacing file: %w", err)
  1453. }
  1454. // Set the correct timestamp on the new file
  1455. f.mtimefs.Chtimes(file.Name, file.ModTime(), file.ModTime()) // never fails
  1456. // Record the updated file in the index
  1457. dbUpdateChan <- dbUpdateJob{file, dbUpdateHandleFile}
  1458. return nil
  1459. }
  1460. func (f *sendReceiveFolder) finisherRoutine(ctx context.Context, in <-chan *sharedPullerState, dbUpdateChan chan<- dbUpdateJob, scanChan chan<- string) {
  1461. for state := range in {
  1462. if closed, err := state.finalClose(); closed {
  1463. f.sl.DebugContext(ctx, "Closing temp file", slogutil.FilePath(state.file.Name))
  1464. f.queue.Done(state.file.Name)
  1465. if err == nil {
  1466. err = f.performFinish(state.file, state.curFile, state.hasCurFile, state.tempName, dbUpdateChan, scanChan)
  1467. }
  1468. if err != nil {
  1469. f.newPullError(state.file.Name, fmt.Errorf("finishing: %w", err))
  1470. } else {
  1471. slog.InfoContext(ctx, "Synced file", f.LogAttr(), state.file.LogAttr(), slog.Group("blocks", slog.Int("local", state.reused+state.copyTotal), slog.Int("download", state.pullTotal)))
  1472. minBlocksPerBlock := state.file.BlockSize() / protocol.MinBlockSize
  1473. blockStatsMut.Lock()
  1474. blockStats["total"] += (state.reused + state.copyTotal + state.pullTotal) * minBlocksPerBlock
  1475. blockStats["reused"] += state.reused * minBlocksPerBlock
  1476. blockStats["pulled"] += state.pullTotal * minBlocksPerBlock
  1477. // copyOriginShifted is counted towards copyOrigin due to progress bar reasons
  1478. // for reporting reasons we want to separate these.
  1479. blockStats["copyOrigin"] += state.copyOrigin * minBlocksPerBlock
  1480. blockStats["copyElsewhere"] += (state.copyTotal - state.copyOrigin) * minBlocksPerBlock
  1481. blockStatsMut.Unlock()
  1482. }
  1483. if f.Type != config.FolderTypeReceiveEncrypted {
  1484. f.model.progressEmitter.Deregister(state)
  1485. }
  1486. f.evLogger.Log(events.ItemFinished, map[string]interface{}{
  1487. "folder": f.folderID,
  1488. "item": state.file.Name,
  1489. "error": events.Error(err),
  1490. "type": "file",
  1491. "action": "update",
  1492. })
  1493. }
  1494. }
  1495. }
// BringToFront moves the given filename to the front of the job queue by
// delegating to the folder's queue.
func (f *sendReceiveFolder) BringToFront(filename string) {
	f.queue.BringToFront(filename)
}
// Jobs returns, unchanged, whatever the folder's job queue reports for the
// given page and page size.
// NOTE(review): the meaning of the two slices and the int is defined by the
// queue's Jobs method, which is not visible here; confirm before relying on
// a particular interpretation.
func (f *sendReceiveFolder) Jobs(page, perpage int) ([]string, []string, int) {
	return f.queue.Jobs(page, perpage)
}
  1503. // dbUpdaterRoutine aggregates db updates and commits them in batches no
  1504. // larger than 1000 items, and no more delayed than 2 seconds.
  1505. func (f *sendReceiveFolder) dbUpdaterRoutine(dbUpdateChan <-chan dbUpdateJob) int {
  1506. const maxBatchTime = 2 * time.Second
  1507. changed := 0
  1508. changedDirs := make(map[string]struct{})
  1509. found := false
  1510. var lastFile protocol.FileInfo
  1511. tick := time.NewTicker(maxBatchTime)
  1512. defer tick.Stop()
  1513. batch := NewFileInfoBatch(func(files []protocol.FileInfo) error {
  1514. // sync directories
  1515. for dir := range changedDirs {
  1516. delete(changedDirs, dir)
  1517. if !f.DisableFsync {
  1518. fd, err := f.mtimefs.Open(dir)
  1519. if err != nil {
  1520. f.sl.Debug("Fsync failed", slogutil.FilePath(dir), slogutil.Error(err))
  1521. continue
  1522. }
  1523. if err := fd.Sync(); err != nil {
  1524. f.sl.Debug("Fsync failed", slogutil.FilePath(dir), slogutil.Error(err))
  1525. }
  1526. fd.Close()
  1527. }
  1528. }
  1529. // All updates to file/folder objects that originated remotely
  1530. // (across the network) use this call to updateLocals
  1531. f.updateLocalsFromPulling(files)
  1532. if found {
  1533. f.ReceivedFile(lastFile.Name, lastFile.IsDeleted())
  1534. found = false
  1535. }
  1536. return nil
  1537. })
  1538. loop:
  1539. for {
  1540. select {
  1541. case job, ok := <-dbUpdateChan:
  1542. if !ok {
  1543. break loop
  1544. }
  1545. switch job.jobType {
  1546. case dbUpdateHandleFile, dbUpdateShortcutFile:
  1547. changedDirs[filepath.Dir(job.file.Name)] = struct{}{}
  1548. case dbUpdateHandleDir:
  1549. changedDirs[job.file.Name] = struct{}{}
  1550. case dbUpdateHandleSymlink, dbUpdateInvalidate:
  1551. // fsyncing symlinks is only supported by MacOS
  1552. // and invalidated files are db only changes -> no sync
  1553. }
  1554. // For some reason we seem to care about file deletions and
  1555. // content modification, but not about metadata and dirs/symlinks.
  1556. if !job.file.IsInvalid() && job.jobType&(dbUpdateHandleFile|dbUpdateDeleteFile) != 0 {
  1557. found = true
  1558. lastFile = job.file
  1559. }
  1560. if !job.file.IsDeleted() && !job.file.IsInvalid() {
  1561. // Now that the file is finalized, grab possibly updated
  1562. // inode change time from disk into the local FileInfo. We
  1563. // use this change time to check for changes to xattrs etc
  1564. // on next scan.
  1565. if err := f.updateFileInfoChangeTime(&job.file); err != nil {
  1566. // This means on next scan the likely incorrect change time
  1567. // (resp. whatever caused the error) will cause this file to
  1568. // change. Log at info level to leave a trace if a user
  1569. // notices, but no need to warn
  1570. f.sl.Warn("Failed to update metadata at database commit", slogutil.FilePath(job.file.Name), slogutil.Error(err))
  1571. }
  1572. }
  1573. job.file.Sequence = 0
  1574. batch.Append(job.file)
  1575. batch.FlushIfFull()
  1576. changed++
  1577. case <-tick.C:
  1578. batch.Flush()
  1579. }
  1580. }
  1581. batch.Flush()
  1582. return changed
  1583. }
  1584. // pullScannerRoutine aggregates paths to be scanned after pulling. The scan is
  1585. // scheduled once when scanChan is closed (scanning can not happen during pulling).
  1586. func (f *sendReceiveFolder) pullScannerRoutine(ctx context.Context, scanChan <-chan string) {
  1587. toBeScanned := make(map[string]struct{})
  1588. for path := range scanChan {
  1589. toBeScanned[path] = struct{}{}
  1590. }
  1591. if len(toBeScanned) != 0 {
  1592. scanList := make([]string, 0, len(toBeScanned))
  1593. for path := range toBeScanned {
  1594. slog.DebugContext(ctx, "Scheduling scan after pulling", slogutil.FilePath(path))
  1595. scanList = append(scanList, path)
  1596. }
  1597. f.Scan(scanList)
  1598. }
  1599. }
  1600. func (f *sendReceiveFolder) moveForConflict(name, lastModBy string, scanChan chan<- string) error {
  1601. if isConflict(name) {
  1602. f.sl.Info("Conflict on existing conflict copy; not copying again", slogutil.FilePath(name))
  1603. if err := f.mtimefs.Remove(name); err != nil && !fs.IsNotExist(err) {
  1604. return fmt.Errorf("%s: %w", contextRemovingOldItem, err)
  1605. }
  1606. return nil
  1607. }
  1608. if f.MaxConflicts == 0 {
  1609. if err := f.mtimefs.Remove(name); err != nil && !fs.IsNotExist(err) {
  1610. return fmt.Errorf("%s: %w", contextRemovingOldItem, err)
  1611. }
  1612. return nil
  1613. }
  1614. metricFolderConflictsTotal.WithLabelValues(f.ID).Inc()
  1615. newName := conflictName(name, lastModBy)
  1616. err := f.mtimefs.Rename(name, newName)
  1617. if fs.IsNotExist(err) {
  1618. // We were supposed to move a file away but it does not exist. Either
  1619. // the user has already moved it away, or the conflict was between a
  1620. // remote modification and a local delete. In either way it does not
  1621. // matter, go ahead as if the move succeeded.
  1622. err = nil
  1623. }
  1624. if f.MaxConflicts > -1 {
  1625. matches := existingConflicts(name, f.mtimefs)
  1626. if len(matches) > f.MaxConflicts {
  1627. slices.SortFunc(matches, func(a, b string) int {
  1628. return strings.Compare(b, a)
  1629. })
  1630. for _, match := range matches[f.MaxConflicts:] {
  1631. if gerr := f.mtimefs.Remove(match); gerr != nil {
  1632. f.sl.Debug("Failed to remove extra conflict copy", slogutil.Error(gerr))
  1633. }
  1634. }
  1635. }
  1636. }
  1637. if err == nil {
  1638. scanChan <- newName
  1639. }
  1640. return err
  1641. }
  1642. func (f *sendReceiveFolder) newPullError(path string, err error) {
  1643. if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
  1644. // Error because the folder stopped - no point logging/tracking
  1645. return
  1646. }
  1647. f.errorsMut.Lock()
  1648. defer f.errorsMut.Unlock()
  1649. // We might get more than one error report for a file (i.e. error on
  1650. // Write() followed by Close()); we keep the first error as that is
  1651. // probably closer to the root cause.
  1652. if _, ok := f.tempPullErrors[path]; ok {
  1653. return
  1654. }
  1655. // Establish context to differentiate from errors while scanning.
  1656. // Use "syncing" as opposed to "pulling" as the latter might be used
  1657. // for errors occurring specifically in the puller routine.
  1658. errStr := fmt.Sprintf("syncing: %s", err)
  1659. f.tempPullErrors[path] = errStr
  1660. f.sl.Debug("New pull error", slogutil.FilePath(path), slogutil.Error(err))
  1661. }
  1662. // deleteItemOnDisk deletes the file represented by old that is about to be replaced by new.
  1663. func (f *sendReceiveFolder) deleteItemOnDisk(item protocol.FileInfo, scanChan chan<- string) (err error) {
  1664. defer func() {
  1665. if err != nil {
  1666. err = fmt.Errorf("%s: %w", contextRemovingOldItem, err)
  1667. }
  1668. }()
  1669. switch {
  1670. case item.IsDirectory():
  1671. // Directories aren't archived and need special treatment due
  1672. // to potential children.
  1673. return f.deleteDirOnDisk(item.Name, scanChan)
  1674. case !item.IsSymlink() && f.versioner != nil:
  1675. // If we should use versioning, let the versioner archive the
  1676. // file before we replace it. Archiving a non-existent file is not
  1677. // an error.
  1678. // Symlinks aren't archived.
  1679. return f.inWritableDir(f.versioner.Archive, item.Name)
  1680. }
  1681. return f.inWritableDir(f.mtimefs.Remove, item.Name)
  1682. }
  1683. // deleteDirOnDisk attempts to delete a directory. It checks for files/dirs inside
  1684. // the directory and removes them if possible or returns an error if it fails
  1685. func (f *sendReceiveFolder) deleteDirOnDisk(dir string, scanChan chan<- string) error {
  1686. if err := osutil.TraversesSymlink(f.mtimefs, filepath.Dir(dir)); err != nil {
  1687. return err
  1688. }
  1689. if err := f.deleteDirOnDiskHandleChildren(dir, scanChan); err != nil {
  1690. return err
  1691. }
  1692. err := f.inWritableDir(f.mtimefs.Remove, dir)
  1693. if err == nil || fs.IsNotExist(err) {
  1694. // It was removed or it doesn't exist to start with
  1695. return nil
  1696. }
  1697. if _, serr := f.mtimefs.Lstat(dir); serr != nil && !fs.IsPermission(serr) {
  1698. // We get an error just looking at the directory, and it's not a
  1699. // permission problem. Lets assume the error is in fact some variant
  1700. // of "file does not exist" (possibly expressed as some parent being a
  1701. // file and not a directory etc) and that the delete is handled.
  1702. return nil
  1703. }
  1704. return err
  1705. }
// deleteDirOnDiskHandleChildren walks everything below dir in preparation
// for removing dir itself. Deletable-ignored items and temporary files are
// removed; every other child is classified (ignored, unknown/changed and
// therefore to be scanned, receive-only changed, or known and unchanged).
// Paths that need scanning are sent on scanChan.
//
// It returns the walk error if walking failed, otherwise, in order of
// precedence: errDirHasToBeScanned, errDirHasIgnored, nil (after scheduling
// a scan of dir) when receive-only changed items were found,
// errDirNotEmpty, and finally the first error encountered while removing
// items (nil if there was none).
func (f *sendReceiveFolder) deleteDirOnDiskHandleChildren(dir string, scanChan chan<- string) error {
	// Deletable directories found during the walk; removed after the walk.
	var dirsToDelete []string
	var hasIgnored, hasKnown, hasToBeScanned, hasReceiveOnlyChanged bool
	// First error from removing an item; later removal errors are dropped.
	var delErr error

	err := f.mtimefs.Walk(dir, func(path string, info fs.FileInfo, err error) error {
		// The directory itself is handled by the caller.
		if path == dir {
			return nil
		}
		if err != nil {
			return err
		}
		switch match := f.ignores.Match(path); {
		case match.IsDeletable():
			if info.IsDir() {
				dirsToDelete = append(dirsToDelete, path)
				return nil
			}
			// Deletable non-directories are removed right away, sharing the
			// removal code of the temporary file case below.
			fallthrough
		case fs.IsTemporary(path):
			if err := f.mtimefs.Remove(path); err != nil && delErr == nil {
				delErr = err
			}
			return nil
		case match.IsIgnored():
			hasIgnored = true
			return nil
		}
		// Not ignored and not temporary: compare against what the database
		// holds for the local device.
		cf, ok, err := f.model.sdb.GetDeviceFile(f.folderID, protocol.LocalDeviceID, path)
		if err != nil {
			return err
		}
		switch {
		case !ok || cf.IsDeleted():
			// Something appeared in the dir that we either are not
			// aware of at all or that we think should be deleted
			// -> schedule scan.
			scanChan <- path
			hasToBeScanned = true
			return nil
		case ok && f.Type == config.FolderTypeReceiveOnly && cf.IsReceiveOnlyChanged():
			hasReceiveOnlyChanged = true
			return nil
		}
		diskFile, err := scanner.CreateFileInfo(info, path, f.mtimefs, f.SyncOwnership, f.SyncXattrs, f.XattrFilter)
		if err != nil {
			// Lets just assume the file has changed.
			scanChan <- path
			hasToBeScanned = true
			return nil //nolint:nilerr
		}
		if !cf.IsEquivalentOptional(diskFile, protocol.FileInfoComparison{
			ModTimeWindow:   f.modTimeWindow,
			IgnorePerms:     f.IgnorePerms,
			IgnoreBlocks:    true,
			IgnoreFlags:     protocol.LocalAllFlags,
			IgnoreOwnership: !f.SyncOwnership,
			IgnoreXattrs:    !f.SyncXattrs,
		}) {
			// File on disk changed compared to what we have in db
			// -> schedule scan.
			scanChan <- path
			hasToBeScanned = true
			return nil
		}
		// Dir contains file that is valid according to db and
		// not ignored -> something weird is going on
		hasKnown = true
		return nil
	})
	if err != nil {
		return err
	}

	// Remove the collected directories in reverse order of discovery
	// (presumably so that nested directories go before their parents).
	for i := range dirsToDelete {
		if err := f.mtimefs.Remove(dirsToDelete[len(dirsToDelete)-1-i]); err != nil && delErr == nil {
			delErr = err
		}
	}

	// "Error precedence":

	// Something changed on disk, check that and maybe all else gets resolved
	if hasToBeScanned {
		return errDirHasToBeScanned
	}
	// Ignored files will never be touched, i.e. this will keep failing until
	// user acts.
	if hasIgnored {
		return errDirHasIgnored
	}
	if hasReceiveOnlyChanged {
		// Pretend we deleted the directory. It will be resurrected as a
		// receive-only changed item on scan.
		scanChan <- dir
		return nil
	}
	if hasKnown {
		return errDirNotEmpty
	}
	// All good, except maybe failing to remove a (?d) ignored item
	return delErr
}
  1805. // scanIfItemChanged schedules the given file for scanning and returns errModified
  1806. // if it differs from the information in the database. Returns nil if the file has
  1807. // not changed.
  1808. func (f *sendReceiveFolder) scanIfItemChanged(name string, stat fs.FileInfo, item protocol.FileInfo, hasItem bool, fromDelete bool, scanChan chan<- string) (err error) {
  1809. defer func() {
  1810. if errors.Is(err, errModified) {
  1811. scanChan <- name
  1812. }
  1813. }()
  1814. if !hasItem || item.Deleted {
  1815. // The item appeared from nowhere
  1816. return errModified
  1817. }
  1818. // Check that the item on disk is what we expect it to be according
  1819. // to the database. If there's a mismatch here, there might be local
  1820. // changes that we don't know about yet and we should scan before
  1821. // touching the item.
  1822. statItem, err := scanner.CreateFileInfo(stat, item.Name, f.mtimefs, f.SyncOwnership, f.SyncXattrs, f.XattrFilter)
  1823. if err != nil {
  1824. return fmt.Errorf("comparing item on disk to db: %w", err)
  1825. }
  1826. if !statItem.IsEquivalentOptional(item, protocol.FileInfoComparison{
  1827. ModTimeWindow: f.modTimeWindow,
  1828. IgnorePerms: f.IgnorePerms,
  1829. IgnoreBlocks: true,
  1830. IgnoreFlags: protocol.LocalAllFlags,
  1831. IgnoreOwnership: fromDelete || !f.SyncOwnership,
  1832. IgnoreXattrs: fromDelete || !f.SyncXattrs,
  1833. }) {
  1834. return errModified
  1835. }
  1836. return nil
  1837. }
  1838. // checkToBeDeleted makes sure the file on disk is compatible with what there is
  1839. // in the DB before the caller proceeds with actually deleting it.
  1840. // I.e. non-nil error status means "Do not delete!" or "is already deleted".
  1841. func (f *sendReceiveFolder) checkToBeDeleted(file, cur protocol.FileInfo, hasCur bool, scanChan chan<- string) error {
  1842. if err := osutil.TraversesSymlink(f.mtimefs, filepath.Dir(file.Name)); err != nil {
  1843. f.sl.Debug("Not deleting item behind symlink on disk, but updating database", slogutil.FilePath(file.Name))
  1844. return fs.ErrNotExist
  1845. }
  1846. stat, err := f.mtimefs.Lstat(file.Name)
  1847. deleted := fs.IsNotExist(err) || fs.IsErrCaseConflict(err)
  1848. if !deleted && err != nil {
  1849. return err
  1850. }
  1851. if deleted {
  1852. if hasCur && !cur.Deleted && !cur.IsUnsupported() {
  1853. scanChan <- file.Name
  1854. return errModified
  1855. }
  1856. f.sl.Debug("Not deleting item we don't have, but updating database", slogutil.FilePath(file.Name))
  1857. return err
  1858. }
  1859. return f.scanIfItemChanged(file.Name, stat, cur, hasCur, true, scanChan)
  1860. }
  1861. // setPlatformData makes adjustments to the metadata that should happen for
  1862. // all types (files, directories, symlinks). This should be one of the last
  1863. // things we do to a file when syncing changes to it.
  1864. func (f *sendReceiveFolder) setPlatformData(file *protocol.FileInfo, name string) error {
  1865. if f.SyncXattrs {
  1866. // Set extended attributes.
  1867. if err := f.mtimefs.SetXattr(name, file.Platform.Xattrs(), f.XattrFilter); errors.Is(err, fs.ErrXattrsNotSupported) {
  1868. f.sl.Debug("Cannot set xattrs (not supported)", slogutil.FilePath(file.Name), slogutil.Error(err))
  1869. } else if err != nil {
  1870. return err
  1871. }
  1872. }
  1873. if f.SyncOwnership {
  1874. // Set ownership based on file metadata.
  1875. if err := f.syncOwnership(file, name); err != nil {
  1876. return err
  1877. }
  1878. } else if f.CopyOwnershipFromParent {
  1879. // Copy the parent owner and group.
  1880. if err := f.copyOwnershipFromParent(name); err != nil {
  1881. return err
  1882. }
  1883. }
  1884. return nil
  1885. }
  1886. func (f *sendReceiveFolder) copyOwnershipFromParent(path string) error {
  1887. if build.IsWindows {
  1888. // Can't do anything.
  1889. return nil
  1890. }
  1891. info, err := f.mtimefs.Lstat(filepath.Dir(path))
  1892. if err != nil {
  1893. return fmt.Errorf("copy owner from parent: %w", err)
  1894. }
  1895. if err := f.mtimefs.Lchown(path, strconv.Itoa(info.Owner()), strconv.Itoa(info.Group())); err != nil {
  1896. return fmt.Errorf("copy owner from parent: %w", err)
  1897. }
  1898. return nil
  1899. }
// inWritableDir calls the package-level inWritableDir with fn and path,
// supplying this folder's filesystem and its IgnorePerms setting.
func (f *sendReceiveFolder) inWritableDir(fn func(string) error, path string) error {
	return inWritableDir(fn, f.mtimefs, path, f.IgnorePerms)
}
  1903. func (f *sendReceiveFolder) limitedWriteAt(ctx context.Context, fd io.WriterAt, data []byte, offset int64) error {
  1904. return f.withLimiter(ctx, func() error {
  1905. _, err := fd.WriteAt(data, offset)
  1906. return err
  1907. })
  1908. }
  1909. func (f *sendReceiveFolder) withLimiter(ctx context.Context, fn func() error) error {
  1910. if err := f.writeLimiter.TakeWithContext(ctx, 1); err != nil {
  1911. return err
  1912. }
  1913. defer f.writeLimiter.Give(1)
  1914. return fn()
  1915. }
  1916. // updateFileInfoChangeTime updates the inode change time in the FileInfo,
  1917. // because that depends on the current, new, state of the file on disk.
  1918. func (f *sendReceiveFolder) updateFileInfoChangeTime(file *protocol.FileInfo) error {
  1919. info, err := f.mtimefs.Lstat(file.Name)
  1920. if err != nil {
  1921. return err
  1922. }
  1923. if ct := info.InodeChangeTime(); !ct.IsZero() {
  1924. file.InodeChangeNs = ct.UnixNano()
  1925. } else {
  1926. file.InodeChangeNs = 0
  1927. }
  1928. return nil
  1929. }
// FileError pairs a file path with an error message. A []FileError is sent
// as part of an event and will be JSON serialized.
type FileError struct {
	Path string `json:"path"`  // serialized under the "path" key
	Err  string `json:"error"` // serialized under the "error" key
}
  1935. func conflictName(name, lastModBy string) string {
  1936. ext := filepath.Ext(name)
  1937. return name[:len(name)-len(ext)] + time.Now().Format(".sync-conflict-20060102-150405-") + lastModBy + ext
  1938. }
  1939. func isConflict(name string) bool {
  1940. return strings.Contains(filepath.Base(name), ".sync-conflict-")
  1941. }
  1942. func existingConflicts(name string, fs fs.Filesystem) []string {
  1943. ext := filepath.Ext(name)
  1944. matches, err := fs.Glob(name[:len(name)-len(ext)] + ".sync-conflict-????????-??????*" + ext)
  1945. if err != nil {
  1946. slog.Debug("Globbing for conflicts failed", slogutil.Error(err))
  1947. }
  1948. return matches
  1949. }