global.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "context"
  10. "crypto/tls"
  11. "encoding/json"
  12. "errors"
  13. "fmt"
  14. "io"
  15. "io/ioutil"
  16. "net/http"
  17. "net/url"
  18. "strconv"
  19. stdsync "sync"
  20. "time"
  21. "github.com/thejerf/suture"
  22. "github.com/syncthing/syncthing/lib/dialer"
  23. "github.com/syncthing/syncthing/lib/events"
  24. "github.com/syncthing/syncthing/lib/protocol"
  25. "github.com/syncthing/syncthing/lib/util"
  26. )
  27. type globalClient struct {
  28. suture.Service
  29. server string
  30. addrList AddressLister
  31. announceClient httpClient
  32. queryClient httpClient
  33. noAnnounce bool
  34. noLookup bool
  35. evLogger events.Logger
  36. errorHolder
  37. }
  38. type httpClient interface {
  39. Get(ctx context.Context, url string) (*http.Response, error)
  40. Post(ctx context.Context, url, ctype string, data io.Reader) (*http.Response, error)
  41. }
  42. const (
  43. defaultReannounceInterval = 30 * time.Minute
  44. announceErrorRetryInterval = 5 * time.Minute
  45. requestTimeout = 5 * time.Second
  46. maxAddressChangesBetweenAnnouncements = 10
  47. )
  48. type announcement struct {
  49. Addresses []string `json:"addresses"`
  50. }
  51. type serverOptions struct {
  52. insecure bool // don't check certificate
  53. noAnnounce bool // don't announce
  54. noLookup bool // don't use for lookups
  55. id string // expected server device ID
  56. }
  57. // A lookupError is any other error but with a cache validity time attached.
  58. type lookupError struct {
  59. msg string
  60. cacheFor time.Duration
  61. }
  62. func (e *lookupError) Error() string { return e.msg }
  63. func (e *lookupError) CacheFor() time.Duration {
  64. return e.cacheFor
  65. }
  66. func NewGlobal(server string, cert tls.Certificate, addrList AddressLister, evLogger events.Logger) (FinderService, error) {
  67. server, opts, err := parseOptions(server)
  68. if err != nil {
  69. return nil, err
  70. }
  71. var devID protocol.DeviceID
  72. if opts.id != "" {
  73. devID, err = protocol.DeviceIDFromString(opts.id)
  74. if err != nil {
  75. return nil, err
  76. }
  77. }
  78. // The http.Client used for announcements. It needs to have our
  79. // certificate to prove our identity, and may or may not verify the server
  80. // certificate depending on the insecure setting.
  81. var announceClient httpClient = &contextClient{&http.Client{
  82. Timeout: requestTimeout,
  83. Transport: &http.Transport{
  84. DialContext: dialer.DialContextReusePort,
  85. Proxy: http.ProxyFromEnvironment,
  86. TLSClientConfig: &tls.Config{
  87. InsecureSkipVerify: opts.insecure,
  88. Certificates: []tls.Certificate{cert},
  89. },
  90. },
  91. }}
  92. if opts.id != "" {
  93. announceClient = newIDCheckingHTTPClient(announceClient, devID)
  94. }
  95. // The http.Client used for queries. We don't need to present our
  96. // certificate here, so lets not include it. May be insecure if requested.
  97. var queryClient httpClient = &contextClient{&http.Client{
  98. Timeout: requestTimeout,
  99. Transport: &http.Transport{
  100. DialContext: dialer.DialContext,
  101. Proxy: http.ProxyFromEnvironment,
  102. TLSClientConfig: &tls.Config{
  103. InsecureSkipVerify: opts.insecure,
  104. },
  105. },
  106. }}
  107. if opts.id != "" {
  108. queryClient = newIDCheckingHTTPClient(queryClient, devID)
  109. }
  110. cl := &globalClient{
  111. server: server,
  112. addrList: addrList,
  113. announceClient: announceClient,
  114. queryClient: queryClient,
  115. noAnnounce: opts.noAnnounce,
  116. noLookup: opts.noLookup,
  117. evLogger: evLogger,
  118. }
  119. cl.Service = util.AsService(cl.serve, cl.String())
  120. if !opts.noAnnounce {
  121. // If we are supposed to annonce, it's an error until we've done so.
  122. cl.setError(errors.New("not announced"))
  123. }
  124. return cl, nil
  125. }
  126. // Lookup returns the list of addresses where the given device is available
  127. func (c *globalClient) Lookup(ctx context.Context, device protocol.DeviceID) (addresses []string, err error) {
  128. if c.noLookup {
  129. return nil, &lookupError{
  130. msg: "lookups not supported",
  131. cacheFor: time.Hour,
  132. }
  133. }
  134. qURL, err := url.Parse(c.server)
  135. if err != nil {
  136. return nil, err
  137. }
  138. q := qURL.Query()
  139. q.Set("device", device.String())
  140. qURL.RawQuery = q.Encode()
  141. resp, err := c.queryClient.Get(ctx, qURL.String())
  142. if err != nil {
  143. l.Debugln("globalClient.Lookup", qURL, err)
  144. return nil, err
  145. }
  146. if resp.StatusCode != 200 {
  147. resp.Body.Close()
  148. l.Debugln("globalClient.Lookup", qURL, resp.Status)
  149. err := errors.New(resp.Status)
  150. if secs, atoiErr := strconv.Atoi(resp.Header.Get("Retry-After")); atoiErr == nil && secs > 0 {
  151. err = &lookupError{
  152. msg: resp.Status,
  153. cacheFor: time.Duration(secs) * time.Second,
  154. }
  155. }
  156. return nil, err
  157. }
  158. bs, err := ioutil.ReadAll(resp.Body)
  159. if err != nil {
  160. return nil, err
  161. }
  162. resp.Body.Close()
  163. var ann announcement
  164. err = json.Unmarshal(bs, &ann)
  165. return ann.Addresses, err
  166. }
  167. func (c *globalClient) String() string {
  168. return "global@" + c.server
  169. }
  170. func (c *globalClient) serve(ctx context.Context) {
  171. if c.noAnnounce {
  172. // We're configured to not do announcements, only lookups. To maintain
  173. // the same interface, we just pause here if Serve() is run.
  174. <-ctx.Done()
  175. return
  176. }
  177. timer := time.NewTimer(5 * time.Second)
  178. defer timer.Stop()
  179. eventSub := c.evLogger.Subscribe(events.ListenAddressesChanged)
  180. defer eventSub.Unsubscribe()
  181. timerResetCount := 0
  182. for {
  183. select {
  184. case <-eventSub.C():
  185. if timerResetCount < maxAddressChangesBetweenAnnouncements {
  186. // Defer announcement by 2 seconds, essentially debouncing
  187. // if we have a stream of events incoming in quick succession.
  188. timer.Reset(2 * time.Second)
  189. } else if timerResetCount == maxAddressChangesBetweenAnnouncements {
  190. // Yet only do it if we haven't had to reset maxAddressChangesBetweenAnnouncements times in a row,
  191. // so if something is flip-flopping within 2 seconds, we don't end up in a permanent reset loop.
  192. l.Warnf("Detected a flip-flopping listener")
  193. c.setError(errors.New("flip flopping listener"))
  194. // Incrementing the count above 10 will prevent us from warning or setting the error again
  195. // It will also suppress event based resets until we've had a proper round after announceErrorRetryInterval
  196. timer.Reset(announceErrorRetryInterval)
  197. }
  198. timerResetCount++
  199. case <-timer.C:
  200. timerResetCount = 0
  201. c.sendAnnouncement(ctx, timer)
  202. case <-ctx.Done():
  203. return
  204. }
  205. }
  206. }
  207. func (c *globalClient) sendAnnouncement(ctx context.Context, timer *time.Timer) {
  208. var ann announcement
  209. if c.addrList != nil {
  210. ann.Addresses = c.addrList.ExternalAddresses()
  211. }
  212. if len(ann.Addresses) == 0 {
  213. // There are legitimate cases for not having anything to announce,
  214. // yet still using global discovery for lookups. Do not error out
  215. // here.
  216. c.setError(nil)
  217. timer.Reset(announceErrorRetryInterval)
  218. return
  219. }
  220. // The marshal doesn't fail, I promise.
  221. postData, _ := json.Marshal(ann)
  222. l.Debugf("Announcement: %v", ann)
  223. resp, err := c.announceClient.Post(ctx, c.server, "application/json", bytes.NewReader(postData))
  224. if err != nil {
  225. l.Debugln("announce POST:", err)
  226. c.setError(err)
  227. timer.Reset(announceErrorRetryInterval)
  228. return
  229. }
  230. l.Debugln("announce POST:", resp.Status)
  231. resp.Body.Close()
  232. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  233. l.Debugln("announce POST:", resp.Status)
  234. c.setError(errors.New(resp.Status))
  235. if h := resp.Header.Get("Retry-After"); h != "" {
  236. // The server has a recommendation on when we should
  237. // retry. Follow it.
  238. if secs, err := strconv.Atoi(h); err == nil && secs > 0 {
  239. l.Debugln("announce Retry-After:", secs, err)
  240. timer.Reset(time.Duration(secs) * time.Second)
  241. return
  242. }
  243. }
  244. timer.Reset(announceErrorRetryInterval)
  245. return
  246. }
  247. c.setError(nil)
  248. if h := resp.Header.Get("Reannounce-After"); h != "" {
  249. // The server has a recommendation on when we should
  250. // reannounce. Follow it.
  251. if secs, err := strconv.Atoi(h); err == nil && secs > 0 {
  252. l.Debugln("announce Reannounce-After:", secs, err)
  253. timer.Reset(time.Duration(secs) * time.Second)
  254. return
  255. }
  256. }
  257. timer.Reset(defaultReannounceInterval)
  258. }
  259. func (c *globalClient) Cache() map[protocol.DeviceID]CacheEntry {
  260. // The globalClient doesn't do caching
  261. return nil
  262. }
  263. // parseOptions parses and strips away any ?query=val options, setting the
  264. // corresponding field in the serverOptions struct. Unknown query options are
  265. // ignored and removed.
  266. func parseOptions(dsn string) (server string, opts serverOptions, err error) {
  267. p, err := url.Parse(dsn)
  268. if err != nil {
  269. return "", serverOptions{}, err
  270. }
  271. // Grab known options from the query string
  272. q := p.Query()
  273. opts.id = q.Get("id")
  274. opts.insecure = opts.id != "" || queryBool(q, "insecure")
  275. opts.noAnnounce = queryBool(q, "noannounce")
  276. opts.noLookup = queryBool(q, "nolookup")
  277. // Check for disallowed combinations
  278. if p.Scheme == "http" {
  279. if !opts.insecure {
  280. return "", serverOptions{}, errors.New("http without insecure not supported")
  281. }
  282. if !opts.noAnnounce {
  283. return "", serverOptions{}, errors.New("http without noannounce not supported")
  284. }
  285. } else if p.Scheme != "https" {
  286. return "", serverOptions{}, errors.New("unsupported scheme " + p.Scheme)
  287. }
  288. // Remove the query string
  289. p.RawQuery = ""
  290. server = p.String()
  291. return
  292. }
  293. // queryBool returns the query parameter parsed as a boolean. An empty value
  294. // ("?foo") is considered true, as is any value string except false
  295. // ("?foo=false").
  296. func queryBool(q url.Values, key string) bool {
  297. if _, ok := q[key]; !ok {
  298. return false
  299. }
  300. return q.Get(key) != "false"
  301. }
  302. type idCheckingHTTPClient struct {
  303. httpClient
  304. id protocol.DeviceID
  305. }
  306. func newIDCheckingHTTPClient(client httpClient, id protocol.DeviceID) *idCheckingHTTPClient {
  307. return &idCheckingHTTPClient{
  308. httpClient: client,
  309. id: id,
  310. }
  311. }
  312. func (c *idCheckingHTTPClient) check(resp *http.Response) error {
  313. if resp.TLS == nil {
  314. return errors.New("security: not TLS")
  315. }
  316. if len(resp.TLS.PeerCertificates) == 0 {
  317. return errors.New("security: no certificates")
  318. }
  319. id := protocol.NewDeviceID(resp.TLS.PeerCertificates[0].Raw)
  320. if !id.Equals(c.id) {
  321. return errors.New("security: incorrect device id")
  322. }
  323. return nil
  324. }
  325. func (c *idCheckingHTTPClient) Get(ctx context.Context, url string) (*http.Response, error) {
  326. resp, err := c.httpClient.Get(ctx, url)
  327. if err != nil {
  328. return nil, err
  329. }
  330. if err := c.check(resp); err != nil {
  331. return nil, err
  332. }
  333. return resp, nil
  334. }
  335. func (c *idCheckingHTTPClient) Post(ctx context.Context, url, ctype string, data io.Reader) (*http.Response, error) {
  336. resp, err := c.httpClient.Post(ctx, url, ctype, data)
  337. if err != nil {
  338. return nil, err
  339. }
  340. if err := c.check(resp); err != nil {
  341. return nil, err
  342. }
  343. return resp, nil
  344. }
  345. type errorHolder struct {
  346. err error
  347. mut stdsync.Mutex // uses stdlib sync as I want this to be trivially embeddable, and there is no risk of blocking
  348. }
  349. func (e *errorHolder) setError(err error) {
  350. e.mut.Lock()
  351. e.err = err
  352. e.mut.Unlock()
  353. }
  354. func (e *errorHolder) Error() error {
  355. e.mut.Lock()
  356. err := e.err
  357. e.mut.Unlock()
  358. return err
  359. }
  360. type contextClient struct {
  361. *http.Client
  362. }
  363. func (c *contextClient) Get(ctx context.Context, url string) (*http.Response, error) {
  364. // For <go1.13 compatibility. Use the following commented line once that
  365. // isn't required anymore.
  366. // req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
  367. req, err := http.NewRequest("GET", url, nil)
  368. if err != nil {
  369. return nil, err
  370. }
  371. req.Cancel = ctx.Done()
  372. return c.Client.Do(req)
  373. }
  374. func (c *contextClient) Post(ctx context.Context, url, ctype string, data io.Reader) (*http.Response, error) {
  375. // For <go1.13 compatibility. Use the following commented line once that
  376. // isn't required anymore.
  377. // req, err := http.NewRequestWithContext(ctx, "POST", url, data)
  378. req, err := http.NewRequest("POST", url, data)
  379. if err != nil {
  380. return nil, err
  381. }
  382. req.Cancel = ctx.Done()
  383. req.Header.Set("Content-Type", ctype)
  384. return c.Client.Do(req)
  385. }
  386. func globalDiscoveryIdentity(addr string) string {
  387. return "global discovery server " + addr
  388. }
  389. func ipv4Identity(port int) string {
  390. return fmt.Sprintf("IPv4 local broadcast discovery on port %d", port)
  391. }
  392. func ipv6Identity(addr string) string {
  393. return fmt.Sprintf("IPv6 local multicast discovery on address %s", addr)
  394. }