syncthing.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package syncthing
  7. import (
  8. "context"
  9. "crypto/tls"
  10. "fmt"
  11. "io"
  12. "net/http"
  13. "os"
  14. "runtime"
  15. "sort"
  16. "strings"
  17. "sync"
  18. "time"
  19. "github.com/thejerf/suture"
  20. "github.com/syncthing/syncthing/lib/api"
  21. "github.com/syncthing/syncthing/lib/build"
  22. "github.com/syncthing/syncthing/lib/config"
  23. "github.com/syncthing/syncthing/lib/connections"
  24. "github.com/syncthing/syncthing/lib/db"
  25. "github.com/syncthing/syncthing/lib/db/backend"
  26. "github.com/syncthing/syncthing/lib/discover"
  27. "github.com/syncthing/syncthing/lib/events"
  28. "github.com/syncthing/syncthing/lib/locations"
  29. "github.com/syncthing/syncthing/lib/logger"
  30. "github.com/syncthing/syncthing/lib/model"
  31. "github.com/syncthing/syncthing/lib/osutil"
  32. "github.com/syncthing/syncthing/lib/protocol"
  33. "github.com/syncthing/syncthing/lib/rand"
  34. "github.com/syncthing/syncthing/lib/sha256"
  35. "github.com/syncthing/syncthing/lib/tlsutil"
  36. "github.com/syncthing/syncthing/lib/upgrade"
  37. "github.com/syncthing/syncthing/lib/ur"
  38. )
  39. const (
  40. bepProtocolName = "bep/1.0"
  41. tlsDefaultCommonName = "syncthing"
  42. maxSystemErrors = 5
  43. initialSystemLog = 10
  44. maxSystemLog = 250
  45. deviceCertLifetimeDays = 20 * 365
  46. )
  47. type ExitStatus int
  48. func (s ExitStatus) AsInt() int {
  49. return int(s)
  50. }
  51. const (
  52. ExitSuccess ExitStatus = 0
  53. ExitError ExitStatus = 1
  54. ExitNoUpgradeAvailable ExitStatus = 2
  55. ExitRestart ExitStatus = 3
  56. ExitUpgrade ExitStatus = 4
  57. )
  58. type Options struct {
  59. AssetDir string
  60. AuditWriter io.Writer
  61. DeadlockTimeoutS int
  62. NoUpgrade bool
  63. ProfilerURL string
  64. ResetDeltaIdxs bool
  65. Verbose bool
  66. // null duration means use default value
  67. DBRecheckInterval time.Duration
  68. DBIndirectGCInterval time.Duration
  69. }
  70. type App struct {
  71. myID protocol.DeviceID
  72. mainService *suture.Supervisor
  73. cfg config.Wrapper
  74. ll *db.Lowlevel
  75. evLogger events.Logger
  76. cert tls.Certificate
  77. opts Options
  78. exitStatus ExitStatus
  79. err error
  80. stopOnce sync.Once
  81. stop chan struct{}
  82. stopped chan struct{}
  83. }
  84. func New(cfg config.Wrapper, dbBackend backend.Backend, evLogger events.Logger, cert tls.Certificate, opts Options) *App {
  85. a := &App{
  86. cfg: cfg,
  87. ll: db.NewLowlevel(dbBackend, db.WithRecheckInterval(opts.DBRecheckInterval), db.WithIndirectGCInterval(opts.DBIndirectGCInterval)),
  88. evLogger: evLogger,
  89. opts: opts,
  90. cert: cert,
  91. stop: make(chan struct{}),
  92. stopped: make(chan struct{}),
  93. }
  94. close(a.stopped) // Hasn't been started, so shouldn't block on Wait.
  95. return a
  96. }
  97. // Start executes the app and returns once all the startup operations are done,
  98. // e.g. the API is ready for use.
  99. // Must be called once only.
  100. func (a *App) Start() error {
  101. // Create a main service manager. We'll add things to this as we go along.
  102. // We want any logging it does to go through our log system.
  103. a.mainService = suture.New("main", suture.Spec{
  104. Log: func(line string) {
  105. l.Debugln(line)
  106. },
  107. PassThroughPanics: true,
  108. })
  109. // Start the supervisor and wait for it to stop to handle cleanup.
  110. a.stopped = make(chan struct{})
  111. a.mainService.ServeBackground()
  112. go a.run()
  113. if err := a.startup(); err != nil {
  114. a.stopWithErr(ExitError, err)
  115. return err
  116. }
  117. return nil
  118. }
  119. func (a *App) startup() error {
  120. a.mainService.Add(a.ll)
  121. if a.opts.AuditWriter != nil {
  122. a.mainService.Add(newAuditService(a.opts.AuditWriter, a.evLogger))
  123. }
  124. if a.opts.Verbose {
  125. a.mainService.Add(newVerboseService(a.evLogger))
  126. }
  127. errors := logger.NewRecorder(l, logger.LevelWarn, maxSystemErrors, 0)
  128. systemLog := logger.NewRecorder(l, logger.LevelDebug, maxSystemLog, initialSystemLog)
  129. // Event subscription for the API; must start early to catch the early
  130. // events. The LocalChangeDetected event might overwhelm the event
  131. // receiver in some situations so we will not subscribe to it here.
  132. defaultSub := events.NewBufferedSubscription(a.evLogger.Subscribe(api.DefaultEventMask), api.EventSubBufferSize)
  133. diskSub := events.NewBufferedSubscription(a.evLogger.Subscribe(api.DiskEventMask), api.EventSubBufferSize)
  134. // Attempt to increase the limit on number of open files to the maximum
  135. // allowed, in case we have many peers. We don't really care enough to
  136. // report the error if there is one.
  137. osutil.MaximizeOpenFileLimit()
  138. // Figure out our device ID, set it as the log prefix and log it.
  139. a.myID = protocol.NewDeviceID(a.cert.Certificate[0])
  140. l.SetPrefix(fmt.Sprintf("[%s] ", a.myID.String()[:5]))
  141. l.Infoln("My ID:", a.myID)
  142. // Select SHA256 implementation and report. Affected by the
  143. // STHASHING environment variable.
  144. sha256.SelectAlgo()
  145. sha256.Report()
  146. // Emit the Starting event, now that we know who we are.
  147. a.evLogger.Log(events.Starting, map[string]string{
  148. "home": locations.GetBaseDir(locations.ConfigBaseDir),
  149. "myID": a.myID.String(),
  150. })
  151. if err := checkShortIDs(a.cfg); err != nil {
  152. l.Warnln("Short device IDs are in conflict. Unlucky!\n Regenerate the device ID of one of the following:\n ", err)
  153. return err
  154. }
  155. if len(a.opts.ProfilerURL) > 0 {
  156. go func() {
  157. l.Debugln("Starting profiler on", a.opts.ProfilerURL)
  158. runtime.SetBlockProfileRate(1)
  159. err := http.ListenAndServe(a.opts.ProfilerURL, nil)
  160. if err != nil {
  161. l.Warnln(err)
  162. return
  163. }
  164. }()
  165. }
  166. perf := ur.CpuBench(context.Background(), 3, 150*time.Millisecond, true)
  167. l.Infof("Hashing performance is %.02f MB/s", perf)
  168. if err := db.UpdateSchema(a.ll); err != nil {
  169. l.Warnln("Database schema:", err)
  170. return err
  171. }
  172. if a.opts.ResetDeltaIdxs {
  173. l.Infoln("Reinitializing delta index IDs")
  174. db.DropDeltaIndexIDs(a.ll)
  175. }
  176. protectedFiles := []string{
  177. locations.Get(locations.Database),
  178. locations.Get(locations.ConfigFile),
  179. locations.Get(locations.CertFile),
  180. locations.Get(locations.KeyFile),
  181. }
  182. // Remove database entries for folders that no longer exist in the config
  183. folders := a.cfg.Folders()
  184. for _, folder := range a.ll.ListFolders() {
  185. if _, ok := folders[folder]; !ok {
  186. l.Infof("Cleaning data for dropped folder %q", folder)
  187. db.DropFolder(a.ll, folder)
  188. }
  189. }
  190. // Grab the previously running version string from the database.
  191. miscDB := db.NewMiscDataNamespace(a.ll)
  192. prevVersion, _, err := miscDB.String("prevVersion")
  193. if err != nil {
  194. l.Warnln("Database:", err)
  195. return err
  196. }
  197. // Strip away prerelease/beta stuff and just compare the release
  198. // numbers. 0.14.44 to 0.14.45-banana is an upgrade, 0.14.45-banana to
  199. // 0.14.45-pineapple is not.
  200. prevParts := strings.Split(prevVersion, "-")
  201. curParts := strings.Split(build.Version, "-")
  202. if rel := upgrade.CompareVersions(prevParts[0], curParts[0]); rel != upgrade.Equal {
  203. if prevVersion != "" {
  204. l.Infoln("Detected upgrade from", prevVersion, "to", build.Version)
  205. }
  206. // Drop delta indexes in case we've changed random stuff we
  207. // shouldn't have. We will resend our index on next connect.
  208. db.DropDeltaIndexIDs(a.ll)
  209. }
  210. // Check and repair metadata and sequences on every upgrade including RCs.
  211. prevParts = strings.Split(prevVersion, "+")
  212. curParts = strings.Split(build.Version, "+")
  213. if rel := upgrade.CompareVersions(prevParts[0], curParts[0]); rel != upgrade.Equal {
  214. l.Infoln("Checking db due to upgrade - this may take a while...")
  215. a.ll.CheckRepair()
  216. }
  217. if build.Version != prevVersion {
  218. // Remember the new version.
  219. miscDB.PutString("prevVersion", build.Version)
  220. }
  221. m := model.NewModel(a.cfg, a.myID, "syncthing", build.Version, a.ll, protectedFiles, a.evLogger)
  222. if a.opts.DeadlockTimeoutS > 0 {
  223. m.StartDeadlockDetector(time.Duration(a.opts.DeadlockTimeoutS) * time.Second)
  224. } else if !build.IsRelease || build.IsBeta {
  225. m.StartDeadlockDetector(20 * time.Minute)
  226. }
  227. a.mainService.Add(m)
  228. // The TLS configuration is used for both the listening socket and outgoing
  229. // connections.
  230. tlsCfg := tlsutil.SecureDefault()
  231. tlsCfg.Certificates = []tls.Certificate{a.cert}
  232. tlsCfg.NextProtos = []string{bepProtocolName}
  233. tlsCfg.ClientAuth = tls.RequestClientCert
  234. tlsCfg.SessionTicketsDisabled = true
  235. tlsCfg.InsecureSkipVerify = true
  236. // Start discovery and connection management
  237. // Chicken and egg, discovery manager depends on connection service to tell it what addresses it's listening on
  238. // Connection service depends on discovery manager to get addresses to connect to.
  239. // Create a wrapper that is then wired after they are both setup.
  240. addrLister := &lateAddressLister{}
  241. discoveryManager := discover.NewManager(a.myID, a.cfg, a.cert, a.evLogger, addrLister)
  242. connectionsService := connections.NewService(a.cfg, a.myID, m, tlsCfg, discoveryManager, bepProtocolName, tlsDefaultCommonName, a.evLogger)
  243. addrLister.AddressLister = connectionsService
  244. a.mainService.Add(discoveryManager)
  245. a.mainService.Add(connectionsService)
  246. // Candidate builds always run with usage reporting.
  247. if opts := a.cfg.Options(); build.IsCandidate {
  248. l.Infoln("Anonymous usage reporting is always enabled for candidate releases.")
  249. if opts.URAccepted != ur.Version {
  250. opts.URAccepted = ur.Version
  251. a.cfg.SetOptions(opts)
  252. a.cfg.Save()
  253. // Unique ID will be set and config saved below if necessary.
  254. }
  255. }
  256. // If we are going to do usage reporting, ensure we have a valid unique ID.
  257. if opts := a.cfg.Options(); opts.URAccepted > 0 && opts.URUniqueID == "" {
  258. opts.URUniqueID = rand.String(8)
  259. a.cfg.SetOptions(opts)
  260. a.cfg.Save()
  261. }
  262. usageReportingSvc := ur.New(a.cfg, m, connectionsService, a.opts.NoUpgrade)
  263. a.mainService.Add(usageReportingSvc)
  264. // GUI
  265. if err := a.setupGUI(m, defaultSub, diskSub, discoveryManager, connectionsService, usageReportingSvc, errors, systemLog); err != nil {
  266. l.Warnln("Failed starting API:", err)
  267. return err
  268. }
  269. myDev, _ := a.cfg.Device(a.myID)
  270. l.Infof(`My name is "%v"`, myDev.Name)
  271. for _, device := range a.cfg.Devices() {
  272. if device.DeviceID != a.myID {
  273. l.Infof(`Device %s is "%v" at %v`, device.DeviceID, device.Name, device.Addresses)
  274. }
  275. }
  276. if isSuperUser() {
  277. l.Warnln("Syncthing should not run as a privileged or system user. Please consider using a normal user account.")
  278. }
  279. a.evLogger.Log(events.StartupComplete, map[string]string{
  280. "myID": a.myID.String(),
  281. })
  282. if a.cfg.Options().SetLowPriority {
  283. if err := osutil.SetLowPriority(); err != nil {
  284. l.Warnln("Failed to lower process priority:", err)
  285. }
  286. }
  287. return nil
  288. }
  289. func (a *App) run() {
  290. <-a.stop
  291. if shouldDebug() {
  292. l.Debugln("Services before stop:")
  293. printServiceTree(os.Stdout, a.mainService, 0)
  294. }
  295. a.mainService.Stop()
  296. done := make(chan struct{})
  297. go func() {
  298. a.ll.Close()
  299. close(done)
  300. }()
  301. select {
  302. case <-done:
  303. case <-time.After(10 * time.Second):
  304. l.Warnln("Database failed to stop within 10s")
  305. }
  306. l.Infoln("Exiting")
  307. close(a.stopped)
  308. }
  309. // Wait blocks until the app stops running. Also returns if the app hasn't been
  310. // started yet.
  311. func (a *App) Wait() ExitStatus {
  312. <-a.stopped
  313. return a.exitStatus
  314. }
  315. // Error returns an error if one occurred while running the app. It does not wait
  316. // for the app to stop before returning.
  317. func (a *App) Error() error {
  318. select {
  319. case <-a.stop:
  320. return a.err
  321. default:
  322. }
  323. return nil
  324. }
  325. // Stop stops the app and sets its exit status to given reason, unless the app
  326. // was already stopped before. In any case it returns the effective exit status.
  327. func (a *App) Stop(stopReason ExitStatus) ExitStatus {
  328. return a.stopWithErr(stopReason, nil)
  329. }
  330. func (a *App) stopWithErr(stopReason ExitStatus, err error) ExitStatus {
  331. a.stopOnce.Do(func() {
  332. a.exitStatus = stopReason
  333. a.err = err
  334. close(a.stop)
  335. })
  336. <-a.stopped
  337. return a.exitStatus
  338. }
  339. func (a *App) setupGUI(m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.Manager, connectionsService connections.Service, urService *ur.Service, errors, systemLog logger.Recorder) error {
  340. guiCfg := a.cfg.GUI()
  341. if !guiCfg.Enabled {
  342. return nil
  343. }
  344. if guiCfg.InsecureAdminAccess {
  345. l.Warnln("Insecure admin access is enabled.")
  346. }
  347. summaryService := model.NewFolderSummaryService(a.cfg, m, a.myID, a.evLogger)
  348. a.mainService.Add(summaryService)
  349. apiSvc := api.New(a.myID, a.cfg, a.opts.AssetDir, tlsDefaultCommonName, m, defaultSub, diskSub, a.evLogger, discoverer, connectionsService, urService, summaryService, errors, systemLog, &controller{a}, a.opts.NoUpgrade)
  350. a.mainService.Add(apiSvc)
  351. if err := apiSvc.WaitForStart(); err != nil {
  352. return err
  353. }
  354. return nil
  355. }
  356. // checkShortIDs verifies that the configuration won't result in duplicate
  357. // short ID:s; that is, that the devices in the cluster all have unique
  358. // initial 64 bits.
  359. func checkShortIDs(cfg config.Wrapper) error {
  360. exists := make(map[protocol.ShortID]protocol.DeviceID)
  361. for deviceID := range cfg.Devices() {
  362. shortID := deviceID.Short()
  363. if otherID, ok := exists[shortID]; ok {
  364. return fmt.Errorf("%v in conflict with %v", deviceID, otherID)
  365. }
  366. exists[shortID] = deviceID
  367. }
  368. return nil
  369. }
  370. // Implements api.Controller
  371. type controller struct{ *App }
  372. func (e *controller) Restart() {
  373. e.Stop(ExitRestart)
  374. }
  375. func (e *controller) Shutdown() {
  376. e.Stop(ExitSuccess)
  377. }
  378. func (e *controller) ExitUpgrading() {
  379. e.Stop(ExitUpgrade)
  380. }
  381. type supervisor interface{ Services() []suture.Service }
  382. func printServiceTree(w io.Writer, sup supervisor, level int) {
  383. printService(w, sup, level)
  384. svcs := sup.Services()
  385. sort.Slice(svcs, func(a, b int) bool {
  386. return fmt.Sprint(svcs[a]) < fmt.Sprint(svcs[b])
  387. })
  388. for _, svc := range svcs {
  389. if sub, ok := svc.(supervisor); ok {
  390. printServiceTree(w, sub, level+1)
  391. } else {
  392. printService(w, svc, level+1)
  393. }
  394. }
  395. }
  396. func printService(w io.Writer, svc interface{}, level int) {
  397. type errorer interface{ Error() error }
  398. t := "-"
  399. if _, ok := svc.(supervisor); ok {
  400. t = "+"
  401. }
  402. fmt.Fprintln(w, strings.Repeat(" ", level), t, svc)
  403. if es, ok := svc.(errorer); ok {
  404. if err := es.Error(); err != nil {
  405. fmt.Fprintln(w, strings.Repeat(" ", level), " ->", err)
  406. }
  407. }
  408. }
  409. type lateAddressLister struct {
  410. discover.AddressLister
  411. }