relay.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. // Copyright (C) 2015 The Syncthing Authors.
  2. //
  3. // This Source Code Form is subject to the terms of the Mozilla Public
  4. // License, v. 2.0. If a copy of the MPL was not distributed with this file,
  5. // You can obtain one at http://mozilla.org/MPL/2.0/.
  6. package relay
  7. import (
  8. "crypto/tls"
  9. "net"
  10. "net/url"
  11. "time"
  12. "github.com/syncthing/relaysrv/client"
  13. "github.com/syncthing/relaysrv/protocol"
  14. "github.com/syncthing/syncthing/lib/config"
  15. "github.com/syncthing/syncthing/lib/model"
  16. "github.com/syncthing/syncthing/lib/osutil"
  17. "github.com/syncthing/syncthing/lib/sync"
  18. "github.com/thejerf/suture"
  19. )
  20. func NewSvc(cfg *config.Wrapper, tlsCfg *tls.Config, conns chan<- model.IntermediateConnection) *Svc {
  21. svc := &Svc{
  22. Supervisor: suture.New("Svc", suture.Spec{
  23. Log: func(log string) {
  24. if debug {
  25. l.Infoln(log)
  26. }
  27. },
  28. FailureBackoff: 5 * time.Minute,
  29. FailureDecay: float64((10 * time.Minute) / time.Second),
  30. FailureThreshold: 5,
  31. }),
  32. cfg: cfg,
  33. tlsCfg: tlsCfg,
  34. tokens: make(map[string]suture.ServiceToken),
  35. clients: make(map[string]*client.ProtocolClient),
  36. mut: sync.NewRWMutex(),
  37. invitations: make(chan protocol.SessionInvitation),
  38. }
  39. rcfg := cfg.Raw()
  40. svc.CommitConfiguration(rcfg, rcfg)
  41. cfg.Subscribe(svc)
  42. receiver := &invitationReceiver{
  43. tlsCfg: tlsCfg,
  44. conns: conns,
  45. invitations: svc.invitations,
  46. }
  47. svc.receiverToken = svc.Add(receiver)
  48. return svc
  49. }
  50. type Svc struct {
  51. *suture.Supervisor
  52. cfg *config.Wrapper
  53. tlsCfg *tls.Config
  54. receiverToken suture.ServiceToken
  55. tokens map[string]suture.ServiceToken
  56. clients map[string]*client.ProtocolClient
  57. mut sync.RWMutex
  58. invitations chan protocol.SessionInvitation
  59. }
  60. func (s *Svc) VerifyConfiguration(from, to config.Configuration) error {
  61. for _, addr := range to.Options.RelayServers {
  62. _, err := url.Parse(addr)
  63. if err != nil {
  64. return err
  65. }
  66. }
  67. return nil
  68. }
  69. func (s *Svc) CommitConfiguration(from, to config.Configuration) bool {
  70. existing := make(map[string]struct{}, len(to.Options.RelayServers))
  71. for _, addr := range to.Options.RelayServers {
  72. uri, err := url.Parse(addr)
  73. if err != nil {
  74. if debug {
  75. l.Debugln("Failed to parse relay address", addr, err)
  76. }
  77. continue
  78. }
  79. existing[uri.String()] = struct{}{}
  80. _, ok := s.tokens[uri.String()]
  81. if !ok {
  82. if debug {
  83. l.Debugln("Connecting to relay", uri)
  84. }
  85. c := client.NewProtocolClient(uri, s.tlsCfg.Certificates, s.invitations)
  86. s.tokens[uri.String()] = s.Add(c)
  87. s.mut.Lock()
  88. s.clients[uri.String()] = c
  89. s.mut.Unlock()
  90. }
  91. }
  92. for uri, token := range s.tokens {
  93. _, ok := existing[uri]
  94. if !ok {
  95. err := s.Remove(token)
  96. delete(s.tokens, uri)
  97. s.mut.Lock()
  98. delete(s.clients, uri)
  99. s.mut.Unlock()
  100. if debug {
  101. l.Debugln("Disconnecting from relay", uri, err)
  102. }
  103. }
  104. }
  105. return true
  106. }
  107. func (s *Svc) ClientStatus() map[string]bool {
  108. s.mut.RLock()
  109. status := make(map[string]bool, len(s.clients))
  110. for uri, client := range s.clients {
  111. status[uri] = client.StatusOK()
  112. }
  113. s.mut.RUnlock()
  114. return status
  115. }
  116. type invitationReceiver struct {
  117. invitations chan protocol.SessionInvitation
  118. tlsCfg *tls.Config
  119. conns chan<- model.IntermediateConnection
  120. stop chan struct{}
  121. }
  122. func (r *invitationReceiver) Serve() {
  123. if r.stop != nil {
  124. return
  125. }
  126. r.stop = make(chan struct{})
  127. for {
  128. select {
  129. case inv := <-r.invitations:
  130. if debug {
  131. l.Debugln("Received relay invitation", inv)
  132. }
  133. conn, err := client.JoinSession(inv)
  134. if err != nil {
  135. if debug {
  136. l.Debugf("Failed to join relay session %s: %v", inv, err)
  137. }
  138. continue
  139. }
  140. err = osutil.SetTCPOptions(conn.(*net.TCPConn))
  141. if err != nil {
  142. l.Infoln(err)
  143. }
  144. var tc *tls.Conn
  145. if inv.ServerSocket {
  146. tc = tls.Server(conn, r.tlsCfg)
  147. } else {
  148. tc = tls.Client(conn, r.tlsCfg)
  149. }
  150. err = tc.Handshake()
  151. if err != nil {
  152. l.Infof("TLS handshake (BEP/relay %s): %v", inv, err)
  153. tc.Close()
  154. continue
  155. }
  156. r.conns <- model.IntermediateConnection{
  157. tc, model.ConnectionTypeRelayAccept,
  158. }
  159. case <-r.stop:
  160. return
  161. }
  162. }
  163. }
  164. func (r *invitationReceiver) Stop() {
  165. if r.stop == nil {
  166. return
  167. }
  168. r.stop <- struct{}{}
  169. r.stop = nil
  170. }