mesh_client.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package derphttp
  4. import (
  5. "context"
  6. "sync"
  7. "time"
  8. "tailscale.com/derp"
  9. "tailscale.com/types/key"
  10. "tailscale.com/types/logger"
  11. )
  var (
  	// retryInterval is how long RunWatchConnectionLoop sleeps before trying
  	// again after a failed connect or a failed receive.
  	retryInterval = 5 * time.Second

  	// testHookWatchLookConnectResult, when set by tests, is invoked by
  	// RunWatchConnectionLoop with the outcome of each connection attempt.
  	// The loop stops as soon as the hook returns false.
  	testHookWatchLookConnectResult func(connectError error, wasSelfConnect bool) (keepRunning bool)
  )
  16. // RunWatchConnectionLoop loops until ctx is done, sending
  17. // WatchConnectionChanges and subscribing to connection changes.
  18. //
  19. // If the server's public key is ignoreServerKey, RunWatchConnectionLoop
  20. // returns.
  21. //
  22. // Otherwise, the add and remove funcs are called as clients come & go.
  23. // Note that add is called for every new connection and remove is only
  24. // called for the final disconnection. See https://github.com/tailscale/tailscale/issues/13566.
  25. // This behavior will likely change. Callers should do their own accounting
  26. // and dup suppression as needed.
  27. //
  28. // If set the notifyError func is called with any error that occurs within the ctx
  29. // main loop connection setup, or the inner loop receiving messages via RecvDetail.
  30. //
  31. // infoLogf, if non-nil, is the logger to write periodic status updates about
  32. // how many peers are on the server. Error log output is set to the c's logger,
  33. // regardless of infoLogf's value.
  34. //
  35. // To force RunWatchConnectionLoop to return quickly, its ctx needs to be
  36. // closed, and c itself needs to be closed.
  37. //
  38. // It is a fatal error to call this on an already-started Client without having
  39. // initialized Client.WatchConnectionChanges to true.
  40. //
  41. // If the DERP connection breaks and reconnects, remove will be called for all
  42. // previously seen peers, with Reason type PeerGoneReasonMeshConnBroke. Those
  43. // clients are likely still connected and their add message will appear after
  44. // reconnect.
  45. func (c *Client) RunWatchConnectionLoop(ctx context.Context, ignoreServerKey key.NodePublic, infoLogf logger.Logf,
  46. add func(derp.PeerPresentMessage), remove func(derp.PeerGoneMessage), notifyError func(error)) {
  47. if !c.WatchConnectionChanges {
  48. if c.isStarted() {
  49. panic("invalid use of RunWatchConnectionLoop on already-started Client without setting Client.RunWatchConnectionLoop")
  50. }
  51. c.WatchConnectionChanges = true
  52. }
  53. if infoLogf == nil {
  54. infoLogf = logger.Discard
  55. }
  56. logf := c.logf
  57. const statusInterval = 10 * time.Second
  58. var (
  59. mu sync.Mutex
  60. present = map[key.NodePublic]bool{}
  61. loggedConnected = false
  62. )
  63. clear := func() {
  64. mu.Lock()
  65. defer mu.Unlock()
  66. if len(present) == 0 {
  67. return
  68. }
  69. logf("reconnected; clearing %d forwarding mappings", len(present))
  70. for k := range present {
  71. remove(derp.PeerGoneMessage{Peer: k, Reason: derp.PeerGoneReasonMeshConnBroke})
  72. }
  73. present = map[key.NodePublic]bool{}
  74. }
  75. lastConnGen := 0
  76. lastStatus := c.clock.Now()
  77. logConnectedLocked := func() {
  78. if loggedConnected {
  79. return
  80. }
  81. infoLogf("connected; %d peers", len(present))
  82. loggedConnected = true
  83. }
  84. const logConnectedDelay = 200 * time.Millisecond
  85. timer := c.clock.AfterFunc(2*time.Second, func() {
  86. mu.Lock()
  87. defer mu.Unlock()
  88. logConnectedLocked()
  89. })
  90. defer timer.Stop()
  91. updatePeer := func(k key.NodePublic, isPresent bool) {
  92. mu.Lock()
  93. defer mu.Unlock()
  94. if isPresent {
  95. present[k] = true
  96. if !loggedConnected {
  97. timer.Reset(logConnectedDelay)
  98. }
  99. } else {
  100. // If we got a peerGone message, that means the initial connection's
  101. // flood of peerPresent messages is done, so we can log already:
  102. logConnectedLocked()
  103. delete(present, k)
  104. }
  105. }
  106. sleep := func(d time.Duration) {
  107. t, tChannel := c.clock.NewTimer(d)
  108. select {
  109. case <-ctx.Done():
  110. t.Stop()
  111. case <-tChannel:
  112. }
  113. }
  114. for ctx.Err() == nil {
  115. // Make sure we're connected before calling s.ServerPublicKey.
  116. _, _, err := c.connect(ctx, "RunWatchConnectionLoop")
  117. if err != nil {
  118. logf("mesh connect: %v", err)
  119. if notifyError != nil {
  120. notifyError(err)
  121. }
  122. if f := testHookWatchLookConnectResult; f != nil && !f(err, false) {
  123. return
  124. }
  125. logf("mesh connect: %v", err)
  126. sleep(retryInterval)
  127. continue
  128. }
  129. selfConnect := c.ServerPublicKey() == ignoreServerKey
  130. if f := testHookWatchLookConnectResult; f != nil && !f(err, selfConnect) {
  131. return
  132. }
  133. if selfConnect {
  134. logf("detected self-connect; ignoring host")
  135. return
  136. }
  137. for {
  138. m, connGen, err := c.RecvDetail()
  139. if err != nil {
  140. clear()
  141. logf("Recv: %v", err)
  142. if notifyError != nil {
  143. notifyError(err)
  144. }
  145. sleep(retryInterval)
  146. break
  147. }
  148. if connGen != lastConnGen {
  149. lastConnGen = connGen
  150. clear()
  151. }
  152. switch m := m.(type) {
  153. case derp.PeerPresentMessage:
  154. add(m)
  155. updatePeer(m.Key, true)
  156. case derp.PeerGoneMessage:
  157. switch m.Reason {
  158. case derp.PeerGoneReasonDisconnected:
  159. // Normal case, log nothing
  160. case derp.PeerGoneReasonNotHere:
  161. logf("Recv: peer %s not connected to %s",
  162. key.NodePublic(m.Peer).ShortString(), c.ServerPublicKey().ShortString())
  163. default:
  164. logf("Recv: peer %s not at server %s for unknown reason %v",
  165. key.NodePublic(m.Peer).ShortString(), c.ServerPublicKey().ShortString(), m.Reason)
  166. }
  167. remove(m)
  168. updatePeer(m.Peer, false)
  169. default:
  170. continue
  171. }
  172. if now := c.clock.Now(); now.Sub(lastStatus) > statusInterval {
  173. lastStatus = now
  174. infoLogf("%d peers", len(present))
  175. }
  176. }
  177. }
  178. }