service.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391
// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
  6. package nat
  7. import (
  8. "context"
  9. "fmt"
  10. "hash/fnv"
  11. "math/rand"
  12. "net"
  13. stdsync "sync"
  14. "time"
  15. "github.com/thejerf/suture"
  16. "github.com/syncthing/syncthing/lib/config"
  17. "github.com/syncthing/syncthing/lib/protocol"
  18. "github.com/syncthing/syncthing/lib/sync"
  19. "github.com/syncthing/syncthing/lib/util"
  20. )
  21. // Service runs a loop for discovery of IGDs (Internet Gateway Devices) and
  22. // setup/renewal of a port mapping.
  23. type Service struct {
  24. suture.Service
  25. id protocol.DeviceID
  26. cfg config.Wrapper
  27. processScheduled chan struct{}
  28. mappings []*Mapping
  29. enabled bool
  30. mut sync.RWMutex
  31. }
  32. func NewService(id protocol.DeviceID, cfg config.Wrapper) *Service {
  33. s := &Service{
  34. id: id,
  35. cfg: cfg,
  36. processScheduled: make(chan struct{}, 1),
  37. mut: sync.NewRWMutex(),
  38. }
  39. s.Service = util.AsService(s.serve, s.String())
  40. cfg.Subscribe(s)
  41. cfgCopy := cfg.RawCopy()
  42. s.CommitConfiguration(cfgCopy, cfgCopy)
  43. return s
  44. }
  45. func (s *Service) VerifyConfiguration(from, to config.Configuration) error {
  46. return nil
  47. }
  48. func (s *Service) CommitConfiguration(from, to config.Configuration) bool {
  49. s.mut.Lock()
  50. if !s.enabled && to.Options.NATEnabled {
  51. l.Debugln("Starting NAT service")
  52. s.enabled = true
  53. s.scheduleProcess()
  54. } else if s.enabled && !to.Options.NATEnabled {
  55. l.Debugln("Stopping NAT service")
  56. s.enabled = false
  57. }
  58. s.mut.Unlock()
  59. return true
  60. }
  61. func (s *Service) Stop() {
  62. s.cfg.Unsubscribe(s)
  63. s.Service.Stop()
  64. }
  65. func (s *Service) serve(ctx context.Context) {
  66. announce := stdsync.Once{}
  67. timer := time.NewTimer(0)
  68. for {
  69. select {
  70. case <-timer.C:
  71. case <-s.processScheduled:
  72. if !timer.Stop() {
  73. select {
  74. case <-timer.C:
  75. default:
  76. }
  77. }
  78. case <-ctx.Done():
  79. timer.Stop()
  80. s.mut.RLock()
  81. for _, mapping := range s.mappings {
  82. mapping.clearAddresses()
  83. }
  84. s.mut.RUnlock()
  85. return
  86. }
  87. s.mut.RLock()
  88. enabled := s.enabled
  89. s.mut.RUnlock()
  90. if !enabled {
  91. continue
  92. }
  93. found, renewIn := s.process(ctx)
  94. timer.Reset(renewIn)
  95. if found != -1 {
  96. announce.Do(func() {
  97. suffix := "s"
  98. if found == 1 {
  99. suffix = ""
  100. }
  101. l.Infoln("Detected", found, "NAT service"+suffix)
  102. })
  103. }
  104. }
  105. }
  106. func (s *Service) process(ctx context.Context) (int, time.Duration) {
  107. // toRenew are mappings which are due for renewal
  108. // toUpdate are the remaining mappings, which will only be updated if one of
  109. // the old IGDs has gone away, or a new IGD has appeared, but only if we
  110. // actually need to perform a renewal.
  111. var toRenew, toUpdate []*Mapping
  112. renewIn := time.Duration(s.cfg.Options().NATRenewalM) * time.Minute
  113. if renewIn == 0 {
  114. // We always want to do renewal so lets just pick a nice sane number.
  115. renewIn = 30 * time.Minute
  116. }
  117. s.mut.RLock()
  118. for _, mapping := range s.mappings {
  119. if mapping.expires.Before(time.Now()) {
  120. toRenew = append(toRenew, mapping)
  121. } else {
  122. toUpdate = append(toUpdate, mapping)
  123. mappingRenewIn := time.Until(mapping.expires)
  124. if mappingRenewIn < renewIn {
  125. renewIn = mappingRenewIn
  126. }
  127. }
  128. }
  129. s.mut.RUnlock()
  130. // Don't do anything, unless we really need to renew
  131. if len(toRenew) == 0 {
  132. return -1, renewIn
  133. }
  134. nats := discoverAll(ctx, time.Duration(s.cfg.Options().NATRenewalM)*time.Minute, time.Duration(s.cfg.Options().NATTimeoutS)*time.Second)
  135. for _, mapping := range toRenew {
  136. s.updateMapping(ctx, mapping, nats, true)
  137. }
  138. for _, mapping := range toUpdate {
  139. s.updateMapping(ctx, mapping, nats, false)
  140. }
  141. return len(nats), renewIn
  142. }
  143. func (s *Service) scheduleProcess() {
  144. select {
  145. case s.processScheduled <- struct{}{}: // 1-buffered
  146. default:
  147. }
  148. }
  149. func (s *Service) NewMapping(protocol Protocol, ip net.IP, port int) *Mapping {
  150. mapping := &Mapping{
  151. protocol: protocol,
  152. address: Address{
  153. IP: ip,
  154. Port: port,
  155. },
  156. extAddresses: make(map[string]Address),
  157. mut: sync.NewRWMutex(),
  158. }
  159. s.mut.Lock()
  160. s.mappings = append(s.mappings, mapping)
  161. s.mut.Unlock()
  162. s.scheduleProcess()
  163. return mapping
  164. }
  165. // RemoveMapping does not actually remove the mapping from the IGD, it just
  166. // internally removes it which stops renewing the mapping. Also, it clears any
  167. // existing mapped addresses from the mapping, which as a result should cause
  168. // discovery to reannounce the new addresses.
  169. func (s *Service) RemoveMapping(mapping *Mapping) {
  170. s.mut.Lock()
  171. defer s.mut.Unlock()
  172. for i, existing := range s.mappings {
  173. if existing == mapping {
  174. mapping.clearAddresses()
  175. last := len(s.mappings) - 1
  176. s.mappings[i] = s.mappings[last]
  177. s.mappings[last] = nil
  178. s.mappings = s.mappings[:last]
  179. return
  180. }
  181. }
  182. }
  183. // updateMapping compares the addresses of the existing mapping versus the natds
  184. // discovered, and removes any addresses of natds that do not exist, or tries to
  185. // acquire mappings for natds which the mapping was unaware of before.
  186. // Optionally takes renew flag which indicates whether or not we should renew
  187. // mappings with existing natds
  188. func (s *Service) updateMapping(ctx context.Context, mapping *Mapping, nats map[string]Device, renew bool) {
  189. var added, removed []Address
  190. renewalTime := time.Duration(s.cfg.Options().NATRenewalM) * time.Minute
  191. mapping.expires = time.Now().Add(renewalTime)
  192. newAdded, newRemoved := s.verifyExistingMappings(ctx, mapping, nats, renew)
  193. added = append(added, newAdded...)
  194. removed = append(removed, newRemoved...)
  195. newAdded, newRemoved = s.acquireNewMappings(ctx, mapping, nats)
  196. added = append(added, newAdded...)
  197. removed = append(removed, newRemoved...)
  198. if len(added) > 0 || len(removed) > 0 {
  199. mapping.notify(added, removed)
  200. }
  201. }
  202. func (s *Service) verifyExistingMappings(ctx context.Context, mapping *Mapping, nats map[string]Device, renew bool) ([]Address, []Address) {
  203. var added, removed []Address
  204. leaseTime := time.Duration(s.cfg.Options().NATLeaseM) * time.Minute
  205. for id, address := range mapping.addressMap() {
  206. select {
  207. case <-ctx.Done():
  208. return nil, nil
  209. default:
  210. }
  211. // Delete addresses for NATDevice's that do not exist anymore
  212. nat, ok := nats[id]
  213. if !ok {
  214. mapping.removeAddress(id)
  215. removed = append(removed, address)
  216. continue
  217. } else if renew {
  218. // Only perform renewals on the nat's that have the right local IP
  219. // address
  220. localIP := nat.GetLocalIPAddress()
  221. if !mapping.validGateway(localIP) {
  222. l.Debugf("Skipping %s for %s because of IP mismatch. %s != %s", id, mapping, mapping.address.IP, localIP)
  223. continue
  224. }
  225. l.Debugf("Renewing %s -> %s mapping on %s", mapping, address, id)
  226. addr, err := s.tryNATDevice(ctx, nat, mapping.address.Port, address.Port, leaseTime)
  227. if err != nil {
  228. l.Debugf("Failed to renew %s -> mapping on %s", mapping, address, id)
  229. mapping.removeAddress(id)
  230. removed = append(removed, address)
  231. continue
  232. }
  233. l.Debugf("Renewed %s -> %s mapping on %s", mapping, address, id)
  234. if !addr.Equal(address) {
  235. mapping.removeAddress(id)
  236. mapping.setAddress(id, addr)
  237. removed = append(removed, address)
  238. added = append(added, address)
  239. }
  240. }
  241. }
  242. return added, removed
  243. }
  244. func (s *Service) acquireNewMappings(ctx context.Context, mapping *Mapping, nats map[string]Device) ([]Address, []Address) {
  245. var added, removed []Address
  246. leaseTime := time.Duration(s.cfg.Options().NATLeaseM) * time.Minute
  247. addrMap := mapping.addressMap()
  248. for id, nat := range nats {
  249. select {
  250. case <-ctx.Done():
  251. return nil, nil
  252. default:
  253. }
  254. if _, ok := addrMap[id]; ok {
  255. continue
  256. }
  257. // Only perform mappings on the nat's that have the right local IP
  258. // address
  259. localIP := nat.GetLocalIPAddress()
  260. if !mapping.validGateway(localIP) {
  261. l.Debugf("Skipping %s for %s because of IP mismatch. %s != %s", id, mapping, mapping.address.IP, localIP)
  262. continue
  263. }
  264. l.Debugf("Acquiring %s mapping on %s", mapping, id)
  265. addr, err := s.tryNATDevice(ctx, nat, mapping.address.Port, 0, leaseTime)
  266. if err != nil {
  267. l.Debugf("Failed to acquire %s mapping on %s", mapping, id)
  268. continue
  269. }
  270. l.Debugf("Acquired %s -> %s mapping on %s", mapping, addr, id)
  271. mapping.setAddress(id, addr)
  272. added = append(added, addr)
  273. }
  274. return added, removed
  275. }
  276. // tryNATDevice tries to acquire a port mapping for the given internal address to
  277. // the given external port. If external port is 0, picks a pseudo-random port.
  278. func (s *Service) tryNATDevice(ctx context.Context, natd Device, intPort, extPort int, leaseTime time.Duration) (Address, error) {
  279. var err error
  280. var port int
  281. // Generate a predictable random which is based on device ID + local port + hash of the device ID
  282. // number so that the ports we'd try to acquire for the mapping would always be the same for the
  283. // same device trying to get the same internal port.
  284. predictableRand := rand.New(rand.NewSource(int64(s.id.Short()) + int64(intPort) + hash(natd.ID())))
  285. if extPort != 0 {
  286. // First try renewing our existing mapping, if we have one.
  287. name := fmt.Sprintf("syncthing-%d", extPort)
  288. port, err = natd.AddPortMapping(ctx, TCP, intPort, extPort, name, leaseTime)
  289. if err == nil {
  290. extPort = port
  291. goto findIP
  292. }
  293. l.Debugln("Error extending lease on", natd.ID(), err)
  294. }
  295. for i := 0; i < 10; i++ {
  296. select {
  297. case <-ctx.Done():
  298. return Address{}, nil
  299. default:
  300. }
  301. // Then try up to ten random ports.
  302. extPort = 1024 + predictableRand.Intn(65535-1024)
  303. name := fmt.Sprintf("syncthing-%d", extPort)
  304. port, err = natd.AddPortMapping(ctx, TCP, intPort, extPort, name, leaseTime)
  305. if err == nil {
  306. extPort = port
  307. goto findIP
  308. }
  309. l.Debugln("Error getting new lease on", natd.ID(), err)
  310. }
  311. return Address{}, err
  312. findIP:
  313. ip, err := natd.GetExternalIPAddress(ctx)
  314. if err != nil {
  315. l.Debugln("Error getting external ip on", natd.ID(), err)
  316. ip = nil
  317. }
  318. return Address{
  319. IP: ip,
  320. Port: extPort,
  321. }, nil
  322. }
  323. func (s *Service) String() string {
  324. return fmt.Sprintf("nat.Service@%p", s)
  325. }
  326. func hash(input string) int64 {
  327. h := fnv.New64a()
  328. h.Write([]byte(input))
  329. return int64(h.Sum64())
  330. }