| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- // Copyright (C) 2018 The Syncthing Authors.
- //
- // This Source Code Form is subject to the terms of the Mozilla Public
- // License, v. 2.0. If a copy of the MPL was not distributed with this file,
- // You can obtain one at https://mozilla.org/MPL/2.0/.
- package serve
- import (
- "bufio"
- "compress/gzip"
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "log/slog"
- "net"
- "net/http"
- _ "net/http/pprof"
- "os"
- "regexp"
- "strings"
- "time"
- "github.com/prometheus/client_golang/prometheus"
- "github.com/prometheus/client_golang/prometheus/promhttp"
- "github.com/puzpuzpuz/xsync/v3"
- "github.com/syncthing/syncthing/internal/blob"
- "github.com/syncthing/syncthing/internal/blob/azureblob"
- "github.com/syncthing/syncthing/internal/blob/s3"
- "github.com/syncthing/syncthing/internal/slogutil"
- "github.com/syncthing/syncthing/lib/build"
- "github.com/syncthing/syncthing/lib/geoip"
- "github.com/syncthing/syncthing/lib/ur/contract"
- )
// CLI holds the configuration of the usage reporting server. The struct
// tags (name, env, help, default) are read by the command line parser that
// populates it, so every option can be given either as a flag or through
// the environment variable named in its env tag.
type CLI struct {
	Listen             string        `env:"UR_LISTEN" help:"Usage reporting & metrics endpoint listen address" default:"0.0.0.0:8080"`
	ListenInternal     string        `env:"UR_LISTEN_INTERNAL" help:"Internal metrics endpoint listen address" default:"0.0.0.0:8082"`
	GeoIPLicenseKey    string        `env:"UR_GEOIP_LICENSE_KEY" help:"MaxMind license key; GeoIP lookups are enabled only when both this and the account ID are set"`
	GeoIPAccountID     int           `env:"UR_GEOIP_ACCOUNT_ID" help:"MaxMind account ID; GeoIP lookups are enabled only when both this and the license key are set"`
	DumpFile           string        `env:"UR_DUMP_FILE" default:"reports.jsons.gz" help:"Path of the gzipped dump file used to persist received reports"`
	DumpInterval       time.Duration `env:"UR_DUMP_INTERVAL" default:"5m" help:"How often the dump file is written (and uploaded to blob storage, if configured)"`
	S3Endpoint         string        `name:"s3-endpoint" env:"UR_S3_ENDPOINT" help:"S3 endpoint; when set, dump files are backed up to S3"`
	S3Region           string        `name:"s3-region" env:"UR_S3_REGION" help:"S3 region"`
	S3Bucket           string        `name:"s3-bucket" env:"UR_S3_BUCKET" help:"S3 bucket name"`
	S3AccessKeyID      string        `name:"s3-access-key-id" env:"UR_S3_ACCESS_KEY_ID" help:"S3 access key ID"`
	S3SecretKey        string        `name:"s3-secret-key" env:"UR_S3_SECRET_KEY" help:"S3 secret key"`
	AzureBlobAccount   string        `name:"azure-blob-account" env:"UR_AZUREBLOB_ACCOUNT" help:"Azure storage account; when set and no S3 endpoint is given, dump files are backed up to Azure Blob Storage"`
	AzureBlobKey       string        `name:"azure-blob-key" env:"UR_AZUREBLOB_KEY" help:"Azure storage account key"`
	AzureBlobContainer string        `name:"azure-blob-container" env:"UR_AZUREBLOB_CONTAINER" help:"Azure blob container name"`
}
var (
	// compilerRe extracts the compiler (group 1) and the builder user@host
	// (group 2) from a long version string of the form
	//
	//	... (go1.21.5 linux-amd64) builder@host.example ...
	compilerRe = regexp.MustCompile(`\(([A-Za-z0-9()., -]+) \w+-\w+(?:| android| default)\) ([\w@.-]+)`)

	// knownDistributions maps well known builders to the official
	// distribution method that they represent. Entries are tried in order
	// against a report's long version string and the first match wins; the
	// final catch-all classifies any other non-empty string as "Others".
	knownDistributions = []distributionMatch{
		{regexp.MustCompile(`\steamcity@build\.syncthing\.net`), "GitHub"},
		{regexp.MustCompile(`\sjenkins@build\.syncthing\.net`), "GitHub"},
		{regexp.MustCompile(`\sbuilder@github\.syncthing\.net`), "GitHub"},

		{regexp.MustCompile(`\sdeb@build\.syncthing\.net`), "APT"},
		{regexp.MustCompile(`\sdebian@github\.syncthing\.net`), "APT"},

		{regexp.MustCompile(`\sdocker@syncthing\.net`), "Docker Hub"},
		{regexp.MustCompile(`\sdocker@build\.syncthing\.net`), "Docker Hub"},
		{regexp.MustCompile(`\sdocker@github\.syncthing\.net`), "Docker Hub"},

		{regexp.MustCompile(`\sandroid-builder@github\.syncthing\.net`), "Google Play"},
		{regexp.MustCompile(`\sandroid-.*teamcity@build\.syncthing\.net`), "Google Play"},

		{regexp.MustCompile(`\sandroid-.*vagrant@basebox-stretch64`), "F-Droid"},
		{regexp.MustCompile(`\svagrant@bullseye`), "F-Droid"},
		{regexp.MustCompile(`\svagrant@bookworm`), "F-Droid"},

		{regexp.MustCompile(`Anwender@NET2017`), "Syncthing-Fork (3rd party)"},
		{regexp.MustCompile(`\sbuilduser@(archlinux|svetlemodry)`), "Arch (3rd party)"},
		{regexp.MustCompile(`\ssyncthing@archlinux`), "Arch (3rd party)"},
		{regexp.MustCompile(`@debian`), "Debian (3rd party)"},
		{regexp.MustCompile(`@fedora`), "Fedora (3rd party)"},
		{regexp.MustCompile(`@openSUSE`), "openSUSE (3rd party)"},
		{regexp.MustCompile(`\sbrew@`), "Homebrew (3rd party)"},
		{regexp.MustCompile(`\sroot@buildkitsandbox`), "LinuxServer.io (3rd party)"},
		{regexp.MustCompile(`\sports@freebsd`), "FreeBSD (3rd party)"},
		{regexp.MustCompile(`\snix@nix`), "Nix (3rd party)"},

		// Catch-all: must stay last.
		{regexp.MustCompile(`.`), "Others"},
	}
)

// distributionMatch pairs a pattern applied to a report's long version
// string with the distribution name assigned when the pattern matches.
type distributionMatch struct {
	matcher      *regexp.Regexp
	distribution string
}
- func (cli *CLI) Run() error {
- slog.Info("Starting", "version", build.Version)
- // Listening
- urListener, err := net.Listen("tcp", cli.Listen)
- if err != nil {
- slog.Error("Failed to listen (usage reports)", slogutil.Error(err))
- return err
- }
- slog.Info("Listening (usage reports)", slogutil.Address(urListener.Addr()))
- internalListener, err := net.Listen("tcp", cli.ListenInternal)
- if err != nil {
- slog.Error("Failed to listen (internal)", slogutil.Error(err))
- return err
- }
- slog.Info("Listening (internal)", slogutil.Address(internalListener.Addr()))
- var geo *geoip.Provider
- if cli.GeoIPAccountID != 0 && cli.GeoIPLicenseKey != "" {
- geo, err = geoip.NewGeoLite2CityProvider(context.Background(), cli.GeoIPAccountID, cli.GeoIPLicenseKey, os.TempDir())
- if err != nil {
- slog.Error("Failed to load GeoIP", slogutil.Error(err))
- return err
- }
- go geo.Serve(context.TODO())
- }
- // Blob storage
- var blobs blob.Store
- if cli.S3Endpoint != "" {
- blobs, err = s3.NewSession(cli.S3Endpoint, cli.S3Region, cli.S3Bucket, cli.S3AccessKeyID, cli.S3SecretKey)
- if err != nil {
- slog.Error("Failed to create S3 session", slogutil.Error(err))
- return err
- }
- } else if cli.AzureBlobAccount != "" {
- blobs, err = azureblob.NewBlobStore(cli.AzureBlobAccount, cli.AzureBlobKey, cli.AzureBlobContainer)
- if err != nil {
- slog.Error("Failed to create Azure blob store", slogutil.Error(err))
- return err
- }
- }
- if _, err := os.Stat(cli.DumpFile); err != nil && blobs != nil {
- if err := cli.downloadDumpFile(blobs); err != nil {
- slog.Error("Failed to download dump file", slogutil.Error(err))
- }
- }
- // server
- srv := &server{
- geo: geo,
- reports: xsync.NewMapOf[string, *contract.Report](),
- }
- if fd, err := os.Open(cli.DumpFile); err == nil {
- gr, err := gzip.NewReader(fd)
- if err == nil {
- srv.load(gr)
- }
- fd.Close()
- }
- go func() {
- for range time.Tick(cli.DumpInterval) {
- if err := cli.saveDumpFile(srv, blobs); err != nil {
- slog.Error("Failed to write dump file", slogutil.Error(err))
- }
- }
- }()
- // The internal metrics endpoint just serves metrics about what the
- // server is doing.
- http.Handle("/metrics", promhttp.Handler())
- internalSrv := http.Server{
- ReadTimeout: 5 * time.Second,
- WriteTimeout: 15 * time.Second,
- }
- go internalSrv.Serve(internalListener)
- // New external metrics endpoint accepts reports from clients and serves
- // aggregated usage reporting metrics.
- ms := newMetricsSet(srv)
- reg := prometheus.NewRegistry()
- reg.MustRegister(ms)
- mux := http.NewServeMux()
- mux.Handle("/metrics", promhttp.HandlerFor(reg, promhttp.HandlerOpts{}))
- mux.HandleFunc("/newdata", srv.handleNewData)
- mux.HandleFunc("/ping", srv.handlePing)
- metricsSrv := http.Server{
- ReadTimeout: 5 * time.Second,
- WriteTimeout: 15 * time.Second,
- Handler: mux,
- }
- slog.Info("Ready to serve")
- return metricsSrv.Serve(urListener)
- }
- func (cli *CLI) downloadDumpFile(blobs blob.Store) error {
- latestKey, err := blobs.LatestKey(context.Background())
- if err != nil {
- return fmt.Errorf("list latest S3 key: %w", err)
- }
- fd, err := os.Create(cli.DumpFile)
- if err != nil {
- return fmt.Errorf("create dump file: %w", err)
- }
- if err := blobs.Download(context.Background(), latestKey, fd); err != nil {
- _ = fd.Close()
- return fmt.Errorf("download dump file: %w", err)
- }
- if err := fd.Close(); err != nil {
- return fmt.Errorf("close dump file: %w", err)
- }
- slog.Info("Dump file downloaded", "key", latestKey)
- return nil
- }
- func (cli *CLI) saveDumpFile(srv *server, blobs blob.Store) error {
- fd, err := os.Create(cli.DumpFile + ".tmp")
- if err != nil {
- return fmt.Errorf("creating dump file: %w", err)
- }
- gw := gzip.NewWriter(fd)
- if err := srv.save(gw); err != nil {
- return fmt.Errorf("saving dump file: %w", err)
- }
- if err := gw.Close(); err != nil {
- fd.Close()
- return fmt.Errorf("closing gzip writer: %w", err)
- }
- if err := fd.Close(); err != nil {
- return fmt.Errorf("closing dump file: %w", err)
- }
- if err := os.Rename(cli.DumpFile+".tmp", cli.DumpFile); err != nil {
- return fmt.Errorf("renaming dump file: %w", err)
- }
- slog.Info("Dump file saved")
- if blobs != nil {
- key := fmt.Sprintf("reports-%s.jsons.gz", time.Now().UTC().Format("2006-01-02"))
- fd, err := os.Open(cli.DumpFile)
- if err != nil {
- return fmt.Errorf("opening dump file: %w", err)
- }
- if err := blobs.Upload(context.Background(), key, fd); err != nil {
- return fmt.Errorf("uploading dump file: %w", err)
- }
- _ = fd.Close()
- slog.Info("Dump file uploaded")
- }
- return nil
- }
- type server struct {
- geo *geoip.Provider
- reports *xsync.MapOf[string, *contract.Report]
- }
- func (s *server) handlePing(w http.ResponseWriter, r *http.Request) {
- }
- func (s *server) handleNewData(w http.ResponseWriter, r *http.Request) {
- result := "fail"
- defer func() {
- // result is "accept" (new report), "replace" (existing report) or
- // "fail"
- metricReportsTotal.WithLabelValues(result).Inc()
- }()
- defer r.Body.Close()
- if r.Method != http.MethodPost {
- http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
- return
- }
- addr := r.Header.Get("X-Forwarded-For")
- if addr != "" {
- addr = strings.Split(addr, ", ")[0]
- } else {
- addr = r.RemoteAddr
- }
- if host, _, err := net.SplitHostPort(addr); err == nil {
- addr = host
- }
- log := slog.With("addr", addr)
- if net.ParseIP(addr) == nil {
- addr = ""
- }
- var rep contract.Report
- lr := &io.LimitedReader{R: r.Body, N: 40 * 1024}
- bs, _ := io.ReadAll(lr)
- if err := json.Unmarshal(bs, &rep); err != nil {
- log.Error("Failed to decode JSON", slogutil.Error(err))
- http.Error(w, "JSON Decode Error", http.StatusInternalServerError)
- return
- }
- rep.Received = time.Now()
- rep.Date = rep.Received.UTC().Format("20060102")
- rep.Address = addr
- if err := rep.Validate(); err != nil {
- log.Error("Failed to validate report", slogutil.Error(err))
- http.Error(w, "Validation Error", http.StatusInternalServerError)
- return
- }
- if s.addReport(&rep) {
- result = "replace"
- } else {
- result = "accept"
- }
- }
- func (s *server) addReport(rep *contract.Report) bool {
- if s.geo != nil {
- if ip := net.ParseIP(rep.Address); ip != nil {
- if city, err := s.geo.City(ip); err == nil {
- rep.Country = city.Country.Names["en"]
- rep.CountryCode = city.Country.IsoCode
- }
- }
- }
- if rep.Country == "" {
- rep.Country = "Unknown"
- }
- if rep.CountryCode == "" {
- rep.CountryCode = "ZZ"
- }
- rep.Version = transformVersion(rep.Version)
- if strings.Contains(rep.Version, ".") {
- split := strings.SplitN(rep.Version, ".", 3)
- if len(split) == 3 {
- rep.MajorVersion = strings.Join(split[:2], ".")
- }
- }
- rep.OS, rep.Arch, _ = strings.Cut(rep.Platform, "-")
- if m := compilerRe.FindStringSubmatch(rep.LongVersion); len(m) == 3 {
- rep.Compiler = m[1]
- rep.Builder = m[2]
- }
- for _, d := range knownDistributions {
- if d.matcher.MatchString(rep.LongVersion) {
- rep.Distribution = d.distribution
- break
- }
- }
- rep.DistDist = rep.Distribution
- rep.DistOS = rep.OS
- rep.DistArch = rep.Arch
- _, loaded := s.reports.LoadAndStore(rep.UniqueID, rep)
- return loaded
- }
- func (s *server) save(w io.Writer) error {
- bw := bufio.NewWriter(w)
- enc := json.NewEncoder(bw)
- var err error
- s.reports.Range(func(k string, v *contract.Report) bool {
- err = enc.Encode(v)
- return err == nil
- })
- if err != nil {
- return err
- }
- return bw.Flush()
- }
- func (s *server) load(r io.Reader) {
- dec := json.NewDecoder(r)
- s.reports.Clear()
- for {
- var rep contract.Report
- if err := dec.Decode(&rep); errors.Is(err, io.EOF) {
- break
- } else if err != nil {
- slog.Error("Failed to load record", slogutil.Error(err))
- break
- }
- s.addReport(&rep)
- }
- slog.Info("Loaded reports", "count", s.reports.Size())
- }
var (
	// plusRe matches a trailing build suffix: anything from a "+", or a
	// ".dev." / "-dev." segment, to the end of the string.
	plusRe = regexp.MustCompile(`(\+.*|[.-]dev\..*)$`)
	// plusStr replaces whatever plusRe matched.
	plusStr = "-dev"
)

// transformVersion normalises a version string. It ensures a leading "v"
// and collapses any development/build suffix into "-dev", so that all
// development builds of a release aggregate together. The special value
// "unknown-dev" is returned unchanged.
func transformVersion(v string) string {
	switch {
	case v == "unknown-dev":
		return v
	case !strings.HasPrefix(v, "v"):
		v = "v" + v
	}
	return plusRe.ReplaceAllString(v, plusStr)
}
|