syncthing.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package syncthing
  7. import (
  8. "context"
  9. "crypto/tls"
  10. "errors"
  11. "fmt"
  12. "io"
  13. "net/http"
  14. "os"
  15. "runtime"
  16. "sort"
  17. "strings"
  18. "sync"
  19. "time"
  20. "github.com/thejerf/suture/v4"
  21. "github.com/syncthing/syncthing/lib/api"
  22. "github.com/syncthing/syncthing/lib/build"
  23. "github.com/syncthing/syncthing/lib/config"
  24. "github.com/syncthing/syncthing/lib/connections"
  25. "github.com/syncthing/syncthing/lib/connections/registry"
  26. "github.com/syncthing/syncthing/lib/db"
  27. "github.com/syncthing/syncthing/lib/db/backend"
  28. "github.com/syncthing/syncthing/lib/discover"
  29. "github.com/syncthing/syncthing/lib/events"
  30. "github.com/syncthing/syncthing/lib/locations"
  31. "github.com/syncthing/syncthing/lib/logger"
  32. "github.com/syncthing/syncthing/lib/model"
  33. "github.com/syncthing/syncthing/lib/osutil"
  34. "github.com/syncthing/syncthing/lib/protocol"
  35. "github.com/syncthing/syncthing/lib/rand"
  36. "github.com/syncthing/syncthing/lib/sha256"
  37. "github.com/syncthing/syncthing/lib/svcutil"
  38. "github.com/syncthing/syncthing/lib/tlsutil"
  39. "github.com/syncthing/syncthing/lib/upgrade"
  40. "github.com/syncthing/syncthing/lib/ur"
  41. )
  // Identifiers used when setting up sync connections and the API.
  const (
  	// bepProtocolName is advertised as the TLS next-protocol value and is
  	// handed to the connections service.
  	bepProtocolName = "bep/1.0"
  	// tlsDefaultCommonName is handed to the connections service and the API.
  	tlsDefaultCommonName = "syncthing"
  )

  // Arguments for the two log recorders created during startup.
  const (
  	maxSystemErrors  = 5
  	initialSystemLog = 10
  	maxSystemLog     = 250
  )

  // deviceCertLifetimeDays is twenty years expressed in 365-day years.
  const deviceCertLifetimeDays = 20 * 365
  // Options carries the settings an App is created with. The zero value
  // leaves every optional feature off.
  type Options struct {
  	// AuditWriter, when non-nil, gets an audit service attached to it
  	// during startup.
  	AuditWriter io.Writer
  	// NoUpgrade is forwarded to the usage reporting service and the API.
  	NoUpgrade bool
  	// ProfilerAddr, when non-empty, is the address an HTTP listener is
  	// started on during startup.
  	ProfilerAddr string
  	// ResetDeltaIdxs makes startup drop the delta index IDs.
  	ResetDeltaIdxs bool
  	// Verbose adds the verbose service during startup.
  	Verbose bool

  	// Database maintenance intervals, forwarded to the database layer.
  	// A zero duration means "use the default value".
  	DBRecheckInterval, DBIndirectGCInterval time.Duration
  }
  60. type App struct {
  61. myID protocol.DeviceID
  62. mainService *suture.Supervisor
  63. cfg config.Wrapper
  64. ll *db.Lowlevel
  65. evLogger events.Logger
  66. cert tls.Certificate
  67. opts Options
  68. exitStatus svcutil.ExitStatus
  69. err error
  70. stopOnce sync.Once
  71. mainServiceCancel context.CancelFunc
  72. stopped chan struct{}
  73. Model model.Model
  74. }
  75. func New(cfg config.Wrapper, dbBackend backend.Backend, evLogger events.Logger, cert tls.Certificate, opts Options) (*App, error) {
  76. ll, err := db.NewLowlevel(dbBackend, evLogger, db.WithRecheckInterval(opts.DBRecheckInterval), db.WithIndirectGCInterval(opts.DBIndirectGCInterval))
  77. if err != nil {
  78. return nil, err
  79. }
  80. a := &App{
  81. cfg: cfg,
  82. ll: ll,
  83. evLogger: evLogger,
  84. opts: opts,
  85. cert: cert,
  86. stopped: make(chan struct{}),
  87. }
  88. close(a.stopped) // Hasn't been started, so shouldn't block on Wait.
  89. return a, nil
  90. }
  91. // Start executes the app and returns once all the startup operations are done,
  92. // e.g. the API is ready for use.
  93. // Must be called once only.
  94. func (a *App) Start() error {
  95. // Create a main service manager. We'll add things to this as we go along.
  96. // We want any logging it does to go through our log system.
  97. spec := svcutil.SpecWithDebugLogger(l)
  98. a.mainService = suture.New("main", spec)
  99. // Start the supervisor and wait for it to stop to handle cleanup.
  100. a.stopped = make(chan struct{})
  101. ctx, cancel := context.WithCancel(context.Background())
  102. a.mainServiceCancel = cancel
  103. errChan := a.mainService.ServeBackground(ctx)
  104. go a.wait(errChan)
  105. if err := a.startup(); err != nil {
  106. a.stopWithErr(svcutil.ExitError, err)
  107. return err
  108. }
  109. return nil
  110. }
  111. func (a *App) startup() error {
  112. a.mainService.Add(ur.NewFailureHandler(a.cfg, a.evLogger))
  113. a.mainService.Add(a.ll)
  114. if a.opts.AuditWriter != nil {
  115. a.mainService.Add(newAuditService(a.opts.AuditWriter, a.evLogger))
  116. }
  117. if a.opts.Verbose {
  118. a.mainService.Add(newVerboseService(a.evLogger))
  119. }
  120. errors := logger.NewRecorder(l, logger.LevelWarn, maxSystemErrors, 0)
  121. systemLog := logger.NewRecorder(l, logger.LevelDebug, maxSystemLog, initialSystemLog)
  122. // Event subscription for the API; must start early to catch the early
  123. // events. The LocalChangeDetected event might overwhelm the event
  124. // receiver in some situations so we will not subscribe to it here.
  125. defaultSub := events.NewBufferedSubscription(a.evLogger.Subscribe(api.DefaultEventMask), api.EventSubBufferSize)
  126. diskSub := events.NewBufferedSubscription(a.evLogger.Subscribe(api.DiskEventMask), api.EventSubBufferSize)
  127. // Attempt to increase the limit on number of open files to the maximum
  128. // allowed, in case we have many peers. We don't really care enough to
  129. // report the error if there is one.
  130. osutil.MaximizeOpenFileLimit()
  131. // Figure out our device ID, set it as the log prefix and log it.
  132. a.myID = protocol.NewDeviceID(a.cert.Certificate[0])
  133. l.SetPrefix(fmt.Sprintf("[%s] ", a.myID.String()[:5]))
  134. l.Infoln("My ID:", a.myID)
  135. // Select SHA256 implementation and report. Affected by the
  136. // STHASHING environment variable.
  137. sha256.SelectAlgo()
  138. sha256.Report()
  139. // Emit the Starting event, now that we know who we are.
  140. a.evLogger.Log(events.Starting, map[string]string{
  141. "home": locations.GetBaseDir(locations.ConfigBaseDir),
  142. "myID": a.myID.String(),
  143. })
  144. if err := checkShortIDs(a.cfg); err != nil {
  145. l.Warnln("Short device IDs are in conflict. Unlucky!\n Regenerate the device ID of one of the following:\n ", err)
  146. return err
  147. }
  148. if len(a.opts.ProfilerAddr) > 0 {
  149. go func() {
  150. l.Debugln("Starting profiler on", a.opts.ProfilerAddr)
  151. runtime.SetBlockProfileRate(1)
  152. err := http.ListenAndServe(a.opts.ProfilerAddr, nil)
  153. if err != nil {
  154. l.Warnln(err)
  155. return
  156. }
  157. }()
  158. }
  159. perf := ur.CpuBench(context.Background(), 3, 150*time.Millisecond, true)
  160. l.Infof("Hashing performance is %.02f MB/s", perf)
  161. if err := db.UpdateSchema(a.ll); err != nil {
  162. l.Warnln("Database schema:", err)
  163. return err
  164. }
  165. if a.opts.ResetDeltaIdxs {
  166. l.Infoln("Reinitializing delta index IDs")
  167. db.DropDeltaIndexIDs(a.ll)
  168. }
  169. protectedFiles := []string{
  170. locations.Get(locations.Database),
  171. locations.Get(locations.ConfigFile),
  172. locations.Get(locations.CertFile),
  173. locations.Get(locations.KeyFile),
  174. }
  175. // Remove database entries for folders that no longer exist in the config
  176. folders := a.cfg.Folders()
  177. for _, folder := range a.ll.ListFolders() {
  178. if _, ok := folders[folder]; !ok {
  179. l.Infof("Cleaning metadata for dropped folder %q", folder)
  180. db.DropFolder(a.ll, folder)
  181. }
  182. }
  183. // Grab the previously running version string from the database.
  184. miscDB := db.NewMiscDataNamespace(a.ll)
  185. prevVersion, _, err := miscDB.String("prevVersion")
  186. if err != nil {
  187. l.Warnln("Database:", err)
  188. return err
  189. }
  190. // Strip away prerelease/beta stuff and just compare the release
  191. // numbers. 0.14.44 to 0.14.45-banana is an upgrade, 0.14.45-banana to
  192. // 0.14.45-pineapple is not.
  193. prevParts := strings.Split(prevVersion, "-")
  194. curParts := strings.Split(build.Version, "-")
  195. if rel := upgrade.CompareVersions(prevParts[0], curParts[0]); rel != upgrade.Equal {
  196. if prevVersion != "" {
  197. l.Infoln("Detected upgrade from", prevVersion, "to", build.Version)
  198. }
  199. if a.cfg.Options().SendFullIndexOnUpgrade {
  200. // Drop delta indexes in case we've changed random stuff we
  201. // shouldn't have. We will resend our index on next connect.
  202. db.DropDeltaIndexIDs(a.ll)
  203. }
  204. }
  205. if build.Version != prevVersion {
  206. // Remember the new version.
  207. miscDB.PutString("prevVersion", build.Version)
  208. }
  209. if err := globalMigration(a.ll, a.cfg); err != nil {
  210. l.Warnln("Global migration:", err)
  211. return err
  212. }
  213. keyGen := protocol.NewKeyGenerator()
  214. m := model.NewModel(a.cfg, a.myID, a.ll, protectedFiles, a.evLogger, keyGen)
  215. a.Model = m
  216. a.mainService.Add(m)
  217. // The TLS configuration is used for both the listening socket and outgoing
  218. // connections.
  219. var tlsCfg *tls.Config
  220. if a.cfg.Options().InsecureAllowOldTLSVersions {
  221. l.Infoln("TLS 1.2 is allowed on sync connections. This is less than optimally secure.")
  222. tlsCfg = tlsutil.SecureDefaultWithTLS12()
  223. } else {
  224. tlsCfg = tlsutil.SecureDefaultTLS13()
  225. }
  226. tlsCfg.Certificates = []tls.Certificate{a.cert}
  227. tlsCfg.NextProtos = []string{bepProtocolName}
  228. tlsCfg.ClientAuth = tls.RequestClientCert
  229. tlsCfg.SessionTicketsDisabled = true
  230. tlsCfg.InsecureSkipVerify = true
  231. // Start discovery and connection management
  232. // Chicken and egg, discovery manager depends on connection service to tell it what addresses it's listening on
  233. // Connection service depends on discovery manager to get addresses to connect to.
  234. // Create a wrapper that is then wired after they are both setup.
  235. addrLister := &lateAddressLister{}
  236. connRegistry := registry.New()
  237. discoveryManager := discover.NewManager(a.myID, a.cfg, a.cert, a.evLogger, addrLister, connRegistry)
  238. connectionsService := connections.NewService(a.cfg, a.myID, m, tlsCfg, discoveryManager, bepProtocolName, tlsDefaultCommonName, a.evLogger, connRegistry, keyGen)
  239. addrLister.AddressLister = connectionsService
  240. a.mainService.Add(discoveryManager)
  241. a.mainService.Add(connectionsService)
  242. a.cfg.Modify(func(cfg *config.Configuration) {
  243. // Candidate builds always run with usage reporting.
  244. if build.IsCandidate {
  245. l.Infoln("Anonymous usage reporting is always enabled for candidate releases.")
  246. if cfg.Options.URAccepted != ur.Version {
  247. cfg.Options.URAccepted = ur.Version
  248. // Unique ID will be set and config saved below if necessary.
  249. }
  250. }
  251. // If we are going to do usage reporting, ensure we have a valid unique ID.
  252. if cfg.Options.URAccepted > 0 && cfg.Options.URUniqueID == "" {
  253. cfg.Options.URUniqueID = rand.String(8)
  254. }
  255. })
  256. usageReportingSvc := ur.New(a.cfg, m, connectionsService, a.opts.NoUpgrade)
  257. a.mainService.Add(usageReportingSvc)
  258. // GUI
  259. if err := a.setupGUI(m, defaultSub, diskSub, discoveryManager, connectionsService, usageReportingSvc, errors, systemLog, miscDB); err != nil {
  260. l.Warnln("Failed starting API:", err)
  261. return err
  262. }
  263. myDev, _ := a.cfg.Device(a.myID)
  264. l.Infof(`My name is "%v"`, myDev.Name)
  265. for _, device := range a.cfg.Devices() {
  266. if device.DeviceID != a.myID {
  267. l.Infof(`Device %s is "%v" at %v`, device.DeviceID, device.Name, device.Addresses)
  268. }
  269. }
  270. if isSuperUser() {
  271. l.Warnln("Syncthing should not run as a privileged or system user. Please consider using a normal user account.")
  272. }
  273. a.evLogger.Log(events.StartupComplete, map[string]string{
  274. "myID": a.myID.String(),
  275. })
  276. if a.cfg.Options().SetLowPriority {
  277. if err := osutil.SetLowPriority(); err != nil {
  278. l.Warnln("Failed to lower process priority:", err)
  279. }
  280. }
  281. return nil
  282. }
  283. func (a *App) wait(errChan <-chan error) {
  284. err := <-errChan
  285. a.handleMainServiceError(err)
  286. done := make(chan struct{})
  287. go func() {
  288. a.ll.Close()
  289. close(done)
  290. }()
  291. select {
  292. case <-done:
  293. case <-time.After(10 * time.Second):
  294. l.Warnln("Database failed to stop within 10s")
  295. }
  296. l.Infoln("Exiting")
  297. close(a.stopped)
  298. }
  299. func (a *App) handleMainServiceError(err error) {
  300. if err == nil || errors.Is(err, context.Canceled) {
  301. return
  302. }
  303. var fatalErr *svcutil.FatalErr
  304. if errors.As(err, &fatalErr) {
  305. a.exitStatus = fatalErr.Status
  306. a.err = fatalErr.Err
  307. return
  308. }
  309. a.err = err
  310. a.exitStatus = svcutil.ExitError
  311. }
  312. // Wait blocks until the app stops running. Also returns if the app hasn't been
  313. // started yet.
  314. func (a *App) Wait() svcutil.ExitStatus {
  315. <-a.stopped
  316. return a.exitStatus
  317. }
  318. // Error returns an error if one occurred while running the app. It does not wait
  319. // for the app to stop before returning.
  320. func (a *App) Error() error {
  321. select {
  322. case <-a.stopped:
  323. return a.err
  324. default:
  325. }
  326. return nil
  327. }
  328. // Stop stops the app and sets its exit status to given reason, unless the app
  329. // was already stopped before. In any case it returns the effective exit status.
  330. func (a *App) Stop(stopReason svcutil.ExitStatus) svcutil.ExitStatus {
  331. return a.stopWithErr(stopReason, nil)
  332. }
  333. func (a *App) stopWithErr(stopReason svcutil.ExitStatus, err error) svcutil.ExitStatus {
  334. a.stopOnce.Do(func() {
  335. a.exitStatus = stopReason
  336. a.err = err
  337. if shouldDebug() {
  338. l.Debugln("Services before stop:")
  339. printServiceTree(os.Stdout, a.mainService, 0)
  340. }
  341. a.mainServiceCancel()
  342. })
  343. <-a.stopped
  344. return a.exitStatus
  345. }
  346. func (a *App) setupGUI(m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.Manager, connectionsService connections.Service, urService *ur.Service, errors, systemLog logger.Recorder, miscDB *db.NamespacedKV) error {
  347. guiCfg := a.cfg.GUI()
  348. if !guiCfg.Enabled {
  349. return nil
  350. }
  351. if guiCfg.InsecureAdminAccess {
  352. l.Warnln("Insecure admin access is enabled.")
  353. }
  354. summaryService := model.NewFolderSummaryService(a.cfg, m, a.myID, a.evLogger)
  355. a.mainService.Add(summaryService)
  356. apiSvc := api.New(a.myID, a.cfg, locations.Get(locations.GUIAssets), tlsDefaultCommonName, m, defaultSub, diskSub, a.evLogger, discoverer, connectionsService, urService, summaryService, errors, systemLog, a.opts.NoUpgrade, miscDB)
  357. a.mainService.Add(apiSvc)
  358. if err := apiSvc.WaitForStart(); err != nil {
  359. return err
  360. }
  361. return nil
  362. }
  363. // checkShortIDs verifies that the configuration won't result in duplicate
  364. // short ID:s; that is, that the devices in the cluster all have unique
  365. // initial 64 bits.
  366. func checkShortIDs(cfg config.Wrapper) error {
  367. exists := make(map[protocol.ShortID]protocol.DeviceID)
  368. for deviceID := range cfg.Devices() {
  369. shortID := deviceID.Short()
  370. if otherID, ok := exists[shortID]; ok {
  371. return fmt.Errorf("%v in conflict with %v", deviceID, otherID)
  372. }
  373. exists[shortID] = deviceID
  374. }
  375. return nil
  376. }
  377. type supervisor interface{ Services() []suture.Service }
  378. func printServiceTree(w io.Writer, sup supervisor, level int) {
  379. printService(w, sup, level)
  380. svcs := sup.Services()
  381. sort.Slice(svcs, func(a, b int) bool {
  382. return fmt.Sprint(svcs[a]) < fmt.Sprint(svcs[b])
  383. })
  384. for _, svc := range svcs {
  385. if sub, ok := svc.(supervisor); ok {
  386. printServiceTree(w, sub, level+1)
  387. } else {
  388. printService(w, svc, level+1)
  389. }
  390. }
  391. }
  392. func printService(w io.Writer, svc interface{}, level int) {
  393. type errorer interface{ Error() error }
  394. t := "-"
  395. if _, ok := svc.(supervisor); ok {
  396. t = "+"
  397. }
  398. fmt.Fprintln(w, strings.Repeat(" ", level), t, svc)
  399. if es, ok := svc.(errorer); ok {
  400. if err := es.Error(); err != nil {
  401. fmt.Fprintln(w, strings.Repeat(" ", level), " ->", err)
  402. }
  403. }
  404. }
  405. type lateAddressLister struct {
  406. discover.AddressLister
  407. }