| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794 |
- // Copyright (c) Tailscale Inc & AUTHORS
- // SPDX-License-Identifier: BSD-3-Clause
- package controlclient
- import (
- "context"
- "errors"
- "fmt"
- "net/http"
- "sync"
- "time"
- "tailscale.com/health"
- "tailscale.com/logtail/backoff"
- "tailscale.com/net/sockstats"
- "tailscale.com/tailcfg"
- "tailscale.com/types/empty"
- "tailscale.com/types/key"
- "tailscale.com/types/logger"
- "tailscale.com/types/netmap"
- "tailscale.com/types/persist"
- "tailscale.com/types/structs"
- )
- type LoginGoal struct {
- _ structs.Incomparable
- wantLoggedIn bool // true if we *want* to be logged in
- token *tailcfg.Oauth2Token // oauth token to use when logging in
- flags LoginFlags // flags to use when logging in
- url string // auth url that needs to be visited
- loggedOutResult chan<- error
- }
- func (g *LoginGoal) sendLogoutError(err error) {
- if g.loggedOutResult == nil {
- return
- }
- select {
- case g.loggedOutResult <- err:
- default:
- }
- }
- var _ Client = (*Auto)(nil)
- // Auto connects to a tailcontrol server for a node.
- // It's a concrete implementation of the Client interface.
- type Auto struct {
- direct *Direct // our interface to the server APIs
- timeNow func() time.Time
- logf logger.Logf
- expiry *time.Time
- closed bool
- newMapCh chan struct{} // readable when we must restart a map request
- statusFunc func(Status) // called to update Client status; always non-nil
- unregisterHealthWatch func()
- mu sync.Mutex // mutex guards the following fields
- paused bool // whether we should stop making HTTP requests
- unpauseWaiters []chan struct{}
- loggedIn bool // true if currently logged in
- loginGoal *LoginGoal // non-nil if some login activity is desired
- synced bool // true if our netmap is up-to-date
- inPollNetMap bool // true if currently running a PollNetMap
- inLiteMapUpdate bool // true if a lite (non-streaming) map request is outstanding
- liteMapUpdateCancel context.CancelFunc // cancels a lite map update, may be nil
- liteMapUpdateCancels int // how many times we've canceled a lite map update
- inSendStatus int // number of sendStatus calls currently in progress
- state State
- authCtx context.Context // context used for auth requests
- mapCtx context.Context // context used for netmap requests
- authCancel func() // cancel the auth context
- mapCancel func() // cancel the netmap context
- quit chan struct{} // when closed, goroutines should all exit
- authDone chan struct{} // when closed, auth goroutine is done
- mapDone chan struct{} // when closed, map goroutine is done
- }
- // New creates and starts a new Auto.
- func New(opts Options) (*Auto, error) {
- c, err := NewNoStart(opts)
- if c != nil {
- c.Start()
- }
- return c, err
- }
- // NewNoStart creates a new Auto, but without calling Start on it.
- func NewNoStart(opts Options) (_ *Auto, err error) {
- direct, err := NewDirect(opts)
- if err != nil {
- return nil, err
- }
- defer func() {
- if err != nil {
- direct.Close()
- }
- }()
- if opts.Status == nil {
- return nil, errors.New("missing required Options.Status")
- }
- if opts.Logf == nil {
- opts.Logf = func(fmt string, args ...any) {}
- }
- if opts.TimeNow == nil {
- opts.TimeNow = time.Now
- }
- c := &Auto{
- direct: direct,
- timeNow: opts.TimeNow,
- logf: opts.Logf,
- newMapCh: make(chan struct{}, 1),
- quit: make(chan struct{}),
- authDone: make(chan struct{}),
- mapDone: make(chan struct{}),
- statusFunc: opts.Status,
- }
- c.authCtx, c.authCancel = context.WithCancel(context.Background())
- c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto)
- c.mapCtx, c.mapCancel = context.WithCancel(context.Background())
- c.mapCtx = sockstats.WithSockStats(c.mapCtx, sockstats.LabelControlClientAuto)
- c.unregisterHealthWatch = health.RegisterWatcher(direct.ReportHealthChange)
- return c, nil
- }
- // SetPaused controls whether HTTP activity should be paused.
- //
- // The client can be paused and unpaused repeatedly, unlike Start and Shutdown, which can only be used once.
- func (c *Auto) SetPaused(paused bool) {
- c.mu.Lock()
- defer c.mu.Unlock()
- if paused == c.paused {
- return
- }
- c.logf("setPaused(%v)", paused)
- c.paused = paused
- if paused {
- // Only cancel the map routine. (The auth routine isn't expensive
- // so it's fine to keep it running.)
- c.cancelMapLocked()
- } else {
- for _, ch := range c.unpauseWaiters {
- close(ch)
- }
- c.unpauseWaiters = nil
- }
- }
- // Start starts the client's goroutines.
- //
- // It should only be called for clients created by NewNoStart.
- func (c *Auto) Start() {
- go c.authRoutine()
- go c.mapRoutine()
- }
- // sendNewMapRequest either sends a new OmitPeers, non-streaming map request
- // (to just send Hostinfo/Netinfo/Endpoints info, while keeping an existing
- // streaming response open), or start a new streaming one if necessary.
- //
- // It should be called whenever there's something new to tell the server.
- func (c *Auto) sendNewMapRequest() {
- c.mu.Lock()
- // If we're not already streaming a netmap, then tear down everything
- // and start a new stream (which starts by sending a new map request)
- if !c.inPollNetMap || !c.loggedIn {
- c.mu.Unlock()
- c.cancelMapSafely()
- return
- }
- // If we are already in process of doing a LiteMapUpdate, cancel it and
- // try a new one. If this is the 10th time we have done this
- // cancelation, tear down everything and start again.
- const maxLiteMapUpdateAttempts = 10
- if c.inLiteMapUpdate {
- // Always cancel the in-flight lite map update, regardless of
- // whether we cancel the streaming map request or not.
- c.liteMapUpdateCancel()
- c.inLiteMapUpdate = false
- if c.liteMapUpdateCancels >= maxLiteMapUpdateAttempts {
- // Not making progress
- c.mu.Unlock()
- c.cancelMapSafely()
- return
- }
- // Increment our cancel counter and continue below to start a
- // new lite update.
- c.liteMapUpdateCancels++
- }
- // Otherwise, send a lite update that doesn't keep a
- // long-running stream response.
- defer c.mu.Unlock()
- c.inLiteMapUpdate = true
- ctx, cancel := context.WithTimeout(c.mapCtx, 10*time.Second)
- c.liteMapUpdateCancel = cancel
- go func() {
- defer cancel()
- t0 := time.Now()
- err := c.direct.SendLiteMapUpdate(ctx)
- d := time.Since(t0).Round(time.Millisecond)
- c.mu.Lock()
- c.inLiteMapUpdate = false
- c.liteMapUpdateCancel = nil
- if err == nil {
- c.liteMapUpdateCancels = 0
- }
- c.mu.Unlock()
- if err == nil {
- c.logf("[v1] successful lite map update in %v", d)
- return
- }
- if ctx.Err() == nil {
- c.logf("lite map update after %v: %v", d, err)
- }
- if !errors.Is(ctx.Err(), context.Canceled) {
- // Fall back to restarting the long-polling map
- // request (the old heavy way) if the lite update
- // failed for reasons other than the context being
- // canceled.
- c.cancelMapSafely()
- }
- }()
- }
- func (c *Auto) cancelAuth() {
- c.mu.Lock()
- if c.authCancel != nil {
- c.authCancel()
- }
- if !c.closed {
- c.authCtx, c.authCancel = context.WithCancel(context.Background())
- c.authCtx = sockstats.WithSockStats(c.authCtx, sockstats.LabelControlClientAuto)
- }
- c.mu.Unlock()
- }
- func (c *Auto) cancelMapLocked() {
- if c.mapCancel != nil {
- c.mapCancel()
- }
- if !c.closed {
- c.mapCtx, c.mapCancel = context.WithCancel(context.Background())
- c.mapCtx = sockstats.WithSockStats(c.mapCtx, sockstats.LabelControlClientAuto)
- }
- }
- func (c *Auto) cancelMapUnsafely() {
- c.mu.Lock()
- c.cancelMapLocked()
- c.mu.Unlock()
- }
- func (c *Auto) cancelMapSafely() {
- c.mu.Lock()
- defer c.mu.Unlock()
- // Always reset our lite map cancels counter if we're canceling
- // everything, since we're about to restart with a new map update; this
- // allows future calls to sendNewMapRequest to retry sending lite
- // updates.
- c.liteMapUpdateCancels = 0
- c.logf("[v1] cancelMapSafely: synced=%v", c.synced)
- if c.inPollNetMap {
- // received at least one netmap since the last
- // interruption. That means the server has already
- // fully processed our last request, which might
- // include UpdateEndpoints(). Interrupt it and try
- // again.
- c.cancelMapLocked()
- } else {
- // !synced means we either haven't done a netmap
- // request yet, or it hasn't answered yet. So the
- // server is in an undefined state. If we send
- // another netmap request too soon, it might race
- // with the last one, and if we're very unlucky,
- // the new request will be applied before the old one,
- // and the wrong endpoints will get registered. We
- // have to tell the client to abort politely, only
- // after it receives a response to its existing netmap
- // request.
- select {
- case c.newMapCh <- struct{}{}:
- c.logf("[v1] cancelMapSafely: wrote to channel")
- default:
- // if channel write failed, then there was already
- // an outstanding newMapCh request. One is enough,
- // since it'll always use the latest endpoints.
- c.logf("[v1] cancelMapSafely: channel was full")
- }
- }
- }
- func (c *Auto) authRoutine() {
- defer close(c.authDone)
- bo := backoff.NewBackoff("authRoutine", c.logf, 30*time.Second)
- for {
- c.mu.Lock()
- goal := c.loginGoal
- ctx := c.authCtx
- if goal != nil {
- c.logf("[v1] authRoutine: %s; wantLoggedIn=%v", c.state, goal.wantLoggedIn)
- } else {
- c.logf("[v1] authRoutine: %s; goal=nil paused=%v", c.state, c.paused)
- }
- c.mu.Unlock()
- select {
- case <-c.quit:
- c.logf("[v1] authRoutine: quit")
- return
- default:
- }
- report := func(err error, msg string) {
- c.logf("[v1] %s: %v", msg, err)
- // don't send status updates for context errors,
- // since context cancelation is always on purpose.
- if ctx.Err() == nil {
- c.sendStatus("authRoutine-report", err, "", nil)
- }
- }
- if goal == nil {
- health.SetAuthRoutineInError(nil)
- // Wait for user to Login or Logout.
- <-ctx.Done()
- c.logf("[v1] authRoutine: context done.")
- continue
- }
- if !goal.wantLoggedIn {
- health.SetAuthRoutineInError(nil)
- err := c.direct.TryLogout(ctx)
- goal.sendLogoutError(err)
- if err != nil {
- report(err, "TryLogout")
- bo.BackOff(ctx, err)
- continue
- }
- // success
- c.mu.Lock()
- c.loggedIn = false
- c.loginGoal = nil
- c.state = StateNotAuthenticated
- c.synced = false
- c.mu.Unlock()
- c.sendStatus("authRoutine-wantout", nil, "", nil)
- bo.BackOff(ctx, nil)
- } else { // ie. goal.wantLoggedIn
- c.mu.Lock()
- if goal.url != "" {
- c.state = StateURLVisitRequired
- } else {
- c.state = StateAuthenticating
- }
- c.mu.Unlock()
- var url string
- var err error
- var f string
- if goal.url != "" {
- url, err = c.direct.WaitLoginURL(ctx, goal.url)
- f = "WaitLoginURL"
- } else {
- url, err = c.direct.TryLogin(ctx, goal.token, goal.flags)
- f = "TryLogin"
- }
- if err != nil {
- health.SetAuthRoutineInError(err)
- report(err, f)
- bo.BackOff(ctx, err)
- continue
- }
- if url != "" {
- // goal.url ought to be empty here.
- // However, not all control servers get this right,
- // and logging about it here just generates noise.
- c.mu.Lock()
- c.loginGoal = &LoginGoal{
- wantLoggedIn: true,
- flags: LoginDefault,
- url: url,
- }
- c.state = StateURLVisitRequired
- c.synced = false
- c.mu.Unlock()
- c.sendStatus("authRoutine-url", err, url, nil)
- if goal.url == url {
- // The server sent us the same URL we already tried,
- // backoff to avoid a busy loop.
- bo.BackOff(ctx, errors.New("login URL not changing"))
- } else {
- bo.BackOff(ctx, nil)
- }
- continue
- }
- // success
- health.SetAuthRoutineInError(nil)
- c.mu.Lock()
- c.loggedIn = true
- c.loginGoal = nil
- c.state = StateAuthenticated
- c.mu.Unlock()
- c.sendStatus("authRoutine-success", nil, "", nil)
- c.cancelMapSafely()
- bo.BackOff(ctx, nil)
- }
- }
- }
- // Expiry returns the credential expiration time, or the zero time if
- // the expiration time isn't known. Used in tests only.
- func (c *Auto) Expiry() *time.Time {
- c.mu.Lock()
- defer c.mu.Unlock()
- return c.expiry
- }
- // Direct returns the underlying direct client object. Used in tests
- // only.
- func (c *Auto) Direct() *Direct {
- return c.direct
- }
- // unpausedChanLocked returns a new channel that is closed when the
- // current Auto pause is unpaused.
- //
- // c.mu must be held
- func (c *Auto) unpausedChanLocked() <-chan struct{} {
- unpaused := make(chan struct{})
- c.unpauseWaiters = append(c.unpauseWaiters, unpaused)
- return unpaused
- }
- func (c *Auto) mapRoutine() {
- defer close(c.mapDone)
- bo := backoff.NewBackoff("mapRoutine", c.logf, 30*time.Second)
- for {
- c.mu.Lock()
- if c.paused {
- unpaused := c.unpausedChanLocked()
- c.mu.Unlock()
- c.logf("mapRoutine: awaiting unpause")
- select {
- case <-unpaused:
- c.logf("mapRoutine: unpaused")
- case <-c.quit:
- c.logf("mapRoutine: quit")
- return
- }
- continue
- }
- c.logf("[v1] mapRoutine: %s", c.state)
- loggedIn := c.loggedIn
- ctx := c.mapCtx
- c.mu.Unlock()
- select {
- case <-c.quit:
- c.logf("mapRoutine: quit")
- return
- default:
- }
- report := func(err error, msg string) {
- c.logf("[v1] %s: %v", msg, err)
- err = fmt.Errorf("%s: %w", msg, err)
- // don't send status updates for context errors,
- // since context cancelation is always on purpose.
- if ctx.Err() == nil {
- c.sendStatus("mapRoutine1", err, "", nil)
- }
- }
- if !loggedIn {
- // Wait for something interesting to happen
- c.mu.Lock()
- c.synced = false
- // c.state is set by authRoutine()
- c.mu.Unlock()
- select {
- case <-ctx.Done():
- c.logf("[v1] mapRoutine: context done.")
- case <-c.newMapCh:
- c.logf("[v1] mapRoutine: new map needed while idle.")
- }
- } else {
- // Be sure this is false when we're not inside
- // PollNetMap, so that cancelMapSafely() can notify
- // us correctly.
- c.mu.Lock()
- c.inPollNetMap = false
- c.mu.Unlock()
- health.SetInPollNetMap(false)
- err := c.direct.PollNetMap(ctx, func(nm *netmap.NetworkMap) {
- health.SetInPollNetMap(true)
- c.mu.Lock()
- select {
- case <-c.newMapCh:
- c.logf("[v1] mapRoutine: new map request during PollNetMap. canceling.")
- c.cancelMapLocked()
- // Don't emit this netmap; we're
- // about to request a fresh one.
- c.mu.Unlock()
- return
- default:
- }
- c.synced = true
- c.inPollNetMap = true
- if c.loggedIn {
- c.state = StateSynchronized
- }
- exp := nm.Expiry
- c.expiry = &exp
- stillAuthed := c.loggedIn
- state := c.state
- c.mu.Unlock()
- c.logf("[v1] mapRoutine: netmap received: %s", state)
- if stillAuthed {
- c.sendStatus("mapRoutine-got-netmap", nil, "", nm)
- }
- })
- health.SetInPollNetMap(false)
- c.mu.Lock()
- c.synced = false
- c.inPollNetMap = false
- if c.state == StateSynchronized {
- c.state = StateAuthenticated
- }
- paused := c.paused
- c.mu.Unlock()
- if paused {
- c.logf("mapRoutine: paused")
- continue
- }
- if err != nil {
- report(err, "PollNetMap")
- bo.BackOff(ctx, err)
- continue
- }
- bo.BackOff(ctx, nil)
- }
- }
- }
- func (c *Auto) AuthCantContinue() bool {
- if c == nil {
- return true
- }
- c.mu.Lock()
- defer c.mu.Unlock()
- return !c.loggedIn && (c.loginGoal == nil || c.loginGoal.url != "")
- }
- func (c *Auto) SetHostinfo(hi *tailcfg.Hostinfo) {
- if hi == nil {
- panic("nil Hostinfo")
- }
- if !c.direct.SetHostinfo(hi) {
- // No changes. Don't log.
- return
- }
- // Send new Hostinfo to server
- c.sendNewMapRequest()
- }
- func (c *Auto) SetNetInfo(ni *tailcfg.NetInfo) {
- if ni == nil {
- panic("nil NetInfo")
- }
- if !c.direct.SetNetInfo(ni) {
- return
- }
- // Send new NetInfo to server
- c.sendNewMapRequest()
- }
- // SetTKAHead updates the TKA head hash that map-request infrastructure sends.
- func (c *Auto) SetTKAHead(headHash string) {
- c.direct.SetTKAHead(headHash)
- }
- func (c *Auto) sendStatus(who string, err error, url string, nm *netmap.NetworkMap) {
- c.mu.Lock()
- if c.closed {
- c.mu.Unlock()
- return
- }
- state := c.state
- loggedIn := c.loggedIn
- synced := c.synced
- c.inSendStatus++
- c.mu.Unlock()
- c.logf("[v1] sendStatus: %s: %v", who, state)
- var p *persist.PersistView
- var loginFin, logoutFin *empty.Message
- if state == StateAuthenticated {
- loginFin = new(empty.Message)
- }
- if state == StateNotAuthenticated {
- logoutFin = new(empty.Message)
- }
- if nm != nil && loggedIn && synced {
- pp := c.direct.GetPersist()
- p = &pp
- } else {
- // don't send netmap status, as it's misleading when we're
- // not logged in.
- nm = nil
- }
- new := Status{
- LoginFinished: loginFin,
- LogoutFinished: logoutFin,
- URL: url,
- Persist: p,
- NetMap: nm,
- State: state,
- Err: err,
- }
- c.statusFunc(new)
- c.mu.Lock()
- c.inSendStatus--
- c.mu.Unlock()
- }
- func (c *Auto) Login(t *tailcfg.Oauth2Token, flags LoginFlags) {
- c.logf("client.Login(%v, %v)", t != nil, flags)
- c.mu.Lock()
- c.loginGoal = &LoginGoal{
- wantLoggedIn: true,
- token: t,
- flags: flags,
- }
- c.mu.Unlock()
- c.cancelAuth()
- }
- func (c *Auto) StartLogout() {
- c.logf("client.StartLogout()")
- c.mu.Lock()
- c.loginGoal = &LoginGoal{
- wantLoggedIn: false,
- }
- c.mu.Unlock()
- c.cancelAuth()
- }
- func (c *Auto) Logout(ctx context.Context) error {
- c.logf("client.Logout()")
- errc := make(chan error, 1)
- c.mu.Lock()
- c.loginGoal = &LoginGoal{
- wantLoggedIn: false,
- loggedOutResult: errc,
- }
- c.mu.Unlock()
- c.cancelAuth()
- timer := time.NewTimer(10 * time.Second)
- defer timer.Stop()
- select {
- case err := <-errc:
- return err
- case <-ctx.Done():
- return ctx.Err()
- case <-timer.C:
- return context.DeadlineExceeded
- }
- }
- func (c *Auto) SetExpirySooner(ctx context.Context, expiry time.Time) error {
- return c.direct.SetExpirySooner(ctx, expiry)
- }
- // UpdateEndpoints sets the client's discovered endpoints and sends
- // them to the control server if they've changed.
- //
- // It does not retain the provided slice.
- func (c *Auto) UpdateEndpoints(endpoints []tailcfg.Endpoint) {
- changed := c.direct.SetEndpoints(endpoints)
- if changed {
- c.sendNewMapRequest()
- }
- }
- func (c *Auto) Shutdown() {
- c.logf("client.Shutdown()")
- c.mu.Lock()
- inSendStatus := c.inSendStatus
- closed := c.closed
- direct := c.direct
- if !closed {
- c.closed = true
- }
- c.mu.Unlock()
- c.logf("client.Shutdown: inSendStatus=%v", inSendStatus)
- if !closed {
- c.unregisterHealthWatch()
- close(c.quit)
- c.cancelAuth()
- <-c.authDone
- c.cancelMapUnsafely()
- <-c.mapDone
- if direct != nil {
- direct.Close()
- }
- c.logf("Client.Shutdown done.")
- }
- }
- // NodePublicKey returns the node public key currently in use. This is
- // used exclusively in tests.
- func (c *Auto) TestOnlyNodePublicKey() key.NodePublic {
- priv := c.direct.GetPersist()
- return priv.PrivateNodeKey().Public()
- }
- func (c *Auto) TestOnlySetAuthKey(authkey string) {
- c.direct.mu.Lock()
- defer c.direct.mu.Unlock()
- c.direct.authKey = authkey
- }
- func (c *Auto) TestOnlyTimeNow() time.Time {
- return c.timeNow()
- }
- // SetDNS sends the SetDNSRequest request to the control plane server,
- // requesting a DNS record be created or updated.
- func (c *Auto) SetDNS(ctx context.Context, req *tailcfg.SetDNSRequest) error {
- return c.direct.SetDNS(ctx, req)
- }
- func (c *Auto) DoNoiseRequest(req *http.Request) (*http.Response, error) {
- return c.direct.DoNoiseRequest(req)
- }
- // GetSingleUseNoiseRoundTripper returns a RoundTripper that can be only be used
- // once (and must be used once) to make a single HTTP request over the noise
- // channel to the coordination server.
- //
- // In addition to the RoundTripper, it returns the HTTP/2 channel's early noise
- // payload, if any.
- func (c *Auto) GetSingleUseNoiseRoundTripper(ctx context.Context) (http.RoundTripper, *tailcfg.EarlyNoise, error) {
- return c.direct.GetSingleUseNoiseRoundTripper(ctx)
- }
|