syncthing.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464
  1. // Copyright (C) 2014 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package syncthing
  7. import (
  8. "crypto/tls"
  9. "fmt"
  10. "io"
  11. "net/http"
  12. "runtime"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/thejerf/suture"
  17. "github.com/syncthing/syncthing/lib/api"
  18. "github.com/syncthing/syncthing/lib/build"
  19. "github.com/syncthing/syncthing/lib/config"
  20. "github.com/syncthing/syncthing/lib/connections"
  21. "github.com/syncthing/syncthing/lib/db"
  22. "github.com/syncthing/syncthing/lib/db/backend"
  23. "github.com/syncthing/syncthing/lib/discover"
  24. "github.com/syncthing/syncthing/lib/events"
  25. "github.com/syncthing/syncthing/lib/locations"
  26. "github.com/syncthing/syncthing/lib/logger"
  27. "github.com/syncthing/syncthing/lib/model"
  28. "github.com/syncthing/syncthing/lib/osutil"
  29. "github.com/syncthing/syncthing/lib/protocol"
  30. "github.com/syncthing/syncthing/lib/rand"
  31. "github.com/syncthing/syncthing/lib/sha256"
  32. "github.com/syncthing/syncthing/lib/tlsutil"
  33. "github.com/syncthing/syncthing/lib/ur"
  34. )
const (
	// bepProtocolName is the protocol identifier placed in the TLS
	// NextProtos list and handed to the connections service.
	bepProtocolName = "bep/1.0"

	// tlsDefaultCommonName is passed to both the connections service and
	// the API service.
	tlsDefaultCommonName = "syncthing"

	// maxSystemErrors is the size argument given to the warning-level log
	// recorder created during startup.
	maxSystemErrors = 5

	// initialSystemLog and maxSystemLog are the two size arguments given to
	// the debug-level log recorder created during startup.
	// NOTE(review): presumably "keep the first initialSystemLog lines and at
	// most maxSystemLog lines in total" - confirm against logger.NewRecorder.
	initialSystemLog = 10

	maxSystemLog = 250

	// deviceCertLifetimeDays is not referenced in this part of the file.
	// NOTE(review): presumably the validity period, in days, of generated
	// device certificates - confirm at the point of use.
	deviceCertLifetimeDays = 20 * 365
)
  43. type ExitStatus int
  44. func (s ExitStatus) AsInt() int {
  45. return int(s)
  46. }
  47. const (
  48. ExitSuccess ExitStatus = 0
  49. ExitError ExitStatus = 1
  50. ExitNoUpgradeAvailable ExitStatus = 2
  51. ExitRestart ExitStatus = 3
  52. ExitUpgrade ExitStatus = 4
  53. )
// Options holds the optional settings that influence how an App starts up.
type Options struct {
	// AssetDir is handed unchanged to the API service.
	AssetDir string

	// AuditWriter, when non-nil, makes startup add an audit service that is
	// given this writer.
	AuditWriter io.Writer

	// DeadlockTimeoutS, when greater than zero, is the deadlock detector
	// timeout in seconds. Otherwise the detector is only started (with a 20
	// minute timeout) on non-release or beta builds.
	DeadlockTimeoutS int

	// NoUpgrade is passed on to the usage reporting and API services.
	NoUpgrade bool

	// ProfilerURL, when non-empty, is the address passed to
	// http.ListenAndServe for the profiler.
	ProfilerURL string

	// ResetDeltaIdxs makes startup drop the delta index IDs from the
	// database.
	ResetDeltaIdxs bool

	// Verbose makes startup add the verbose service.
	Verbose bool
}
// App bundles the state of one running instance: its configuration,
// database, services and the bookkeeping needed to stop it and report why it
// stopped. Create it with New.
type App struct {
	// myID is derived from cert during startup.
	myID protocol.DeviceID

	// mainService is the root supervisor; it is created during startup and
	// stopped by run.
	mainService *suture.Supervisor

	cfg config.Wrapper

	// ll is the database handle built from the backend given to New; run
	// closes it on shutdown.
	ll *db.Lowlevel

	evLogger events.Logger

	cert tls.Certificate

	opts Options

	// exitStatus and err are written exactly once, by the first call that
	// stops the app (see stopWithErr).
	exitStatus ExitStatus

	err error

	// stopOnce guarantees stop is closed only once.
	stopOnce sync.Once

	// stop is closed to request shutdown.
	stop chan struct{}

	// stopped is closed whenever the app is not running: New hands it out
	// already closed, Start replaces it with an open channel, and run closes
	// that one when shutdown has finished.
	stopped chan struct{}
}
  77. func New(cfg config.Wrapper, dbBackend backend.Backend, evLogger events.Logger, cert tls.Certificate, opts Options) *App {
  78. a := &App{
  79. cfg: cfg,
  80. ll: db.NewLowlevel(dbBackend),
  81. evLogger: evLogger,
  82. opts: opts,
  83. cert: cert,
  84. stop: make(chan struct{}),
  85. stopped: make(chan struct{}),
  86. }
  87. close(a.stopped) // Hasn't been started, so shouldn't block on Wait.
  88. return a
  89. }
  90. // Start executes the app and returns once all the startup operations are done,
  91. // e.g. the API is ready for use.
  92. // Must be called once only.
  93. func (a *App) Start() error {
  94. if err := a.startup(); err != nil {
  95. a.stopWithErr(ExitError, err)
  96. return err
  97. }
  98. a.stopped = make(chan struct{})
  99. go a.run()
  100. return nil
  101. }
// startup performs the whole startup sequence, in order: it creates and
// starts the main supervisor, adds the optional audit and verbose services,
// sets up log recorders and event subscriptions, determines the device ID,
// prepares the database, and then adds the model, discovery, connection,
// usage reporting and (if enabled) API services.
//
// It returns the first fatal error encountered. The supervisor is started at
// the very top and is not stopped here on error; cleanup is left to the
// caller.
func (a *App) startup() error {
	// Create a main service manager. We'll add things to this as we go along.
	// We want any logging it does to go through our log system.
	a.mainService = suture.New("main", suture.Spec{
		Log: func(line string) {
			l.Debugln(line)
		},
		PassThroughPanics: true,
	})
	a.mainService.ServeBackground()

	if a.opts.AuditWriter != nil {
		a.mainService.Add(newAuditService(a.opts.AuditWriter, a.evLogger))
	}

	if a.opts.Verbose {
		a.mainService.Add(newVerboseService(a.evLogger))
	}

	// Log recorders that are later handed to the API service via setupGUI.
	errors := logger.NewRecorder(l, logger.LevelWarn, maxSystemErrors, 0)
	systemLog := logger.NewRecorder(l, logger.LevelDebug, maxSystemLog, initialSystemLog)

	// Event subscription for the API; must start early to catch the early
	// events. The LocalChangeDetected event might overwhelm the event
	// receiver in some situations so we will not subscribe to it here.
	defaultSub := events.NewBufferedSubscription(a.evLogger.Subscribe(api.DefaultEventMask), api.EventSubBufferSize)
	diskSub := events.NewBufferedSubscription(a.evLogger.Subscribe(api.DiskEventMask), api.EventSubBufferSize)

	// Attempt to increase the limit on number of open files to the maximum
	// allowed, in case we have many peers. We don't really care enough to
	// report the error if there is one.
	osutil.MaximizeOpenFileLimit()

	// Figure out our device ID, set it as the log prefix and log it.
	// This indexes the first certificate in the chain, so it assumes the
	// chain in a.cert is non-empty.
	a.myID = protocol.NewDeviceID(a.cert.Certificate[0])
	l.SetPrefix(fmt.Sprintf("[%s] ", a.myID.String()[:5]))
	l.Infoln("My ID:", a.myID)

	// Select SHA256 implementation and report. Affected by the
	// STHASHING environment variable.
	sha256.SelectAlgo()
	sha256.Report()

	// Emit the Starting event, now that we know who we are.
	a.evLogger.Log(events.Starting, map[string]string{
		"home": locations.GetBaseDir(locations.ConfigBaseDir),
		"myID": a.myID.String(),
	})

	// Conflicting short IDs in the configuration are fatal.
	if err := checkShortIDs(a.cfg); err != nil {
		l.Warnln("Short device IDs are in conflict. Unlucky!\n Regenerate the device ID of one of the following:\n ", err)
		return err
	}

	if len(a.opts.ProfilerURL) > 0 {
		// The profiler server runs in its own goroutine and nothing in this
		// function stops it again. A nil handler means the default HTTP mux
		// is served.
		go func() {
			l.Debugln("Starting profiler on", a.opts.ProfilerURL)
			runtime.SetBlockProfileRate(1)
			err := http.ListenAndServe(a.opts.ProfilerURL, nil)
			if err != nil {
				l.Warnln(err)
				return
			}
		}()
	}

	perf := ur.CpuBench(3, 150*time.Millisecond, true)
	l.Infof("Hashing performance is %.02f MB/s", perf)

	// A failed schema update is fatal.
	if err := db.UpdateSchema(a.ll); err != nil {
		l.Warnln("Database schema:", err)
		return err
	}

	if a.opts.ResetDeltaIdxs {
		l.Infoln("Reinitializing delta index IDs")
		db.DropDeltaIndexIDs(a.ll)
	}

	// Our own database, config and key material; handed to the model below.
	protectedFiles := []string{
		locations.Get(locations.Database),
		locations.Get(locations.ConfigFile),
		locations.Get(locations.CertFile),
		locations.Get(locations.KeyFile),
	}

	// Remove database entries for folders that no longer exist in the config
	folders := a.cfg.Folders()
	for _, folder := range a.ll.ListFolders() {
		if _, ok := folders[folder]; !ok {
			l.Infof("Cleaning data for dropped folder %q", folder)
			db.DropFolder(a.ll, folder)
		}
	}

	// Grab the previously running version string from the database.
	miscDB := db.NewMiscDataNamespace(a.ll)
	prevVersion, _, err := miscDB.String("prevVersion")
	if err != nil {
		l.Warnln("Database:", err)
		return err
	}

	// Strip away prerelease/beta stuff and just compare the release
	// numbers. 0.14.44 to 0.14.45-banana is an upgrade, 0.14.45-banana to
	// 0.14.45-pineapple is not.
	prevParts := strings.Split(prevVersion, "-")
	curParts := strings.Split(build.Version, "-")
	if prevParts[0] != curParts[0] {
		// An empty previous version means nothing was stored yet, so there
		// is no upgrade to announce, but the rest still applies.
		if prevVersion != "" {
			l.Infoln("Detected upgrade from", prevVersion, "to", build.Version)
		}

		// Drop delta indexes in case we've changed random stuff we
		// shouldn't have. We will resend our index on next connect.
		db.DropDeltaIndexIDs(a.ll)

		// Remember the new version.
		miscDB.PutString("prevVersion", build.Version)
	}

	m := model.NewModel(a.cfg, a.myID, "syncthing", build.Version, a.ll, protectedFiles, a.evLogger)

	// An explicit timeout always enables the deadlock detector; without one
	// it is only enabled on non-release or beta builds.
	if a.opts.DeadlockTimeoutS > 0 {
		m.StartDeadlockDetector(time.Duration(a.opts.DeadlockTimeoutS) * time.Second)
	} else if !build.IsRelease || build.IsBeta {
		m.StartDeadlockDetector(20 * time.Minute)
	}

	a.mainService.Add(m)

	// Start discovery
	cachedDiscovery := discover.NewCachingMux()
	a.mainService.Add(cachedDiscovery)

	// The TLS configuration is used for both the listening socket and outgoing
	// connections.
	// NOTE(review): InsecureSkipVerify disables chain verification here;
	// presumably peers are instead authenticated by the device ID derived
	// from their certificate - confirm in the connections package.
	tlsCfg := tlsutil.SecureDefault()
	tlsCfg.Certificates = []tls.Certificate{a.cert}
	tlsCfg.NextProtos = []string{bepProtocolName}
	tlsCfg.ClientAuth = tls.RequestClientCert
	tlsCfg.SessionTicketsDisabled = true
	tlsCfg.InsecureSkipVerify = true

	// Start connection management
	connectionsService := connections.NewService(a.cfg, a.myID, m, tlsCfg, cachedDiscovery, bepProtocolName, tlsDefaultCommonName, a.evLogger)
	a.mainService.Add(connectionsService)

	if a.cfg.Options().GlobalAnnEnabled {
		for _, srv := range a.cfg.Options().GlobalDiscoveryServers() {
			l.Infoln("Using discovery server", srv)
			gd, err := discover.NewGlobal(srv, a.cert, connectionsService, a.evLogger)
			if err != nil {
				// A broken discovery server is not fatal; skip it.
				l.Warnln("Global discovery:", err)
				continue
			}

			// Each global discovery server gets its results cached for five
			// minutes, and is not asked again for a minute when it's returned
			// unsuccessfully.
			cachedDiscovery.Add(gd, 5*time.Minute, time.Minute)
		}
	}

	if a.cfg.Options().LocalAnnEnabled {
		// Failures to set up local discovery are logged but not fatal.

		// v4 broadcasts
		bcd, err := discover.NewLocal(a.myID, fmt.Sprintf(":%d", a.cfg.Options().LocalAnnPort), connectionsService, a.evLogger)
		if err != nil {
			l.Warnln("IPv4 local discovery:", err)
		} else {
			cachedDiscovery.Add(bcd, 0, 0)
		}

		// v6 multicasts
		mcd, err := discover.NewLocal(a.myID, a.cfg.Options().LocalAnnMCAddr, connectionsService, a.evLogger)
		if err != nil {
			l.Warnln("IPv6 local discovery:", err)
		} else {
			cachedDiscovery.Add(mcd, 0, 0)
		}
	}

	// Candidate builds always run with usage reporting.
	if opts := a.cfg.Options(); build.IsCandidate {
		l.Infoln("Anonymous usage reporting is always enabled for candidate releases.")
		if opts.URAccepted != ur.Version {
			opts.URAccepted = ur.Version
			a.cfg.SetOptions(opts)
			a.cfg.Save()
			// Unique ID will be set and config saved below if necessary.
		}
	}

	// If we are going to do usage reporting, ensure we have a valid unique ID.
	if opts := a.cfg.Options(); opts.URAccepted > 0 && opts.URUniqueID == "" {
		opts.URUniqueID = rand.String(8)
		a.cfg.SetOptions(opts)
		a.cfg.Save()
	}

	usageReportingSvc := ur.New(a.cfg, m, connectionsService, a.opts.NoUpgrade)
	a.mainService.Add(usageReportingSvc)

	// GUI
	if err := a.setupGUI(m, defaultSub, diskSub, cachedDiscovery, connectionsService, usageReportingSvc, errors, systemLog); err != nil {
		l.Warnln("Failed starting API:", err)
		return err
	}

	// The second result of Device is deliberately ignored; only the name is
	// logged.
	myDev, _ := a.cfg.Device(a.myID)
	l.Infof(`My name is "%v"`, myDev.Name)
	for _, device := range a.cfg.Devices() {
		if device.DeviceID != a.myID {
			l.Infof(`Device %s is "%v" at %v`, device.DeviceID, device.Name, device.Addresses)
		}
	}

	if isSuperUser() {
		l.Warnln("Syncthing should not run as a privileged or system user. Please consider using a normal user account.")
	}

	a.evLogger.Log(events.StartupComplete, map[string]string{
		"myID": a.myID.String(),
	})

	// Lowering the priority is best effort; failure is only logged.
	if a.cfg.Options().SetLowPriority {
		if err := osutil.SetLowPriority(); err != nil {
			l.Warnln("Failed to lower process priority:", err)
		}
	}

	return nil
}
  297. func (a *App) run() {
  298. <-a.stop
  299. a.mainService.Stop()
  300. done := make(chan struct{})
  301. go func() {
  302. a.ll.Close()
  303. close(done)
  304. }()
  305. select {
  306. case <-done:
  307. case <-time.After(10 * time.Second):
  308. l.Warnln("Database failed to stop within 10s")
  309. }
  310. l.Infoln("Exiting")
  311. close(a.stopped)
  312. }
  313. // Wait blocks until the app stops running. Also returns if the app hasn't been
  314. // started yet.
  315. func (a *App) Wait() ExitStatus {
  316. <-a.stopped
  317. return a.exitStatus
  318. }
  319. // Error returns an error if one occurred while running the app. It does not wait
  320. // for the app to stop before returning.
  321. func (a *App) Error() error {
  322. select {
  323. case <-a.stop:
  324. return a.err
  325. default:
  326. }
  327. return nil
  328. }
  329. // Stop stops the app and sets its exit status to given reason, unless the app
  330. // was already stopped before. In any case it returns the effective exit status.
  331. func (a *App) Stop(stopReason ExitStatus) ExitStatus {
  332. return a.stopWithErr(stopReason, nil)
  333. }
  334. func (a *App) stopWithErr(stopReason ExitStatus, err error) ExitStatus {
  335. a.stopOnce.Do(func() {
  336. a.exitStatus = stopReason
  337. a.err = err
  338. close(a.stop)
  339. })
  340. return a.exitStatus
  341. }
  342. func (a *App) setupGUI(m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, errors, systemLog logger.Recorder) error {
  343. guiCfg := a.cfg.GUI()
  344. if !guiCfg.Enabled {
  345. return nil
  346. }
  347. if guiCfg.InsecureAdminAccess {
  348. l.Warnln("Insecure admin access is enabled.")
  349. }
  350. cpu := newCPUService()
  351. a.mainService.Add(cpu)
  352. summaryService := model.NewFolderSummaryService(a.cfg, m, a.myID, a.evLogger)
  353. a.mainService.Add(summaryService)
  354. apiSvc := api.New(a.myID, a.cfg, a.opts.AssetDir, tlsDefaultCommonName, m, defaultSub, diskSub, a.evLogger, discoverer, connectionsService, urService, summaryService, errors, systemLog, cpu, &controller{a}, a.opts.NoUpgrade)
  355. a.mainService.Add(apiSvc)
  356. if err := apiSvc.WaitForStart(); err != nil {
  357. return err
  358. }
  359. return nil
  360. }
  361. // checkShortIDs verifies that the configuration won't result in duplicate
  362. // short ID:s; that is, that the devices in the cluster all have unique
  363. // initial 64 bits.
  364. func checkShortIDs(cfg config.Wrapper) error {
  365. exists := make(map[protocol.ShortID]protocol.DeviceID)
  366. for deviceID := range cfg.Devices() {
  367. shortID := deviceID.Short()
  368. if otherID, ok := exists[shortID]; ok {
  369. return fmt.Errorf("%v in conflict with %v", deviceID, otherID)
  370. }
  371. exists[shortID] = deviceID
  372. }
  373. return nil
  374. }
  375. // Implements api.Controller
  376. type controller struct{ *App }
  377. func (e *controller) Restart() {
  378. e.Stop(ExitRestart)
  379. }
  380. func (e *controller) Shutdown() {
  381. e.Stop(ExitSuccess)
  382. }
  383. func (e *controller) ExitUpgrading() {
  384. e.Stop(ExitUpgrade)
  385. }