global.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package discover
  7. import (
  8. "bytes"
  9. "context"
  10. "crypto/tls"
  11. "encoding/json"
  12. "errors"
  13. "io"
  14. "io/ioutil"
  15. "net/http"
  16. "net/url"
  17. "strconv"
  18. stdsync "sync"
  19. "time"
  20. "github.com/thejerf/suture"
  21. "github.com/syncthing/syncthing/lib/dialer"
  22. "github.com/syncthing/syncthing/lib/events"
  23. "github.com/syncthing/syncthing/lib/protocol"
  24. "github.com/syncthing/syncthing/lib/util"
  25. )
  26. type globalClient struct {
  27. suture.Service
  28. server string
  29. addrList AddressLister
  30. announceClient httpClient
  31. queryClient httpClient
  32. noAnnounce bool
  33. noLookup bool
  34. evLogger events.Logger
  35. errorHolder
  36. }
  // httpClient is the small slice of HTTP functionality the discovery client
  // depends on. Both operations take the request context explicitly.
  type httpClient interface {
  	// Get performs a GET request against url.
  	Get(ctx context.Context, url string) (*http.Response, error)

  	// Post performs a POST request against url, sending data with the
  	// content type ctype.
  	Post(ctx context.Context, url, ctype string, data io.Reader) (*http.Response, error)
  }
  // Timing and rate limiting parameters for the global discovery client.
  const (
  	// requestTimeout is the Timeout set on the HTTP clients built by
  	// NewGlobal.
  	requestTimeout = 5 * time.Second

  	// defaultReannounceInterval is how long to wait after a successful
  	// announcement when the server gives no usable Reannounce-After header.
  	defaultReannounceInterval = 30 * time.Minute

  	// announceErrorRetryInterval is how long to wait before trying again
  	// after a failed announcement without a usable Retry-After header.
  	announceErrorRetryInterval = 5 * time.Minute

  	// maxAddressChangesBetweenAnnouncements is how many address change
  	// events may postpone a pending announcement before the listener is
  	// considered to be flip-flopping.
  	maxAddressChangesBetweenAnnouncements = 10
  )
  // announcement is the JSON document exchanged with the discovery server. It
  // is POSTed when announcing and decoded from the body of lookup responses.
  type announcement struct {
  	// Addresses is encoded under the "addresses" key.
  	Addresses []string `json:"addresses"`
  }
  // serverOptions holds the per-server settings that parseOptions extracts
  // from the query string of a discovery server URL.
  type serverOptions struct {
  	// insecure disables verification of the server certificate.
  	insecure bool
  	// noAnnounce disables announcing to this server.
  	noAnnounce bool
  	// noLookup disables using this server for lookups.
  	noLookup bool
  	// id is the expected device ID of the server; empty when not set.
  	id string
  }
  // lookupError decorates an arbitrary error with the length of time for
  // which that error may be cached.
  type lookupError struct {
  	error
  	cacheFor time.Duration
  }

  // CacheFor reports the cache validity time attached to the error.
  func (e lookupError) CacheFor() time.Duration {
  	validity := e.cacheFor
  	return validity
  }
  64. func NewGlobal(server string, cert tls.Certificate, addrList AddressLister, evLogger events.Logger) (FinderService, error) {
  65. server, opts, err := parseOptions(server)
  66. if err != nil {
  67. return nil, err
  68. }
  69. var devID protocol.DeviceID
  70. if opts.id != "" {
  71. devID, err = protocol.DeviceIDFromString(opts.id)
  72. if err != nil {
  73. return nil, err
  74. }
  75. }
  76. // The http.Client used for announcements. It needs to have our
  77. // certificate to prove our identity, and may or may not verify the server
  78. // certificate depending on the insecure setting.
  79. var announceClient httpClient = &contextClient{&http.Client{
  80. Timeout: requestTimeout,
  81. Transport: &http.Transport{
  82. DialContext: dialer.DialContext,
  83. Proxy: http.ProxyFromEnvironment,
  84. TLSClientConfig: &tls.Config{
  85. InsecureSkipVerify: opts.insecure,
  86. Certificates: []tls.Certificate{cert},
  87. },
  88. },
  89. }}
  90. if opts.id != "" {
  91. announceClient = newIDCheckingHTTPClient(announceClient, devID)
  92. }
  93. // The http.Client used for queries. We don't need to present our
  94. // certificate here, so lets not include it. May be insecure if requested.
  95. var queryClient httpClient = &contextClient{&http.Client{
  96. Timeout: requestTimeout,
  97. Transport: &http.Transport{
  98. DialContext: dialer.DialContext,
  99. Proxy: http.ProxyFromEnvironment,
  100. TLSClientConfig: &tls.Config{
  101. InsecureSkipVerify: opts.insecure,
  102. },
  103. },
  104. }}
  105. if opts.id != "" {
  106. queryClient = newIDCheckingHTTPClient(queryClient, devID)
  107. }
  108. cl := &globalClient{
  109. server: server,
  110. addrList: addrList,
  111. announceClient: announceClient,
  112. queryClient: queryClient,
  113. noAnnounce: opts.noAnnounce,
  114. noLookup: opts.noLookup,
  115. evLogger: evLogger,
  116. }
  117. cl.Service = util.AsService(cl.serve, cl.String())
  118. if !opts.noAnnounce {
  119. // If we are supposed to annonce, it's an error until we've done so.
  120. cl.setError(errors.New("not announced"))
  121. }
  122. return cl, nil
  123. }
  124. // Lookup returns the list of addresses where the given device is available
  125. func (c *globalClient) Lookup(ctx context.Context, device protocol.DeviceID) (addresses []string, err error) {
  126. if c.noLookup {
  127. return nil, lookupError{
  128. error: errors.New("lookups not supported"),
  129. cacheFor: time.Hour,
  130. }
  131. }
  132. qURL, err := url.Parse(c.server)
  133. if err != nil {
  134. return nil, err
  135. }
  136. q := qURL.Query()
  137. q.Set("device", device.String())
  138. qURL.RawQuery = q.Encode()
  139. resp, err := c.queryClient.Get(ctx, qURL.String())
  140. if err != nil {
  141. l.Debugln("globalClient.Lookup", qURL, err)
  142. return nil, err
  143. }
  144. if resp.StatusCode != 200 {
  145. resp.Body.Close()
  146. l.Debugln("globalClient.Lookup", qURL, resp.Status)
  147. err := errors.New(resp.Status)
  148. if secs, atoiErr := strconv.Atoi(resp.Header.Get("Retry-After")); atoiErr == nil && secs > 0 {
  149. err = lookupError{
  150. error: err,
  151. cacheFor: time.Duration(secs) * time.Second,
  152. }
  153. }
  154. return nil, err
  155. }
  156. bs, err := ioutil.ReadAll(resp.Body)
  157. if err != nil {
  158. return nil, err
  159. }
  160. resp.Body.Close()
  161. var ann announcement
  162. err = json.Unmarshal(bs, &ann)
  163. return ann.Addresses, err
  164. }
  165. func (c *globalClient) String() string {
  166. return "global@" + c.server
  167. }
  168. func (c *globalClient) serve(ctx context.Context) {
  169. if c.noAnnounce {
  170. // We're configured to not do announcements, only lookups. To maintain
  171. // the same interface, we just pause here if Serve() is run.
  172. <-ctx.Done()
  173. return
  174. }
  175. timer := time.NewTimer(5 * time.Second)
  176. defer timer.Stop()
  177. eventSub := c.evLogger.Subscribe(events.ListenAddressesChanged)
  178. defer eventSub.Unsubscribe()
  179. timerResetCount := 0
  180. for {
  181. select {
  182. case <-eventSub.C():
  183. if timerResetCount < maxAddressChangesBetweenAnnouncements {
  184. // Defer announcement by 2 seconds, essentially debouncing
  185. // if we have a stream of events incoming in quick succession.
  186. timer.Reset(2 * time.Second)
  187. } else if timerResetCount == maxAddressChangesBetweenAnnouncements {
  188. // Yet only do it if we haven't had to reset maxAddressChangesBetweenAnnouncements times in a row,
  189. // so if something is flip-flopping within 2 seconds, we don't end up in a permanent reset loop.
  190. l.Warnf("Detected a flip-flopping listener")
  191. c.setError(errors.New("flip flopping listener"))
  192. // Incrementing the count above 10 will prevent us from warning or setting the error again
  193. // It will also suppress event based resets until we've had a proper round after announceErrorRetryInterval
  194. timer.Reset(announceErrorRetryInterval)
  195. }
  196. timerResetCount++
  197. case <-timer.C:
  198. timerResetCount = 0
  199. c.sendAnnouncement(ctx, timer)
  200. case <-ctx.Done():
  201. return
  202. }
  203. }
  204. }
  205. func (c *globalClient) sendAnnouncement(ctx context.Context, timer *time.Timer) {
  206. var ann announcement
  207. if c.addrList != nil {
  208. ann.Addresses = c.addrList.ExternalAddresses()
  209. }
  210. if len(ann.Addresses) == 0 {
  211. // There are legitimate cases for not having anything to announce,
  212. // yet still using global discovery for lookups. Do not error out
  213. // here.
  214. c.setError(nil)
  215. timer.Reset(announceErrorRetryInterval)
  216. return
  217. }
  218. // The marshal doesn't fail, I promise.
  219. postData, _ := json.Marshal(ann)
  220. l.Debugf("Announcement: %v", ann)
  221. resp, err := c.announceClient.Post(ctx, c.server, "application/json", bytes.NewReader(postData))
  222. if err != nil {
  223. l.Debugln("announce POST:", err)
  224. c.setError(err)
  225. timer.Reset(announceErrorRetryInterval)
  226. return
  227. }
  228. l.Debugln("announce POST:", resp.Status)
  229. resp.Body.Close()
  230. if resp.StatusCode < 200 || resp.StatusCode > 299 {
  231. l.Debugln("announce POST:", resp.Status)
  232. c.setError(errors.New(resp.Status))
  233. if h := resp.Header.Get("Retry-After"); h != "" {
  234. // The server has a recommendation on when we should
  235. // retry. Follow it.
  236. if secs, err := strconv.Atoi(h); err == nil && secs > 0 {
  237. l.Debugln("announce Retry-After:", secs, err)
  238. timer.Reset(time.Duration(secs) * time.Second)
  239. return
  240. }
  241. }
  242. timer.Reset(announceErrorRetryInterval)
  243. return
  244. }
  245. c.setError(nil)
  246. if h := resp.Header.Get("Reannounce-After"); h != "" {
  247. // The server has a recommendation on when we should
  248. // reannounce. Follow it.
  249. if secs, err := strconv.Atoi(h); err == nil && secs > 0 {
  250. l.Debugln("announce Reannounce-After:", secs, err)
  251. timer.Reset(time.Duration(secs) * time.Second)
  252. return
  253. }
  254. }
  255. timer.Reset(defaultReannounceInterval)
  256. }
  257. func (c *globalClient) Cache() map[protocol.DeviceID]CacheEntry {
  258. // The globalClient doesn't do caching
  259. return nil
  260. }
  261. // parseOptions parses and strips away any ?query=val options, setting the
  262. // corresponding field in the serverOptions struct. Unknown query options are
  263. // ignored and removed.
  264. func parseOptions(dsn string) (server string, opts serverOptions, err error) {
  265. p, err := url.Parse(dsn)
  266. if err != nil {
  267. return "", serverOptions{}, err
  268. }
  269. // Grab known options from the query string
  270. q := p.Query()
  271. opts.id = q.Get("id")
  272. opts.insecure = opts.id != "" || queryBool(q, "insecure")
  273. opts.noAnnounce = queryBool(q, "noannounce")
  274. opts.noLookup = queryBool(q, "nolookup")
  275. // Check for disallowed combinations
  276. if p.Scheme == "http" {
  277. if !opts.insecure {
  278. return "", serverOptions{}, errors.New("http without insecure not supported")
  279. }
  280. if !opts.noAnnounce {
  281. return "", serverOptions{}, errors.New("http without noannounce not supported")
  282. }
  283. } else if p.Scheme != "https" {
  284. return "", serverOptions{}, errors.New("unsupported scheme " + p.Scheme)
  285. }
  286. // Remove the query string
  287. p.RawQuery = ""
  288. server = p.String()
  289. return
  290. }
  // queryBool interprets the query parameter key as a boolean. A parameter
  // that is absent is false. A parameter that is present is true whatever its
  // first value is, including no value at all ("?foo"), unless that value is
  // exactly "false" ("?foo=false").
  func queryBool(q url.Values, key string) bool {
  	values, present := q[key]
  	switch {
  	case !present:
  		return false
  	case len(values) == 0:
  		// Present but without any values counts as the empty value.
  		return true
  	default:
  		return values[0] != "false"
  	}
  }
  300. type idCheckingHTTPClient struct {
  301. httpClient
  302. id protocol.DeviceID
  303. }
  304. func newIDCheckingHTTPClient(client httpClient, id protocol.DeviceID) *idCheckingHTTPClient {
  305. return &idCheckingHTTPClient{
  306. httpClient: client,
  307. id: id,
  308. }
  309. }
  310. func (c *idCheckingHTTPClient) check(resp *http.Response) error {
  311. if resp.TLS == nil {
  312. return errors.New("security: not TLS")
  313. }
  314. if len(resp.TLS.PeerCertificates) == 0 {
  315. return errors.New("security: no certificates")
  316. }
  317. id := protocol.NewDeviceID(resp.TLS.PeerCertificates[0].Raw)
  318. if !id.Equals(c.id) {
  319. return errors.New("security: incorrect device id")
  320. }
  321. return nil
  322. }
  323. func (c *idCheckingHTTPClient) Get(ctx context.Context, url string) (*http.Response, error) {
  324. resp, err := c.httpClient.Get(ctx, url)
  325. if err != nil {
  326. return nil, err
  327. }
  328. if err := c.check(resp); err != nil {
  329. return nil, err
  330. }
  331. return resp, nil
  332. }
  333. func (c *idCheckingHTTPClient) Post(ctx context.Context, url, ctype string, data io.Reader) (*http.Response, error) {
  334. resp, err := c.httpClient.Post(ctx, url, ctype, data)
  335. if err != nil {
  336. return nil, err
  337. }
  338. if err := c.check(resp); err != nil {
  339. return nil, err
  340. }
  341. return resp, nil
  342. }
  // errorHolder stores a single error value behind a mutex. It is meant to
  // be embedded in types that need to expose their latest error.
  type errorHolder struct {
  	err error
  	mut stdsync.Mutex // uses stdlib sync as I want this to be trivially embeddable, and there is no risk of blocking
  }

  // setError replaces the stored error; nil clears it.
  func (e *errorHolder) setError(err error) {
  	e.mut.Lock()
  	defer e.mut.Unlock()
  	e.err = err
  }

  // Error returns the stored error, or nil if none is set.
  func (e *errorHolder) Error() error {
  	e.mut.Lock()
  	defer e.mut.Unlock()
  	return e.err
  }
  // contextClient adapts an *http.Client to the httpClient interface by
  // attaching the given context to every request it sends.
  type contextClient struct {
  	*http.Client
  }

  // Get sends a GET request for url, bound to ctx.
  func (c *contextClient) Get(ctx context.Context, url string) (*http.Response, error) {
  	// Request.WithContext is used instead of http.NewRequestWithContext to
  	// keep building on Go versions older than 1.13. Unlike the deprecated
  	// Request.Cancel channel, it gives the request the whole context, so
  	// cancellation is reported as the context's own error.
  	req, err := http.NewRequest("GET", url, nil)
  	if err != nil {
  		return nil, err
  	}
  	return c.Client.Do(req.WithContext(ctx))
  }

  // Post sends a POST request to url with data as the body and ctype as the
  // Content-Type header, bound to ctx.
  func (c *contextClient) Post(ctx context.Context, url, ctype string, data io.Reader) (*http.Response, error) {
  	// See Get for why Request.WithContext is used here.
  	req, err := http.NewRequest("POST", url, data)
  	if err != nil {
  		return nil, err
  	}
  	req.Header.Set("Content-Type", ctype)
  	return c.Client.Do(req.WithContext(ctx))
  }