serve.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. // Copyright (C) 2018 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package serve
  7. import (
  8. "bufio"
  9. "compress/gzip"
  10. "context"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "log/slog"
  16. "net"
  17. "net/http"
  18. _ "net/http/pprof"
  19. "os"
  20. "regexp"
  21. "strings"
  22. "time"
  23. "github.com/prometheus/client_golang/prometheus"
  24. "github.com/prometheus/client_golang/prometheus/promhttp"
  25. "github.com/puzpuzpuz/xsync/v3"
  26. "github.com/syncthing/syncthing/internal/blob"
  27. "github.com/syncthing/syncthing/internal/blob/azureblob"
  28. "github.com/syncthing/syncthing/internal/blob/s3"
  29. "github.com/syncthing/syncthing/internal/slogutil"
  30. "github.com/syncthing/syncthing/lib/build"
  31. "github.com/syncthing/syncthing/lib/geoip"
  32. "github.com/syncthing/syncthing/lib/ur/contract"
  33. )
  34. type CLI struct {
  35. Listen string `env:"UR_LISTEN" help:"Usage reporting & metrics endpoint listen address" default:"0.0.0.0:8080"`
  36. ListenInternal string `env:"UR_LISTEN_INTERNAL" help:"Internal metrics endpoint listen address" default:"0.0.0.0:8082"`
  37. GeoIPLicenseKey string `env:"UR_GEOIP_LICENSE_KEY"`
  38. GeoIPAccountID int `env:"UR_GEOIP_ACCOUNT_ID"`
  39. DumpFile string `env:"UR_DUMP_FILE" default:"reports.jsons.gz"`
  40. DumpInterval time.Duration `env:"UR_DUMP_INTERVAL" default:"5m"`
  41. S3Endpoint string `name:"s3-endpoint" env:"UR_S3_ENDPOINT"`
  42. S3Region string `name:"s3-region" env:"UR_S3_REGION"`
  43. S3Bucket string `name:"s3-bucket" env:"UR_S3_BUCKET"`
  44. S3AccessKeyID string `name:"s3-access-key-id" env:"UR_S3_ACCESS_KEY_ID"`
  45. S3SecretKey string `name:"s3-secret-key" env:"UR_S3_SECRET_KEY"`
  46. AzureBlobAccount string `name:"azure-blob-account" env:"UR_AZUREBLOB_ACCOUNT"`
  47. AzureBlobKey string `name:"azure-blob-key" env:"UR_AZUREBLOB_KEY"`
  48. AzureBlobContainer string `name:"azure-blob-container" env:"UR_AZUREBLOB_CONTAINER"`
  49. }
  50. var (
  51. compilerRe = regexp.MustCompile(`\(([A-Za-z0-9()., -]+) \w+-\w+(?:| android| default)\) ([\[email protected]]+)`)
  52. knownDistributions = []distributionMatch{
  53. // Maps well known builders to the official distribution method that
  54. // they represent
  55. {regexp.MustCompile(`\steamcity@build\.syncthing\.net`), "GitHub"},
  56. {regexp.MustCompile(`\sjenkins@build\.syncthing\.net`), "GitHub"},
  57. {regexp.MustCompile(`\sbuilder@github\.syncthing\.net`), "GitHub"},
  58. {regexp.MustCompile(`\sdeb@build\.syncthing\.net`), "APT"},
  59. {regexp.MustCompile(`\sdebian@github\.syncthing\.net`), "APT"},
  60. {regexp.MustCompile(`\sdocker@syncthing\.net`), "Docker Hub"},
  61. {regexp.MustCompile(`\[email protected]\.net`), "Docker Hub"},
  62. {regexp.MustCompile(`\[email protected]\.net`), "Docker Hub"},
  63. {regexp.MustCompile(`\sandroid-builder@github\.syncthing\.net`), "Google Play"},
  64. {regexp.MustCompile(`\sandroid-.*teamcity@build\.syncthing\.net`), "Google Play"},
  65. {regexp.MustCompile(`\sandroid-.*vagrant@basebox-stretch64`), "F-Droid"},
  66. {regexp.MustCompile(`\svagrant@bullseye`), "F-Droid"},
  67. {regexp.MustCompile(`\svagrant@bookworm`), "F-Droid"},
  68. {regexp.MustCompile(`Anwender@NET2017`), "Syncthing-Fork (3rd party)"},
  69. {regexp.MustCompile(`\sbuilduser@(archlinux|svetlemodry)`), "Arch (3rd party)"},
  70. {regexp.MustCompile(`\ssyncthing@archlinux`), "Arch (3rd party)"},
  71. {regexp.MustCompile(`@debian`), "Debian (3rd party)"},
  72. {regexp.MustCompile(`@fedora`), "Fedora (3rd party)"},
  73. {regexp.MustCompile(`@openSUSE`), "openSUSE (3rd party)"},
  74. {regexp.MustCompile(`\sbrew@`), "Homebrew (3rd party)"},
  75. {regexp.MustCompile(`\sroot@buildkitsandbox`), "LinuxServer.io (3rd party)"},
  76. {regexp.MustCompile(`\sports@freebsd`), "FreeBSD (3rd party)"},
  77. {regexp.MustCompile(`\snix@nix`), "Nix (3rd party)"},
  78. {regexp.MustCompile(`.`), "Others"},
  79. }
  80. )
  81. type distributionMatch struct {
  82. matcher *regexp.Regexp
  83. distribution string
  84. }
  85. func (cli *CLI) Run() error {
  86. slog.Info("Starting", "version", build.Version)
  87. // Listening
  88. urListener, err := net.Listen("tcp", cli.Listen)
  89. if err != nil {
  90. slog.Error("Failed to listen (usage reports)", slogutil.Error(err))
  91. return err
  92. }
  93. slog.Info("Listening (usage reports)", slogutil.Address(urListener.Addr()))
  94. internalListener, err := net.Listen("tcp", cli.ListenInternal)
  95. if err != nil {
  96. slog.Error("Failed to listen (internal)", slogutil.Error(err))
  97. return err
  98. }
  99. slog.Info("Listening (internal)", slogutil.Address(internalListener.Addr()))
  100. var geo *geoip.Provider
  101. if cli.GeoIPAccountID != 0 && cli.GeoIPLicenseKey != "" {
  102. geo, err = geoip.NewGeoLite2CityProvider(context.Background(), cli.GeoIPAccountID, cli.GeoIPLicenseKey, os.TempDir())
  103. if err != nil {
  104. slog.Error("Failed to load GeoIP", slogutil.Error(err))
  105. return err
  106. }
  107. go geo.Serve(context.TODO())
  108. }
  109. // Blob storage
  110. var blobs blob.Store
  111. if cli.S3Endpoint != "" {
  112. blobs, err = s3.NewSession(cli.S3Endpoint, cli.S3Region, cli.S3Bucket, cli.S3AccessKeyID, cli.S3SecretKey)
  113. if err != nil {
  114. slog.Error("Failed to create S3 session", slogutil.Error(err))
  115. return err
  116. }
  117. } else if cli.AzureBlobAccount != "" {
  118. blobs, err = azureblob.NewBlobStore(cli.AzureBlobAccount, cli.AzureBlobKey, cli.AzureBlobContainer)
  119. if err != nil {
  120. slog.Error("Failed to create Azure blob store", slogutil.Error(err))
  121. return err
  122. }
  123. }
  124. if _, err := os.Stat(cli.DumpFile); err != nil && blobs != nil {
  125. if err := cli.downloadDumpFile(blobs); err != nil {
  126. slog.Error("Failed to download dump file", slogutil.Error(err))
  127. }
  128. }
  129. // server
  130. srv := &server{
  131. geo: geo,
  132. reports: xsync.NewMapOf[string, *contract.Report](),
  133. }
  134. if fd, err := os.Open(cli.DumpFile); err == nil {
  135. gr, err := gzip.NewReader(fd)
  136. if err == nil {
  137. srv.load(gr)
  138. }
  139. fd.Close()
  140. }
  141. go func() {
  142. for range time.Tick(cli.DumpInterval) {
  143. if err := cli.saveDumpFile(srv, blobs); err != nil {
  144. slog.Error("Failed to write dump file", slogutil.Error(err))
  145. }
  146. }
  147. }()
  148. // The internal metrics endpoint just serves metrics about what the
  149. // server is doing.
  150. http.Handle("/metrics", promhttp.Handler())
  151. internalSrv := http.Server{
  152. ReadTimeout: 5 * time.Second,
  153. WriteTimeout: 15 * time.Second,
  154. }
  155. go internalSrv.Serve(internalListener)
  156. // New external metrics endpoint accepts reports from clients and serves
  157. // aggregated usage reporting metrics.
  158. ms := newMetricsSet(srv)
  159. reg := prometheus.NewRegistry()
  160. reg.MustRegister(ms)
  161. mux := http.NewServeMux()
  162. mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
  163. mux.HandleFunc("/newdata", srv.handleNewData)
  164. mux.HandleFunc("/ping", srv.handlePing)
  165. metricsSrv := http.Server{
  166. ReadTimeout: 5 * time.Second,
  167. WriteTimeout: 15 * time.Second,
  168. Handler: mux,
  169. }
  170. slog.Info("Ready to serve")
  171. return metricsSrv.Serve(urListener)
  172. }
  173. func (cli *CLI) downloadDumpFile(blobs blob.Store) error {
  174. latestKey, err := blobs.LatestKey(context.Background())
  175. if err != nil {
  176. return fmt.Errorf("list latest S3 key: %w", err)
  177. }
  178. fd, err := os.Create(cli.DumpFile)
  179. if err != nil {
  180. return fmt.Errorf("create dump file: %w", err)
  181. }
  182. if err := blobs.Download(context.Background(), latestKey, fd); err != nil {
  183. _ = fd.Close()
  184. return fmt.Errorf("download dump file: %w", err)
  185. }
  186. if err := fd.Close(); err != nil {
  187. return fmt.Errorf("close dump file: %w", err)
  188. }
  189. slog.Info("Dump file downloaded", "key", latestKey)
  190. return nil
  191. }
  192. func (cli *CLI) saveDumpFile(srv *server, blobs blob.Store) error {
  193. fd, err := os.Create(cli.DumpFile + ".tmp")
  194. if err != nil {
  195. return fmt.Errorf("creating dump file: %w", err)
  196. }
  197. gw := gzip.NewWriter(fd)
  198. if err := srv.save(gw); err != nil {
  199. return fmt.Errorf("saving dump file: %w", err)
  200. }
  201. if err := gw.Close(); err != nil {
  202. fd.Close()
  203. return fmt.Errorf("closing gzip writer: %w", err)
  204. }
  205. if err := fd.Close(); err != nil {
  206. return fmt.Errorf("closing dump file: %w", err)
  207. }
  208. if err := os.Rename(cli.DumpFile+".tmp", cli.DumpFile); err != nil {
  209. return fmt.Errorf("renaming dump file: %w", err)
  210. }
  211. slog.Info("Dump file saved")
  212. if blobs != nil {
  213. key := fmt.Sprintf("reports-%s.jsons.gz", time.Now().UTC().Format("2006-01-02"))
  214. fd, err := os.Open(cli.DumpFile)
  215. if err != nil {
  216. return fmt.Errorf("opening dump file: %w", err)
  217. }
  218. if err := blobs.Upload(context.Background(), key, fd); err != nil {
  219. return fmt.Errorf("uploading dump file: %w", err)
  220. }
  221. _ = fd.Close()
  222. slog.Info("Dump file uploaded")
  223. }
  224. return nil
  225. }
  226. type server struct {
  227. geo *geoip.Provider
  228. reports *xsync.MapOf[string, *contract.Report]
  229. }
  230. func (s *server) handlePing(w http.ResponseWriter, r *http.Request) {
  231. }
  232. func (s *server) handleNewData(w http.ResponseWriter, r *http.Request) {
  233. result := "fail"
  234. defer func() {
  235. // result is "accept" (new report), "replace" (existing report) or
  236. // "fail"
  237. metricReportsTotal.WithLabelValues(result).Inc()
  238. }()
  239. defer r.Body.Close()
  240. if r.Method != http.MethodPost {
  241. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  242. return
  243. }
  244. addr := r.Header.Get("X-Forwarded-For")
  245. if addr != "" {
  246. addr = strings.Split(addr, ", ")[0]
  247. } else {
  248. addr = r.RemoteAddr
  249. }
  250. if host, _, err := net.SplitHostPort(addr); err == nil {
  251. addr = host
  252. }
  253. log := slog.With("addr", addr)
  254. if net.ParseIP(addr) == nil {
  255. addr = ""
  256. }
  257. var rep contract.Report
  258. lr := &io.LimitedReader{R: r.Body, N: 40 * 1024}
  259. bs, _ := io.ReadAll(lr)
  260. if err := json.Unmarshal(bs, &rep); err != nil {
  261. log.Error("Failed to decode JSON", slogutil.Error(err))
  262. http.Error(w, "JSON Decode Error", http.StatusInternalServerError)
  263. return
  264. }
  265. rep.Received = time.Now()
  266. rep.Date = rep.Received.UTC().Format("20060102")
  267. rep.Address = addr
  268. if err := rep.Validate(); err != nil {
  269. log.Error("Failed to validate report", slogutil.Error(err))
  270. http.Error(w, "Validation Error", http.StatusInternalServerError)
  271. return
  272. }
  273. if s.addReport(&rep) {
  274. result = "replace"
  275. } else {
  276. result = "accept"
  277. }
  278. }
  279. func (s *server) addReport(rep *contract.Report) bool {
  280. if s.geo != nil {
  281. if ip := net.ParseIP(rep.Address); ip != nil {
  282. if city, err := s.geo.City(ip); err == nil {
  283. rep.Country = city.Country.Names["en"]
  284. rep.CountryCode = city.Country.IsoCode
  285. }
  286. }
  287. }
  288. if rep.Country == "" {
  289. rep.Country = "Unknown"
  290. }
  291. if rep.CountryCode == "" {
  292. rep.CountryCode = "ZZ"
  293. }
  294. rep.Version = transformVersion(rep.Version)
  295. if strings.Contains(rep.Version, ".") {
  296. split := strings.SplitN(rep.Version, ".", 3)
  297. if len(split) == 3 {
  298. rep.MajorVersion = strings.Join(split[:2], ".")
  299. }
  300. }
  301. rep.OS, rep.Arch, _ = strings.Cut(rep.Platform, "-")
  302. if m := compilerRe.FindStringSubmatch(rep.LongVersion); len(m) == 3 {
  303. rep.Compiler = m[1]
  304. rep.Builder = m[2]
  305. }
  306. for _, d := range knownDistributions {
  307. if d.matcher.MatchString(rep.LongVersion) {
  308. rep.Distribution = d.distribution
  309. break
  310. }
  311. }
  312. rep.DistDist = rep.Distribution
  313. rep.DistOS = rep.OS
  314. rep.DistArch = rep.Arch
  315. _, loaded := s.reports.LoadAndStore(rep.UniqueID, rep)
  316. return loaded
  317. }
  318. func (s *server) save(w io.Writer) error {
  319. bw := bufio.NewWriter(w)
  320. enc := json.NewEncoder(bw)
  321. var err error
  322. s.reports.Range(func(k string, v *contract.Report) bool {
  323. err = enc.Encode(v)
  324. return err == nil
  325. })
  326. if err != nil {
  327. return err
  328. }
  329. return bw.Flush()
  330. }
  331. func (s *server) load(r io.Reader) {
  332. dec := json.NewDecoder(r)
  333. s.reports.Clear()
  334. for {
  335. var rep contract.Report
  336. if err := dec.Decode(&rep); errors.Is(err, io.EOF) {
  337. break
  338. } else if err != nil {
  339. slog.Error("Failed to load record", slogutil.Error(err))
  340. break
  341. }
  342. s.addReport(&rep)
  343. }
  344. slog.Info("Loaded reports", "count", s.reports.Size())
  345. }
  346. var (
  347. plusRe = regexp.MustCompile(`(\+.*|[.-]dev\..*)$`)
  348. plusStr = "-dev"
  349. )
  350. // transformVersion returns a version number formatted correctly, with all
  351. // development versions aggregated into one.
  352. func transformVersion(v string) string {
  353. if v == "unknown-dev" {
  354. return v
  355. }
  356. if !strings.HasPrefix(v, "v") {
  357. v = "v" + v
  358. }
  359. v = plusRe.ReplaceAllString(v, plusStr)
  360. return v
  361. }