main.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242
  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package main
  7. import (
  8. "context"
  9. "crypto/tls"
  10. "flag"
  11. "log"
  12. "net"
  13. "net/http"
  14. "os"
  15. "runtime"
  16. "strings"
  17. "time"
  18. "github.com/prometheus/client_golang/prometheus/promhttp"
  19. _ "github.com/syncthing/syncthing/lib/automaxprocs"
  20. "github.com/syncthing/syncthing/lib/build"
  21. "github.com/syncthing/syncthing/lib/protocol"
  22. "github.com/syncthing/syncthing/lib/rand"
  23. "github.com/syncthing/syncthing/lib/tlsutil"
  24. "github.com/syndtr/goleveldb/leveldb/opt"
  25. "github.com/thejerf/suture/v4"
  26. )
  27. const (
  28. addressExpiryTime = 2 * time.Hour
  29. databaseStatisticsInterval = 5 * time.Minute
  30. // Reannounce-After is set to reannounceAfterSeconds +
  31. // random(reannounzeFuzzSeconds), similar for Retry-After
  32. reannounceAfterSeconds = 3300
  33. reannounzeFuzzSeconds = 300
  34. errorRetryAfterSeconds = 1500
  35. errorRetryFuzzSeconds = 300
  36. // Retry for not found is minSeconds + failures * incSeconds +
  37. // random(fuzz), where failures is the number of consecutive lookups
  38. // with no answer, up to maxSeconds. The fuzz is applied after capping
  39. // to maxSeconds.
  40. notFoundRetryMinSeconds = 60
  41. notFoundRetryMaxSeconds = 3540
  42. notFoundRetryIncSeconds = 10
  43. notFoundRetryFuzzSeconds = 60
  44. // How often (in requests) we serialize the missed counter to database.
  45. notFoundMissesWriteInterval = 10
  46. httpReadTimeout = 5 * time.Second
  47. httpWriteTimeout = 5 * time.Second
  48. httpMaxHeaderBytes = 1 << 10
  49. // Size of the replication outbox channel
  50. replicationOutboxSize = 10000
  51. )
  52. // These options make the database a little more optimized for writes, at
  53. // the expense of some memory usage and risk of losing writes in a (system)
  54. // crash.
  55. var levelDBOptions = &opt.Options{
  56. NoSync: true,
  57. WriteBuffer: 32 << 20, // default 4<<20
  58. }
  59. var debug = false
  60. func main() {
  61. var listen string
  62. var dir string
  63. var metricsListen string
  64. var replicationListen string
  65. var replicationPeers string
  66. var certFile string
  67. var keyFile string
  68. var replCertFile string
  69. var replKeyFile string
  70. var useHTTP bool
  71. var largeDB bool
  72. var amqpAddress string
  73. missesIncrease := 1
  74. log.SetOutput(os.Stdout)
  75. log.SetFlags(0)
  76. flag.StringVar(&certFile, "cert", "./cert.pem", "Certificate file")
  77. flag.StringVar(&keyFile, "key", "./key.pem", "Key file")
  78. flag.StringVar(&dir, "db-dir", "./discovery.db", "Database directory")
  79. flag.BoolVar(&debug, "debug", false, "Print debug output")
  80. flag.BoolVar(&useHTTP, "http", false, "Listen on HTTP (behind an HTTPS proxy)")
  81. flag.StringVar(&listen, "listen", ":8443", "Listen address")
  82. flag.StringVar(&metricsListen, "metrics-listen", "", "Metrics listen address")
  83. flag.StringVar(&replicationPeers, "replicate", "", "Replication peers, id@address, comma separated")
  84. flag.StringVar(&replicationListen, "replication-listen", ":19200", "Replication listen address")
  85. flag.StringVar(&replCertFile, "replication-cert", "", "Certificate file for replication")
  86. flag.StringVar(&replKeyFile, "replication-key", "", "Key file for replication")
  87. flag.BoolVar(&largeDB, "large-db", false, "Use larger database settings")
  88. flag.StringVar(&amqpAddress, "amqp-address", "", "Address to AMQP broker")
  89. flag.IntVar(&missesIncrease, "misses-increase", 1, "How many times to increase the misses counter on each miss")
  90. showVersion := flag.Bool("version", false, "Show version")
  91. flag.Parse()
  92. log.Println(build.LongVersionFor("stdiscosrv"))
  93. if *showVersion {
  94. return
  95. }
  96. buildInfo.WithLabelValues(build.Version, runtime.Version(), build.User, build.Date.UTC().Format("2006-01-02T15:04:05Z")).Set(1)
  97. if largeDB {
  98. levelDBOptions.BlockCacheCapacity = 64 << 20
  99. levelDBOptions.BlockSize = 64 << 10
  100. levelDBOptions.CompactionTableSize = 16 << 20
  101. levelDBOptions.CompactionTableSizeMultiplier = 2.0
  102. levelDBOptions.WriteBuffer = 64 << 20
  103. levelDBOptions.CompactionL0Trigger = 8
  104. }
  105. cert, err := tls.LoadX509KeyPair(certFile, keyFile)
  106. if os.IsNotExist(err) {
  107. log.Println("Failed to load keypair. Generating one, this might take a while...")
  108. cert, err = tlsutil.NewCertificate(certFile, keyFile, "stdiscosrv", 20*365)
  109. if err != nil {
  110. log.Fatalln("Failed to generate X509 key pair:", err)
  111. }
  112. } else if err != nil {
  113. log.Fatalln("Failed to load keypair:", err)
  114. }
  115. devID := protocol.NewDeviceID(cert.Certificate[0])
  116. log.Println("Server device ID is", devID)
  117. replCert := cert
  118. if replCertFile != "" && replKeyFile != "" {
  119. replCert, err = tls.LoadX509KeyPair(replCertFile, replKeyFile)
  120. if err != nil {
  121. log.Fatalln("Failed to load replication keypair:", err)
  122. }
  123. }
  124. replDevID := protocol.NewDeviceID(replCert.Certificate[0])
  125. log.Println("Replication device ID is", replDevID)
  126. // Parse the replication specs, if any.
  127. var allowedReplicationPeers []protocol.DeviceID
  128. var replicationDestinations []string
  129. parts := strings.Split(replicationPeers, ",")
  130. for _, part := range parts {
  131. if part == "" {
  132. continue
  133. }
  134. fields := strings.Split(part, "@")
  135. switch len(fields) {
  136. case 2:
  137. // This is an id@address specification. Grab the address for the
  138. // destination list. Try to resolve it once to catch obvious
  139. // syntax errors here rather than having the sender service fail
  140. // repeatedly later.
  141. _, err := net.ResolveTCPAddr("tcp", fields[1])
  142. if err != nil {
  143. log.Fatalln("Resolving address:", err)
  144. }
  145. replicationDestinations = append(replicationDestinations, fields[1])
  146. fallthrough // N.B.
  147. case 1:
  148. // The first part is always a device ID.
  149. id, err := protocol.DeviceIDFromString(fields[0])
  150. if err != nil {
  151. log.Fatalln("Parsing device ID:", err)
  152. }
  153. if id == protocol.EmptyDeviceID {
  154. log.Fatalf("Missing device ID for peer in %q", part)
  155. }
  156. allowedReplicationPeers = append(allowedReplicationPeers, id)
  157. default:
  158. log.Fatalln("Unrecognized replication spec:", part)
  159. }
  160. }
  161. // Root of the service tree.
  162. main := suture.New("main", suture.Spec{
  163. PassThroughPanics: true,
  164. })
  165. // Start the database.
  166. db, err := newLevelDBStore(dir)
  167. if err != nil {
  168. log.Fatalln("Open database:", err)
  169. }
  170. main.Add(db)
  171. // Start any replication senders.
  172. var repl replicationMultiplexer
  173. for _, dst := range replicationDestinations {
  174. rs := newReplicationSender(dst, replCert, allowedReplicationPeers)
  175. main.Add(rs)
  176. repl = append(repl, rs)
  177. }
  178. // If we have replication configured, start the replication listener.
  179. if len(allowedReplicationPeers) > 0 {
  180. rl := newReplicationListener(replicationListen, replCert, allowedReplicationPeers, db)
  181. main.Add(rl)
  182. }
  183. // If we have an AMQP broker, start that
  184. if amqpAddress != "" {
  185. clientID := rand.String(10)
  186. kr := newAMQPReplicator(amqpAddress, clientID, db)
  187. repl = append(repl, kr)
  188. main.Add(kr)
  189. }
  190. go func() {
  191. for range time.NewTicker(time.Second).C {
  192. for _, r := range repl {
  193. r.send("<heartbeat>", nil, time.Now().UnixNano())
  194. }
  195. }
  196. }()
  197. // Start the main API server.
  198. qs := newAPISrv(listen, cert, db, repl, useHTTP, missesIncrease)
  199. main.Add(qs)
  200. // If we have a metrics port configured, start a metrics handler.
  201. if metricsListen != "" {
  202. go func() {
  203. mux := http.NewServeMux()
  204. mux.Handle("/metrics", promhttp.Handler())
  205. log.Fatal(http.ListenAndServe(metricsListen, mux))
  206. }()
  207. }
  208. // Engage!
  209. main.Serve(context.Background())
  210. }