global.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "context"
  10. "crypto/tls"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "log/slog"
  16. "net"
  17. "net/http"
  18. "net/url"
  19. "strconv"
  20. "sync"
  21. "time"
  22. "golang.org/x/net/http2"
  23. "github.com/syncthing/syncthing/internal/slogutil"
  24. "github.com/syncthing/syncthing/lib/connections/registry"
  25. "github.com/syncthing/syncthing/lib/dialer"
  26. "github.com/syncthing/syncthing/lib/events"
  27. "github.com/syncthing/syncthing/lib/protocol"
  28. )
  29. type globalClient struct {
  30. server string
  31. addrList AddressLister
  32. announceClient httpClient
  33. queryClient httpClient
  34. noAnnounce bool
  35. noLookup bool
  36. evLogger events.Logger
  37. errorHolder
  38. }
  39. type httpClient interface {
  40. Get(ctx context.Context, url string) (*http.Response, error)
  41. Post(ctx context.Context, url, ctype string, data io.Reader) (*http.Response, error)
  42. }
  43. const (
  44. defaultReannounceInterval = 30 * time.Minute
  45. announceErrorRetryInterval = 5 * time.Minute
  46. requestTimeout = 30 * time.Second
  47. maxAddressChangesBetweenAnnouncements = 10
  48. )
  49. type announcement struct {
  50. Addresses []string `json:"addresses"`
  51. }
  52. func (a announcement) MarshalJSON() ([]byte, error) {
  53. type announcementCopy announcement
  54. a.Addresses = sanitizeRelayAddresses(a.Addresses)
  55. aCopy := announcementCopy(a)
  56. return json.Marshal(aCopy)
  57. }
  58. type serverOptions struct {
  59. insecure bool // don't check certificate
  60. noAnnounce bool // don't announce
  61. noLookup bool // don't use for lookups
  62. id string // expected server device ID
  63. }
  64. // A lookupError is any other error but with a cache validity time attached.
  65. type lookupError struct {
  66. msg string
  67. cacheFor time.Duration
  68. }
  69. func (e *lookupError) Error() string { return e.msg }
  70. func (e *lookupError) CacheFor() time.Duration {
  71. return e.cacheFor
  72. }
  73. func NewGlobal(server string, cert tls.Certificate, addrList AddressLister, evLogger events.Logger, registry *registry.Registry) (FinderService, error) {
  74. server, opts, err := parseOptions(server)
  75. if err != nil {
  76. return nil, err
  77. }
  78. var devID protocol.DeviceID
  79. if opts.id != "" {
  80. devID, err = protocol.DeviceIDFromString(opts.id)
  81. if err != nil {
  82. return nil, err
  83. }
  84. }
  85. // The http.Client used for announcements. It needs to have our
  86. // certificate to prove our identity, and may or may not verify the server
  87. // certificate depending on the insecure setting.
  88. var dialContext func(ctx context.Context, network, addr string) (net.Conn, error)
  89. if registry != nil {
  90. dialContext = dialer.DialContextReusePortFunc(registry)
  91. } else {
  92. dialContext = dialer.DialContext
  93. }
  94. var announceClient httpClient = &contextClient{&http.Client{
  95. Timeout: requestTimeout,
  96. Transport: http2EnabledTransport(&http.Transport{
  97. DialContext: dialContext,
  98. Proxy: http.ProxyFromEnvironment,
  99. DisableKeepAlives: true, // announcements are few and far between, so don't keep the connection open
  100. TLSClientConfig: &tls.Config{
  101. InsecureSkipVerify: opts.insecure,
  102. Certificates: []tls.Certificate{cert},
  103. MinVersion: tls.VersionTLS12,
  104. ClientSessionCache: tls.NewLRUClientSessionCache(0),
  105. },
  106. }),
  107. }}
  108. if opts.id != "" {
  109. announceClient = newIDCheckingHTTPClient(announceClient, devID)
  110. }
  111. // The http.Client used for queries. We don't need to present our
  112. // certificate here, so lets not include it. May be insecure if requested.
  113. var queryClient httpClient = &contextClient{&http.Client{
  114. Timeout: requestTimeout,
  115. Transport: http2EnabledTransport(&http.Transport{
  116. DialContext: dialer.DialContext,
  117. Proxy: http.ProxyFromEnvironment,
  118. IdleConnTimeout: time.Second,
  119. TLSClientConfig: &tls.Config{
  120. InsecureSkipVerify: opts.insecure,
  121. MinVersion: tls.VersionTLS12,
  122. ClientSessionCache: tls.NewLRUClientSessionCache(0),
  123. },
  124. }),
  125. }}
  126. if opts.id != "" {
  127. queryClient = newIDCheckingHTTPClient(queryClient, devID)
  128. }
  129. cl := &globalClient{
  130. server: server,
  131. addrList: addrList,
  132. announceClient: announceClient,
  133. queryClient: queryClient,
  134. noAnnounce: opts.noAnnounce,
  135. noLookup: opts.noLookup,
  136. evLogger: evLogger,
  137. }
  138. if !opts.noAnnounce {
  139. // If we are supposed to announce, it's an error until we've done so.
  140. cl.setError(errors.New("not announced"))
  141. }
  142. return cl, nil
  143. }
  144. // Lookup returns the list of addresses where the given device is available
  145. func (c *globalClient) Lookup(ctx context.Context, device protocol.DeviceID) (addresses []string, err error) {
  146. if c.noLookup {
  147. return nil, &lookupError{
  148. msg: "lookups not supported",
  149. cacheFor: time.Hour,
  150. }
  151. }
  152. qURL, err := url.Parse(c.server)
  153. if err != nil {
  154. return nil, err
  155. }
  156. q := qURL.Query()
  157. q.Set("device", device.String())
  158. qURL.RawQuery = q.Encode()
  159. resp, err := c.queryClient.Get(ctx, qURL.String())
  160. if err != nil {
  161. slog.DebugContext(ctx, "globalClient.Lookup", "url", qURL, slogutil.Error(err))
  162. return nil, err
  163. }
  164. if resp.StatusCode != http.StatusOK {
  165. resp.Body.Close()
  166. slog.DebugContext(ctx, "globalClient.Lookup", "url", qURL, "status", resp.Status)
  167. err := errors.New(resp.Status)
  168. if secs, atoiErr := strconv.Atoi(resp.Header.Get("Retry-After")); atoiErr == nil && secs > 0 {
  169. err = &lookupError{
  170. msg: resp.Status,
  171. cacheFor: time.Duration(secs) * time.Second,
  172. }
  173. }
  174. return nil, err
  175. }
  176. bs, err := io.ReadAll(resp.Body)
  177. if err != nil {
  178. return nil, err
  179. }
  180. resp.Body.Close()
  181. var ann announcement
  182. err = json.Unmarshal(bs, &ann)
  183. return ann.Addresses, err
  184. }
  185. func (c *globalClient) String() string {
  186. return "global@" + c.server
  187. }
  188. func (c *globalClient) Serve(ctx context.Context) error {
  189. if c.noAnnounce {
  190. // We're configured to not do announcements, only lookups. To maintain
  191. // the same interface, we just pause here if Serve() is run.
  192. <-ctx.Done()
  193. return ctx.Err()
  194. }
  195. timer := time.NewTimer(5 * time.Second)
  196. defer timer.Stop()
  197. eventSub := c.evLogger.Subscribe(events.ListenAddressesChanged)
  198. defer eventSub.Unsubscribe()
  199. timerResetCount := 0
  200. for {
  201. select {
  202. case <-eventSub.C():
  203. if timerResetCount < maxAddressChangesBetweenAnnouncements {
  204. // Defer announcement by 2 seconds, essentially debouncing
  205. // if we have a stream of events incoming in quick succession.
  206. timer.Reset(2 * time.Second)
  207. } else if timerResetCount == maxAddressChangesBetweenAnnouncements {
  208. // Yet only do it if we haven't had to reset maxAddressChangesBetweenAnnouncements times in a row,
  209. // so if something is flip-flopping within 2 seconds, we don't end up in a permanent reset loop.
  210. slog.ErrorContext(ctx, "Detected a flip-flopping listener", slog.String("server", c.server))
  211. c.setError(errors.New("flip flopping listener"))
  212. // Incrementing the count above 10 will prevent us from warning or setting the error again
  213. // It will also suppress event based resets until we've had a proper round after announceErrorRetryInterval
  214. timer.Reset(announceErrorRetryInterval)
  215. }
  216. timerResetCount++
  217. case <-timer.C:
  218. timerResetCount = 0
  219. c.sendAnnouncement(ctx, timer)
  220. case <-ctx.Done():
  221. return ctx.Err()
  222. }
  223. }
  224. }
  225. func (c *globalClient) sendAnnouncement(ctx context.Context, timer *time.Timer) {
  226. var ann announcement
  227. if c.addrList != nil {
  228. ann.Addresses = c.addrList.ExternalAddresses()
  229. }
  230. if len(ann.Addresses) == 0 {
  231. // There are legitimate cases for not having anything to announce,
  232. // yet still using global discovery for lookups. Do not error out
  233. // here.
  234. c.setError(nil)
  235. timer.Reset(announceErrorRetryInterval)
  236. return
  237. }
  238. // The marshal doesn't fail, I promise.
  239. postData, _ := json.Marshal(ann)
  240. slog.DebugContext(ctx, "send announcement", "server", c.server, "announcement", ann)
  241. resp, err := c.announceClient.Post(ctx, c.server, "application/json", bytes.NewReader(postData))
  242. if err != nil {
  243. slog.DebugContext(ctx, "announce POST", "server", c.server, slogutil.Error(err))
  244. c.setError(err)
  245. timer.Reset(announceErrorRetryInterval)
  246. return
  247. }
  248. slog.DebugContext(ctx, "announce POST", "server", c.server, "status", resp.Status)
  249. resp.Body.Close()
  250. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  251. slog.DebugContext(ctx, "announce POST", "server", c.server, "status", resp.Status)
  252. c.setError(errors.New(resp.Status))
  253. if h := resp.Header.Get("Retry-After"); h != "" {
  254. // The server has a recommendation on when we should
  255. // retry. Follow it.
  256. if secs, err := strconv.Atoi(h); err == nil && secs > 0 {
  257. slog.DebugContext(ctx, "server sets retry-after", "server", c.server, "seconds", secs)
  258. timer.Reset(time.Duration(secs) * time.Second)
  259. return
  260. }
  261. }
  262. timer.Reset(announceErrorRetryInterval)
  263. return
  264. }
  265. c.setError(nil)
  266. if h := resp.Header.Get("Reannounce-After"); h != "" {
  267. // The server has a recommendation on when we should
  268. // reannounce. Follow it.
  269. if secs, err := strconv.Atoi(h); err == nil && secs > 0 {
  270. slog.DebugContext(ctx, "announce sets reannounce-after", "server", c.server, "seconds", secs)
  271. timer.Reset(time.Duration(secs) * time.Second)
  272. return
  273. }
  274. }
  275. timer.Reset(defaultReannounceInterval)
  276. }
  277. func (*globalClient) Cache() map[protocol.DeviceID]CacheEntry {
  278. // The globalClient doesn't do caching
  279. return nil
  280. }
  281. // parseOptions parses and strips away any ?query=val options, setting the
  282. // corresponding field in the serverOptions struct. Unknown query options are
  283. // ignored and removed.
  284. func parseOptions(dsn string) (server string, opts serverOptions, err error) {
  285. p, err := url.Parse(dsn)
  286. if err != nil {
  287. return "", serverOptions{}, err
  288. }
  289. // Grab known options from the query string
  290. q := p.Query()
  291. opts.id = q.Get("id")
  292. opts.insecure = opts.id != "" || queryBool(q, "insecure")
  293. opts.noAnnounce = queryBool(q, "noannounce")
  294. opts.noLookup = queryBool(q, "nolookup")
  295. // Check for disallowed combinations
  296. if p.Scheme == "http" {
  297. if !opts.insecure {
  298. return "", serverOptions{}, errors.New("http without insecure not supported")
  299. }
  300. if !opts.noAnnounce {
  301. return "", serverOptions{}, errors.New("http without noannounce not supported")
  302. }
  303. } else if p.Scheme != "https" {
  304. return "", serverOptions{}, errors.New("unsupported scheme " + p.Scheme)
  305. }
  306. // Remove the query string
  307. p.RawQuery = ""
  308. server = p.String()
  309. return
  310. }
  311. // queryBool returns the query parameter parsed as a boolean. An empty value
  312. // ("?foo") is considered true, as is any value string except false
  313. // ("?foo=false").
  314. func queryBool(q url.Values, key string) bool {
  315. if _, ok := q[key]; !ok {
  316. return false
  317. }
  318. return q.Get(key) != "false"
  319. }
  320. type idCheckingHTTPClient struct {
  321. httpClient
  322. id protocol.DeviceID
  323. }
  324. func newIDCheckingHTTPClient(client httpClient, id protocol.DeviceID) *idCheckingHTTPClient {
  325. return &idCheckingHTTPClient{
  326. httpClient: client,
  327. id: id,
  328. }
  329. }
  330. func (c *idCheckingHTTPClient) check(resp *http.Response) error {
  331. if resp.TLS == nil {
  332. return errors.New("security: not TLS")
  333. }
  334. if len(resp.TLS.PeerCertificates) == 0 {
  335. return errors.New("security: no certificates")
  336. }
  337. id := protocol.NewDeviceID(resp.TLS.PeerCertificates[0].Raw)
  338. if !id.Equals(c.id) {
  339. return errors.New("security: incorrect device id")
  340. }
  341. return nil
  342. }
  343. func (c *idCheckingHTTPClient) Get(ctx context.Context, url string) (*http.Response, error) {
  344. resp, err := c.httpClient.Get(ctx, url)
  345. if err != nil {
  346. return nil, err
  347. }
  348. if err := c.check(resp); err != nil {
  349. return nil, err
  350. }
  351. return resp, nil
  352. }
  353. func (c *idCheckingHTTPClient) Post(ctx context.Context, url, ctype string, data io.Reader) (*http.Response, error) {
  354. resp, err := c.httpClient.Post(ctx, url, ctype, data)
  355. if err != nil {
  356. return nil, err
  357. }
  358. if err := c.check(resp); err != nil {
  359. return nil, err
  360. }
  361. return resp, nil
  362. }
  363. type errorHolder struct {
  364. err error
  365. mut sync.Mutex // uses stdlib sync as I want this to be trivially embeddable, and there is no risk of blocking
  366. }
  367. func (e *errorHolder) setError(err error) {
  368. e.mut.Lock()
  369. e.err = err
  370. e.mut.Unlock()
  371. }
  372. func (e *errorHolder) Error() error {
  373. e.mut.Lock()
  374. err := e.err
  375. e.mut.Unlock()
  376. return err
  377. }
  378. type contextClient struct {
  379. *http.Client
  380. }
  381. func (c *contextClient) Get(ctx context.Context, url string) (*http.Response, error) {
  382. req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
  383. if err != nil {
  384. return nil, err
  385. }
  386. return c.Client.Do(req)
  387. }
  388. func (c *contextClient) Post(ctx context.Context, url, ctype string, data io.Reader) (*http.Response, error) {
  389. req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, data)
  390. if err != nil {
  391. return nil, err
  392. }
  393. req.Header.Set("Content-Type", ctype)
  394. return c.Client.Do(req)
  395. }
  396. func globalDiscoveryIdentity(addr string) string {
  397. return "global discovery server " + addr
  398. }
  399. func ipv4Identity(port int) string {
  400. return fmt.Sprintf("IPv4 local broadcast discovery on port %d", port)
  401. }
  402. func ipv6Identity(addr string) string {
  403. return fmt.Sprintf("IPv6 local multicast discovery on address %s", addr)
  404. }
  405. func http2EnabledTransport(t *http.Transport) *http.Transport {
  406. _ = http2.ConfigureTransport(t)
  407. return t
  408. }