relay.go 5.1 KB


  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package relay
  7. import (
  8. "crypto/tls"
  9. "encoding/json"
  10. "net"
  11. "net/http"
  12. "net/url"
  13. "time"
  14. "github.com/syncthing/relaysrv/client"
  15. "github.com/syncthing/relaysrv/protocol"
  16. "github.com/syncthing/syncthing/lib/config"
  17. "github.com/syncthing/syncthing/lib/model"
  18. "github.com/syncthing/syncthing/lib/osutil"
  19. "github.com/syncthing/syncthing/lib/sync"
  20. "github.com/thejerf/suture"
  21. )
  22. func NewSvc(cfg *config.Wrapper, tlsCfg *tls.Config, conns chan<- model.IntermediateConnection) *Svc {
  23. svc := &Svc{
  24. Supervisor: suture.New("Svc", suture.Spec{
  25. Log: func(log string) {
  26. if debug {
  27. l.Infoln(log)
  28. }
  29. },
  30. FailureBackoff: 5 * time.Minute,
  31. FailureDecay: float64((10 * time.Minute) / time.Second),
  32. FailureThreshold: 5,
  33. }),
  34. cfg: cfg,
  35. tlsCfg: tlsCfg,
  36. tokens: make(map[string]suture.ServiceToken),
  37. clients: make(map[string]*client.ProtocolClient),
  38. mut: sync.NewRWMutex(),
  39. invitations: make(chan protocol.SessionInvitation),
  40. }
  41. rcfg := cfg.Raw()
  42. svc.CommitConfiguration(rcfg, rcfg)
  43. cfg.Subscribe(svc)
  44. receiver := &invitationReceiver{
  45. tlsCfg: tlsCfg,
  46. conns: conns,
  47. invitations: svc.invitations,
  48. stop: make(chan struct{}),
  49. }
  50. svc.Add(receiver)
  51. return svc
  52. }
  53. type Svc struct {
  54. *suture.Supervisor
  55. cfg *config.Wrapper
  56. tlsCfg *tls.Config
  57. tokens map[string]suture.ServiceToken
  58. clients map[string]*client.ProtocolClient
  59. mut sync.RWMutex
  60. invitations chan protocol.SessionInvitation
  61. }
  62. func (s *Svc) VerifyConfiguration(from, to config.Configuration) error {
  63. for _, addr := range to.Options.RelayServers {
  64. _, err := url.Parse(addr)
  65. if err != nil {
  66. return err
  67. }
  68. }
  69. return nil
  70. }
  71. func (s *Svc) CommitConfiguration(from, to config.Configuration) bool {
  72. existing := make(map[string]*url.URL, len(to.Options.RelayServers))
  73. for _, addr := range to.Options.RelayServers {
  74. uri, err := url.Parse(addr)
  75. if err != nil {
  76. if debug {
  77. l.Debugln("Failed to parse relay address", addr, err)
  78. }
  79. continue
  80. }
  81. existing[uri.String()] = uri
  82. }
  83. // Expand dynamic addresses into a set of relays
  84. for key, uri := range existing {
  85. if uri.Scheme != "dynamic+http" && uri.Scheme != "dynamic+https" {
  86. continue
  87. }
  88. delete(existing, key)
  89. uri.Scheme = uri.Scheme[8:]
  90. data, err := http.Get(uri.String())
  91. if err != nil {
  92. if debug {
  93. l.Debugln("Failed to lookup dynamic relays", err)
  94. }
  95. continue
  96. }
  97. var ann dynamicAnnouncement
  98. err = json.NewDecoder(data.Body).Decode(&ann)
  99. data.Body.Close()
  100. if err != nil {
  101. if debug {
  102. l.Debugln("Failed to lookup dynamic relays", err)
  103. }
  104. continue
  105. }
  106. for _, relayAnn := range ann.Relays {
  107. ruri, err := url.Parse(relayAnn.URL)
  108. if err != nil {
  109. if debug {
  110. l.Debugln("Failed to parse dynamic relay address", relayAnn.URL, err)
  111. }
  112. continue
  113. }
  114. if debug {
  115. l.Debugln("Found", ruri, "via", uri)
  116. }
  117. existing[ruri.String()] = ruri
  118. }
  119. }
  120. s.mut.Lock()
  121. for key, uri := range existing {
  122. _, ok := s.tokens[key]
  123. if !ok {
  124. if debug {
  125. l.Debugln("Connecting to relay", uri)
  126. }
  127. c := client.NewProtocolClient(uri, s.tlsCfg.Certificates, s.invitations)
  128. s.tokens[key] = s.Add(c)
  129. s.clients[key] = c
  130. }
  131. }
  132. for key, token := range s.tokens {
  133. _, ok := existing[key]
  134. if !ok {
  135. err := s.Remove(token)
  136. delete(s.tokens, key)
  137. delete(s.clients, key)
  138. if debug {
  139. l.Debugln("Disconnecting from relay", key, err)
  140. }
  141. }
  142. }
  143. s.mut.Unlock()
  144. return true
  145. }
  146. func (s *Svc) ClientStatus() map[string]bool {
  147. s.mut.RLock()
  148. status := make(map[string]bool, len(s.clients))
  149. for uri, client := range s.clients {
  150. status[uri] = client.StatusOK()
  151. }
  152. s.mut.RUnlock()
  153. return status
  154. }
  155. type invitationReceiver struct {
  156. invitations chan protocol.SessionInvitation
  157. tlsCfg *tls.Config
  158. conns chan<- model.IntermediateConnection
  159. stop chan struct{}
  160. }
  161. func (r *invitationReceiver) Serve() {
  162. for {
  163. select {
  164. case inv := <-r.invitations:
  165. if debug {
  166. l.Debugln("Received relay invitation", inv)
  167. }
  168. conn, err := client.JoinSession(inv)
  169. if err != nil {
  170. if debug {
  171. l.Debugf("Failed to join relay session %s: %v", inv, err)
  172. }
  173. continue
  174. }
  175. err = osutil.SetTCPOptions(conn.(*net.TCPConn))
  176. if err != nil {
  177. l.Infoln(err)
  178. }
  179. var tc *tls.Conn
  180. if inv.ServerSocket {
  181. tc = tls.Server(conn, r.tlsCfg)
  182. } else {
  183. tc = tls.Client(conn, r.tlsCfg)
  184. }
  185. err = tc.Handshake()
  186. if err != nil {
  187. l.Infof("TLS handshake (BEP/relay %s): %v", inv, err)
  188. tc.Close()
  189. continue
  190. }
  191. r.conns <- model.IntermediateConnection{
  192. tc, model.ConnectionTypeRelayAccept,
  193. }
  194. case <-r.stop:
  195. return
  196. }
  197. }
  198. }
  199. func (r *invitationReceiver) Stop() {
  200. close(r.stop)
  201. }
  202. // This is the announcement recieved from the relay server;
  203. // {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]}
  204. type dynamicAnnouncement struct {
  205. Relays []struct {
  206. URL string
  207. }
  208. }