auto.go 20 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. package controlclient
  4. import (
  5. "context"
  6. "errors"
  7. "fmt"
  8. "net/http"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. "tailscale.com/health"
  13. "tailscale.com/logtail/backoff"
  14. "tailscale.com/net/sockstats"
  15. "tailscale.com/tailcfg"
  16. "tailscale.com/tstime"
  17. "tailscale.com/types/key"
  18. "tailscale.com/types/logger"
  19. "tailscale.com/types/netmap"
  20. "tailscale.com/types/persist"
  21. "tailscale.com/types/structs"
  22. )
// LoginGoal describes a pending login that authRoutine should carry out.
//
// When url is non-empty, authRoutine waits on that URL via
// Direct.WaitLoginURL; otherwise it calls Direct.TryLogin with token and
// flags.
type LoginGoal struct {
	_     structs.Incomparable
	token *tailcfg.Oauth2Token // oauth token to use when logging in
	flags LoginFlags           // flags to use when logging in
	url   string               // auth url that needs to be visited
}
// Compile-time assertion that *Auto implements the Client interface.
var _ Client = (*Auto)(nil)
  30. // waitUnpause waits until either the client is unpaused or the Auto client is
  31. // shut down. It reports whether the client should keep running (i.e. it's not
  32. // closed).
  33. func (c *Auto) waitUnpause(routineLogName string) (keepRunning bool) {
  34. c.mu.Lock()
  35. if !c.paused || c.closed {
  36. defer c.mu.Unlock()
  37. return !c.closed
  38. }
  39. unpaused := c.unpausedChanLocked()
  40. c.mu.Unlock()
  41. c.logf("%s: awaiting unpause", routineLogName)
  42. return <-unpaused
  43. }
  44. // updateRoutine is responsible for informing the server of worthy changes to
  45. // our local state. It runs in its own goroutine.
  46. func (c *Auto) updateRoutine() {
  47. defer close(c.updateDone)
  48. bo := backoff.NewBackoff("updateRoutine", c.logf, 30*time.Second)
  49. // lastUpdateGenInformed is the value of lastUpdateAt that we've successfully
  50. // informed the server of.
  51. var lastUpdateGenInformed updateGen
  52. for {
  53. if !c.waitUnpause("updateRoutine") {
  54. c.logf("updateRoutine: exiting")
  55. return
  56. }
  57. c.mu.Lock()
  58. gen := c.lastUpdateGen
  59. ctx := c.mapCtx
  60. needUpdate := gen > 0 && gen != lastUpdateGenInformed && c.loggedIn
  61. c.mu.Unlock()
  62. if !needUpdate {
  63. // Nothing to do, wait for a signal.
  64. select {
  65. case <-ctx.Done():
  66. continue
  67. case <-c.updateCh:
  68. continue
  69. }
  70. }
  71. t0 := c.clock.Now()
  72. err := c.direct.SendUpdate(ctx)
  73. d := time.Since(t0).Round(time.Millisecond)
  74. if err != nil {
  75. if ctx.Err() == nil {
  76. c.direct.logf("lite map update error after %v: %v", d, err)
  77. }
  78. bo.BackOff(ctx, err)
  79. continue
  80. }
  81. bo.BackOff(ctx, nil)
  82. c.direct.logf("[v1] successful lite map update in %v", d)
  83. lastUpdateGenInformed = gen
  84. }
  85. }
  86. // atomicGen is an atomic int64 generator. It is used to generate monotonically
  87. // increasing numbers for updateGen.
  88. var atomicGen atomic.Int64
  89. func nextUpdateGen() updateGen {
  90. return updateGen(atomicGen.Add(1))
  91. }
  92. // updateGen is a monotonically increasing number that represents a particular
  93. // update to the local state.
  94. type updateGen int64
// Auto connects to a tailcontrol server for a node.
// It's a concrete implementation of the Client interface.
//
// An Auto runs three goroutines (authRoutine, mapRoutine and updateRoutine,
// launched by Start) that coordinate through the fields below.
type Auto struct {
	direct *Direct // our interface to the server APIs
	clock  tstime.Clock
	logf   logger.Logf
	// closed is set by Shutdown. Although it is declared above mu, the
	// methods in this file read and write it with mu held.
	closed                bool
	updateCh              chan struct{} // readable when we should inform the server of a change
	observer              Observer      // called to update Client status; always non-nil
	observerQueue         execQueue
	unregisterHealthWatch func()

	mu sync.Mutex // mutex guards the following fields

	wantLoggedIn bool   // whether the user wants to be logged in per last method call
	urlToVisit   string // the last url we were told to visit
	expiry       time.Time

	// lastUpdateGen is the gen of last update we had an update worth sending to
	// the server.
	lastUpdateGen updateGen

	paused         bool        // whether we should stop making HTTP requests
	unpauseWaiters []chan bool // chans that gets sent true (once) on wake, or false on Shutdown
	loggedIn       bool        // true if currently logged in
	loginGoal      *LoginGoal  // non-nil if some login activity is desired
	inMapPoll      bool        // true once we get the first MapResponse in a stream; false when HTTP response ends
	state          State       // TODO(bradfitz): delete this, make it computed by method from other state

	authCtx    context.Context // context used for auth requests
	mapCtx     context.Context // context used for netmap and update requests
	authCancel func()          // cancel authCtx
	mapCancel  func()          // cancel mapCtx
	authDone   chan struct{}   // when closed, authRoutine is done
	mapDone    chan struct{}   // when closed, mapRoutine is done
	updateDone chan struct{}   // when closed, updateRoutine is done
}
  127. // New creates and starts a new Auto.
  128. func New(opts Options) (*Auto, error) {
  129. c, err := NewNoStart(opts)
  130. if c != nil {
  131. c.Start()
  132. }
  133. return c, err
  134. }
  135. // NewNoStart creates a new Auto, but without calling Start on it.
  136. func NewNoStart(opts Options) (_ *Auto, err error) {
  137. direct, err := NewDirect(opts)
  138. if err != nil {
  139. return nil, err
  140. }
  141. defer func() {
  142. if err != nil {
  143. direct.Close()
  144. }
  145. }()
  146. if opts.Observer == nil {
  147. return nil, errors.New("missing required Options.Observer")
  148. }
  149. if opts.Logf == nil {
  150. opts.Logf = func(fmt string, args ...any) {}
  151. }
  152. if opts.Clock == nil {
  153. opts.Clock = tstime.StdClock{}
  154. }
  155. c := &Auto{
  156. direct: direct,
  157. clock: opts.Clock,
  158. logf: opts.Logf,
  159. updateCh: make(chan struct{}, 1),
  160. authDone: make(chan struct{}),
  161. mapDone: make(chan struct{}),
  162. updateDone: make(chan struct{}),
  163. observer: opts.Observer,
  164. }
  165. c.authCtx, c.authCancel = context.WithCancel(context.Background())
  166. c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto, opts.Logf)
  167. c.mapCtx, c.mapCancel = context.WithCancel(context.Background())
  168. c.mapCtx = sockstats.WithSockStats(c.mapCtx, sockstats.LabelControlClientAuto, opts.Logf)
  169. c.unregisterHealthWatch = health.RegisterWatcher(direct.ReportHealthChange)
  170. return c, nil
  171. }
  172. // SetPaused controls whether HTTP activity should be paused.
  173. //
  174. // The client can be paused and unpaused repeatedly, unlike Start and Shutdown, which can only be used once.
  175. func (c *Auto) SetPaused(paused bool) {
  176. c.mu.Lock()
  177. defer c.mu.Unlock()
  178. if paused == c.paused || c.closed {
  179. return
  180. }
  181. c.logf("setPaused(%v)", paused)
  182. c.paused = paused
  183. if paused {
  184. c.cancelMapCtxLocked()
  185. c.cancelAuthCtxLocked()
  186. return
  187. }
  188. for _, ch := range c.unpauseWaiters {
  189. ch <- true
  190. }
  191. c.unpauseWaiters = nil
  192. }
  193. // Start starts the client's goroutines.
  194. //
  195. // It should only be called for clients created by NewNoStart.
  196. func (c *Auto) Start() {
  197. go c.authRoutine()
  198. go c.mapRoutine()
  199. go c.updateRoutine()
  200. }
  201. // updateControl sends a new OmitPeers, non-streaming map request (to just send
  202. // Hostinfo/Netinfo/Endpoints info, while keeping an existing streaming response
  203. // open).
  204. //
  205. // It should be called whenever there's something new to tell the server.
  206. func (c *Auto) updateControl() {
  207. gen := nextUpdateGen()
  208. c.mu.Lock()
  209. if gen < c.lastUpdateGen {
  210. // This update is out of date.
  211. c.mu.Unlock()
  212. return
  213. }
  214. c.lastUpdateGen = gen
  215. c.mu.Unlock()
  216. select {
  217. case c.updateCh <- struct{}{}:
  218. default:
  219. }
  220. }
// cancelAuthCtx cancels the existing auth goroutine's context
// & creates a new one, causing it to restart.
//
// It acquires c.mu; callers that already hold it must use
// cancelAuthCtxLocked instead.
func (c *Auto) cancelAuthCtx() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cancelAuthCtxLocked()
}
  228. // cancelAuthCtxLocked is like cancelAuthCtx, but assumes the caller holds c.mu.
  229. func (c *Auto) cancelAuthCtxLocked() {
  230. if c.authCancel != nil {
  231. c.authCancel()
  232. }
  233. if !c.closed {
  234. c.authCtx, c.authCancel = context.WithCancel(context.Background())
  235. c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto, c.logf)
  236. }
  237. }
// cancelMapCtx cancels the context for the existing mapPoll and liteUpdates
// goroutines and creates a new one, causing them to restart.
//
// It acquires c.mu; callers that already hold it must use
// cancelMapCtxLocked instead.
func (c *Auto) cancelMapCtx() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.cancelMapCtxLocked()
}
  245. // cancelMapCtxLocked is like cancelMapCtx, but assumes the caller holds c.mu.
  246. func (c *Auto) cancelMapCtxLocked() {
  247. if c.mapCancel != nil {
  248. c.mapCancel()
  249. }
  250. if !c.closed {
  251. c.mapCtx, c.mapCancel = context.WithCancel(context.Background())
  252. c.mapCtx = sockstats.WithSockStats(c.mapCtx, sockstats.LabelControlClientAuto, c.logf)
  253. }
  254. }
  255. // restartMap cancels the existing mapPoll and liteUpdates, and then starts a
  256. // new one.
  257. func (c *Auto) restartMap() {
  258. c.mu.Lock()
  259. c.cancelMapCtxLocked()
  260. synced := c.inMapPoll
  261. c.mu.Unlock()
  262. c.logf("[v1] restartMap: synced=%v", synced)
  263. c.updateControl()
  264. }
// authRoutine drives login. It runs in its own goroutine and closes
// c.authDone when it exits.
//
// Each iteration waits for the client to be unpaused, snapshots the current
// login goal and auth context, and then either idles until the auth context
// is canceled (no goal) or attempts the login described by the goal. Errors
// and unchanged login URLs are retried with backoff.
func (c *Auto) authRoutine() {
	defer close(c.authDone)
	bo := backoff.NewBackoff("authRoutine", c.logf, 30*time.Second)

	for {
		if !c.waitUnpause("authRoutine") {
			c.logf("authRoutine: exiting")
			return
		}
		c.mu.Lock()
		goal := c.loginGoal
		ctx := c.authCtx
		if goal != nil {
			c.logf("[v1] authRoutine: %s; wantLoggedIn=%v", c.state, true)
		} else {
			c.logf("[v1] authRoutine: %s; goal=nil paused=%v", c.state, c.paused)
		}
		c.mu.Unlock()

		// report logs err and forwards it to the observer unless this
		// iteration's context has been canceled.
		report := func(err error, msg string) {
			c.logf("[v1] %s: %v", msg, err)
			// don't send status updates for context errors,
			// since context cancelation is always on purpose.
			if ctx.Err() == nil {
				c.sendStatus("authRoutine-report", err, "", nil)
			}
		}

		if goal == nil {
			health.SetAuthRoutineInError(nil)
			// Wait for user to Login or Logout.
			<-ctx.Done()
			c.logf("[v1] authRoutine: context done.")
			continue
		}

		c.mu.Lock()
		c.urlToVisit = goal.url
		if goal.url != "" {
			c.state = StateURLVisitRequired
		} else {
			c.state = StateAuthenticating
		}
		c.mu.Unlock()

		var url string
		var err error
		var f string // name of the Direct call made, used in the error report
		if goal.url != "" {
			url, err = c.direct.WaitLoginURL(ctx, goal.url)
			f = "WaitLoginURL"
		} else {
			url, err = c.direct.TryLogin(ctx, goal.token, goal.flags)
			f = "TryLogin"
		}
		if err != nil {
			health.SetAuthRoutineInError(err)
			report(err, f)
			bo.BackOff(ctx, err)
			continue
		}
		if url != "" {
			// goal.url ought to be empty here.
			// However, not all control servers get this right,
			// and logging about it here just generates noise.
			c.mu.Lock()
			c.urlToVisit = url
			// Replace the goal so the next iteration waits on this URL.
			c.loginGoal = &LoginGoal{
				flags: LoginDefault,
				url:   url,
			}
			c.state = StateURLVisitRequired
			c.mu.Unlock()

			c.sendStatus("authRoutine-url", err, url, nil)
			if goal.url == url {
				// The server sent us the same URL we already tried,
				// backoff to avoid a busy loop.
				bo.BackOff(ctx, errors.New("login URL not changing"))
			} else {
				bo.BackOff(ctx, nil)
			}
			continue
		}

		// success
		health.SetAuthRoutineInError(nil)
		c.mu.Lock()
		c.urlToVisit = ""
		c.loggedIn = true
		c.loginGoal = nil
		c.state = StateAuthenticated
		c.mu.Unlock()

		c.sendStatus("authRoutine-success", nil, "", nil)
		// Restart the map poll now that we are logged in.
		c.restartMap()
		bo.BackOff(ctx, nil)
	}
}
// ExpiryForTests returns the credential expiration time, or the zero value if
// the expiration time isn't known. It's used in tests only.
//
// The value is the Expiry of the most recent netmap recorded by
// UpdateFullNetmap and is read with c.mu held.
func (c *Auto) ExpiryForTests() time.Time {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.expiry
}
// DirectForTest returns the underlying direct client object.
// It's used in tests only.
func (c *Auto) DirectForTest() *Direct {
	return c.direct
}
  368. // unpausedChanLocked returns a new channel that gets sent
  369. // either a true when unpaused or false on Auto.Shutdown.
  370. //
  371. // c.mu must be held
  372. func (c *Auto) unpausedChanLocked() <-chan bool {
  373. unpaused := make(chan bool, 1)
  374. c.unpauseWaiters = append(c.unpauseWaiters, unpaused)
  375. return unpaused
  376. }
// mapRoutineState is the state of Auto.mapRoutine while it's running.
// It is what mapRoutine passes to Direct.PollNetMap to receive netmap updates.
type mapRoutineState struct {
	c  *Auto            // the client whose map poll this is
	bo *backoff.Backoff // mapRoutine's backoff; reset whenever a full netmap arrives
}

// Compile-time assertion that mapRoutineState implements NetmapDeltaUpdater.
var _ NetmapDeltaUpdater = mapRoutineState{}
  383. func (mrs mapRoutineState) UpdateFullNetmap(nm *netmap.NetworkMap) {
  384. c := mrs.c
  385. c.mu.Lock()
  386. ctx := c.mapCtx
  387. c.inMapPoll = true
  388. if c.loggedIn {
  389. c.state = StateSynchronized
  390. }
  391. c.expiry = nm.Expiry
  392. stillAuthed := c.loggedIn
  393. c.logf("[v1] mapRoutine: netmap received: %s", c.state)
  394. c.mu.Unlock()
  395. if stillAuthed {
  396. c.sendStatus("mapRoutine-got-netmap", nil, "", nm)
  397. }
  398. // Reset the backoff timer if we got a netmap.
  399. mrs.bo.BackOff(ctx, nil)
  400. }
  401. func (mrs mapRoutineState) UpdateNetmapDelta(muts []netmap.NodeMutation) bool {
  402. c := mrs.c
  403. c.mu.Lock()
  404. goodState := c.loggedIn && c.inMapPoll
  405. ndu, canDelta := c.observer.(NetmapDeltaUpdater)
  406. c.mu.Unlock()
  407. if !goodState || !canDelta {
  408. return false
  409. }
  410. ctx, cancel := context.WithTimeout(c.mapCtx, 2*time.Second)
  411. defer cancel()
  412. var ok bool
  413. err := c.observerQueue.RunSync(ctx, func() {
  414. ok = ndu.UpdateNetmapDelta(muts)
  415. })
  416. return err == nil && ok
  417. }
// mapRoutine is responsible for keeping a read-only streaming connection to the
// control server, and keeping the netmap up to date.
//
// It runs in its own goroutine and closes c.mapDone when it exits. While not
// logged in it idles until the map context is canceled; otherwise it calls
// Direct.PollNetMap and, when that returns, backs off before looping.
func (c *Auto) mapRoutine() {
	defer close(c.mapDone)
	mrs := &mapRoutineState{
		c:  c,
		bo: backoff.NewBackoff("mapRoutine", c.logf, 30*time.Second),
	}

	for {
		if !c.waitUnpause("mapRoutine") {
			c.logf("mapRoutine: exiting")
			return
		}

		c.mu.Lock()
		c.logf("[v1] mapRoutine: %s", c.state)
		loggedIn := c.loggedIn
		ctx := c.mapCtx
		c.mu.Unlock()

		// report logs err, wraps it with msg, and forwards it to the observer
		// unless this iteration's context has been canceled.
		report := func(err error, msg string) {
			c.logf("[v1] %s: %v", msg, err)
			err = fmt.Errorf("%s: %w", msg, err)
			// don't send status updates for context errors,
			// since context cancelation is always on purpose.
			if ctx.Err() == nil {
				c.sendStatus("mapRoutine1", err, "", nil)
			}
		}

		if !loggedIn {
			// Wait for something interesting to happen
			c.mu.Lock()
			c.inMapPoll = false
			c.mu.Unlock()

			<-ctx.Done()
			c.logf("[v1] mapRoutine: context done.")
			continue
		}

		health.SetOutOfPollNetMap()
		err := c.direct.PollNetMap(ctx, mrs)
		health.SetOutOfPollNetMap()

		// The poll has ended: leave the synchronized state.
		c.mu.Lock()
		c.inMapPoll = false
		if c.state == StateSynchronized {
			c.state = StateAuthenticated
		}
		paused := c.paused
		c.mu.Unlock()

		if paused {
			// The poll ended because we were paused; not reported as an error.
			mrs.bo.BackOff(ctx, nil)
			c.logf("mapRoutine: paused")
		} else {
			mrs.bo.BackOff(ctx, err)
			report(err, "PollNetMap")
		}
	}
}
  473. func (c *Auto) AuthCantContinue() bool {
  474. if c == nil {
  475. return true
  476. }
  477. c.mu.Lock()
  478. defer c.mu.Unlock()
  479. return !c.loggedIn && (c.loginGoal == nil || c.loginGoal.url != "")
  480. }
  481. func (c *Auto) SetHostinfo(hi *tailcfg.Hostinfo) {
  482. if hi == nil {
  483. panic("nil Hostinfo")
  484. }
  485. if !c.direct.SetHostinfo(hi) {
  486. // No changes. Don't log.
  487. return
  488. }
  489. // Send new Hostinfo to server
  490. c.updateControl()
  491. }
  492. func (c *Auto) SetNetInfo(ni *tailcfg.NetInfo) {
  493. if ni == nil {
  494. panic("nil NetInfo")
  495. }
  496. if !c.direct.SetNetInfo(ni) {
  497. return
  498. }
  499. // Send new NetInfo to server
  500. c.updateControl()
  501. }
  502. // SetTKAHead updates the TKA head hash that map-request infrastructure sends.
  503. func (c *Auto) SetTKAHead(headHash string) {
  504. if !c.direct.SetTKAHead(headHash) {
  505. return
  506. }
  507. // Send new TKAHead to server
  508. c.updateControl()
  509. }
  510. // sendStatus can not be called with the c.mu held.
  511. func (c *Auto) sendStatus(who string, err error, url string, nm *netmap.NetworkMap) {
  512. c.mu.Lock()
  513. if c.closed {
  514. c.mu.Unlock()
  515. return
  516. }
  517. state := c.state
  518. loggedIn := c.loggedIn
  519. inMapPoll := c.inMapPoll
  520. c.mu.Unlock()
  521. c.logf("[v1] sendStatus: %s: %v", who, state)
  522. var p persist.PersistView
  523. if nm != nil && loggedIn && inMapPoll {
  524. p = c.direct.GetPersist()
  525. } else {
  526. // don't send netmap status, as it's misleading when we're
  527. // not logged in.
  528. nm = nil
  529. }
  530. new := Status{
  531. URL: url,
  532. Persist: p,
  533. NetMap: nm,
  534. Err: err,
  535. state: state,
  536. }
  537. // Launch a new goroutine to avoid blocking the caller while the observer
  538. // does its thing, which may result in a call back into the client.
  539. c.observerQueue.Add(func() {
  540. c.observer.SetControlClientStatus(c, new)
  541. })
  542. }
  543. func (c *Auto) Login(t *tailcfg.Oauth2Token, flags LoginFlags) {
  544. c.logf("client.Login(%v, %v)", t != nil, flags)
  545. c.mu.Lock()
  546. defer c.mu.Unlock()
  547. if c.closed {
  548. return
  549. }
  550. c.wantLoggedIn = true
  551. c.loginGoal = &LoginGoal{
  552. token: t,
  553. flags: flags,
  554. }
  555. c.cancelMapCtxLocked()
  556. c.cancelAuthCtxLocked()
  557. }
// ErrClientClosed is returned by Logout when the client has already been
// closed.
var ErrClientClosed = errors.New("client closed")
  559. func (c *Auto) Logout(ctx context.Context) error {
  560. c.logf("client.Logout()")
  561. c.mu.Lock()
  562. c.wantLoggedIn = false
  563. c.loginGoal = nil
  564. closed := c.closed
  565. c.mu.Unlock()
  566. if closed {
  567. return ErrClientClosed
  568. }
  569. if err := c.direct.TryLogout(ctx); err != nil {
  570. return err
  571. }
  572. c.mu.Lock()
  573. c.loggedIn = false
  574. c.state = StateNotAuthenticated
  575. c.cancelAuthCtxLocked()
  576. c.cancelMapCtxLocked()
  577. c.mu.Unlock()
  578. c.sendStatus("authRoutine-wantout", nil, "", nil)
  579. return nil
  580. }
// SetExpirySooner forwards the request to Direct.SetExpirySooner and returns
// its result.
func (c *Auto) SetExpirySooner(ctx context.Context, expiry time.Time) error {
	return c.direct.SetExpirySooner(ctx, expiry)
}
  584. // UpdateEndpoints sets the client's discovered endpoints and sends
  585. // them to the control server if they've changed.
  586. //
  587. // It does not retain the provided slice.
  588. func (c *Auto) UpdateEndpoints(endpoints []tailcfg.Endpoint) {
  589. changed := c.direct.SetEndpoints(endpoints)
  590. if changed {
  591. c.updateControl()
  592. }
  593. }
  594. func (c *Auto) Shutdown() {
  595. c.logf("client.Shutdown()")
  596. c.mu.Lock()
  597. closed := c.closed
  598. direct := c.direct
  599. if !closed {
  600. c.closed = true
  601. c.observerQueue.shutdown()
  602. c.cancelAuthCtxLocked()
  603. c.cancelMapCtxLocked()
  604. for _, w := range c.unpauseWaiters {
  605. w <- false
  606. }
  607. c.unpauseWaiters = nil
  608. }
  609. c.mu.Unlock()
  610. c.logf("client.Shutdown")
  611. if !closed {
  612. c.unregisterHealthWatch()
  613. <-c.authDone
  614. <-c.mapDone
  615. <-c.updateDone
  616. if direct != nil {
  617. direct.Close()
  618. }
  619. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
  620. defer cancel()
  621. c.observerQueue.wait(ctx)
  622. c.logf("Client.Shutdown done.")
  623. }
  624. }
  625. // NodePublicKey returns the node public key currently in use. This is
  626. // used exclusively in tests.
  627. func (c *Auto) TestOnlyNodePublicKey() key.NodePublic {
  628. priv := c.direct.GetPersist()
  629. return priv.PrivateNodeKey().Public()
  630. }
  631. func (c *Auto) TestOnlySetAuthKey(authkey string) {
  632. c.direct.mu.Lock()
  633. defer c.direct.mu.Unlock()
  634. c.direct.authKey = authkey
  635. }
// TestOnlyTimeNow returns the current time according to the client's clock.
func (c *Auto) TestOnlyTimeNow() time.Time {
	return c.clock.Now()
}
// SetDNS sends the SetDNSRequest request to the control plane server,
// requesting a DNS record be created or updated.
//
// It delegates to Direct.SetDNS and returns its error.
func (c *Auto) SetDNS(ctx context.Context, req *tailcfg.SetDNSRequest) error {
	return c.direct.SetDNS(ctx, req)
}
// DoNoiseRequest forwards req to Direct.DoNoiseRequest and returns its
// response and error.
func (c *Auto) DoNoiseRequest(req *http.Request) (*http.Response, error) {
	return c.direct.DoNoiseRequest(req)
}
// GetSingleUseNoiseRoundTripper returns a RoundTripper that can be only be used
// once (and must be used once) to make a single HTTP request over the noise
// channel to the coordination server.
//
// In addition to the RoundTripper, it returns the HTTP/2 channel's early noise
// payload, if any.
//
// It delegates to Direct.GetSingleUseNoiseRoundTripper.
func (c *Auto) GetSingleUseNoiseRoundTripper(ctx context.Context) (http.RoundTripper, *tailcfg.EarlyNoise, error) {
	return c.direct.GetSingleUseNoiseRoundTripper(ctx)
}
  656. type execQueue struct {
  657. mu sync.Mutex
  658. closed bool
  659. inFlight bool // whether a goroutine is running q.run
  660. doneWaiter chan struct{} // non-nil if waiter is waiting, then closed
  661. queue []func()
  662. }
  663. func (q *execQueue) Add(f func()) {
  664. q.mu.Lock()
  665. defer q.mu.Unlock()
  666. if q.closed {
  667. return
  668. }
  669. if q.inFlight {
  670. q.queue = append(q.queue, f)
  671. } else {
  672. q.inFlight = true
  673. go q.run(f)
  674. }
  675. }
  676. // RunSync waits for the queue to be drained and then synchronously runs f.
  677. // It returns an error if the queue is closed before f is run or ctx expires.
  678. func (q *execQueue) RunSync(ctx context.Context, f func()) error {
  679. for {
  680. if err := q.wait(ctx); err != nil {
  681. return err
  682. }
  683. q.mu.Lock()
  684. if q.inFlight {
  685. q.mu.Unlock()
  686. continue
  687. }
  688. defer q.mu.Unlock()
  689. if q.closed {
  690. return errors.New("closed")
  691. }
  692. f()
  693. return nil
  694. }
  695. }
  696. func (q *execQueue) run(f func()) {
  697. f()
  698. q.mu.Lock()
  699. for len(q.queue) > 0 && !q.closed {
  700. f := q.queue[0]
  701. q.queue[0] = nil
  702. q.queue = q.queue[1:]
  703. q.mu.Unlock()
  704. f()
  705. q.mu.Lock()
  706. }
  707. q.inFlight = false
  708. q.queue = nil
  709. if q.doneWaiter != nil {
  710. close(q.doneWaiter)
  711. q.doneWaiter = nil
  712. }
  713. q.mu.Unlock()
  714. }
  715. func (q *execQueue) shutdown() {
  716. q.mu.Lock()
  717. defer q.mu.Unlock()
  718. q.closed = true
  719. }
  720. // wait waits for the queue to be empty.
  721. func (q *execQueue) wait(ctx context.Context) error {
  722. q.mu.Lock()
  723. waitCh := q.doneWaiter
  724. if q.inFlight && waitCh == nil {
  725. waitCh = make(chan struct{})
  726. q.doneWaiter = waitCh
  727. }
  728. q.mu.Unlock()
  729. if waitCh == nil {
  730. return nil
  731. }
  732. select {
  733. case <-waitCh:
  734. return nil
  735. case <-ctx.Done():
  736. return ctx.Err()
  737. }
  738. }