k8s-proxy.go 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. //go:build !plan9
  4. // k8s-proxy proxies between tailnet and Kubernetes cluster traffic.
  5. // Currently, it only supports proxying tailnet clients to the Kubernetes API
  6. // server.
  7. package main
  8. import (
  9. "context"
  10. "errors"
  11. "fmt"
  12. "net"
  13. "net/http"
  14. "os"
  15. "os/signal"
  16. "reflect"
  17. "strconv"
  18. "strings"
  19. "syscall"
  20. "time"
  21. "go.uber.org/zap"
  22. "go.uber.org/zap/zapcore"
  23. "golang.org/x/sync/errgroup"
  24. "k8s.io/client-go/kubernetes"
  25. "k8s.io/client-go/rest"
  26. "k8s.io/client-go/tools/clientcmd"
  27. "k8s.io/utils/strings/slices"
  28. "tailscale.com/client/local"
  29. "tailscale.com/cmd/k8s-proxy/internal/config"
  30. "tailscale.com/hostinfo"
  31. "tailscale.com/ipn"
  32. "tailscale.com/ipn/store"
  33. // we need to import this package so that the `kube:` ipn store gets registered
  34. _ "tailscale.com/ipn/store/kubestore"
  35. apiproxy "tailscale.com/k8s-operator/api-proxy"
  36. "tailscale.com/kube/certs"
  37. healthz "tailscale.com/kube/health"
  38. "tailscale.com/kube/k8s-proxy/conf"
  39. "tailscale.com/kube/kubetypes"
  40. klc "tailscale.com/kube/localclient"
  41. "tailscale.com/kube/metrics"
  42. "tailscale.com/kube/services"
  43. "tailscale.com/kube/state"
  44. "tailscale.com/tailcfg"
  45. "tailscale.com/tsnet"
  46. )
  47. func main() {
  48. encoderCfg := zap.NewProductionEncoderConfig()
  49. encoderCfg.EncodeTime = zapcore.RFC3339TimeEncoder
  50. logger := zap.Must(zap.Config{
  51. Level: zap.NewAtomicLevelAt(zap.DebugLevel),
  52. Encoding: "json",
  53. OutputPaths: []string{"stderr"},
  54. ErrorOutputPaths: []string{"stderr"},
  55. EncoderConfig: encoderCfg,
  56. }.Build()).Sugar()
  57. defer logger.Sync()
  58. if err := run(logger); err != nil {
  59. logger.Fatal(err.Error())
  60. }
  61. }
  62. func run(logger *zap.SugaredLogger) error {
  63. var (
  64. configPath = os.Getenv("TS_K8S_PROXY_CONFIG")
  65. podUID = os.Getenv("POD_UID")
  66. podIP = os.Getenv("POD_IP")
  67. )
  68. if configPath == "" {
  69. return errors.New("TS_K8S_PROXY_CONFIG unset")
  70. }
  71. // serveCtx to live for the lifetime of the process, only gets cancelled
  72. // once the Tailscale Service has been drained
  73. serveCtx, serveCancel := context.WithCancel(context.Background())
  74. defer serveCancel()
  75. // ctx to cancel to start the shutdown process.
  76. ctx, cancel := context.WithCancel(serveCtx)
  77. defer cancel()
  78. sigsChan := make(chan os.Signal, 1)
  79. signal.Notify(sigsChan, syscall.SIGINT, syscall.SIGTERM)
  80. go func() {
  81. select {
  82. case <-ctx.Done():
  83. case s := <-sigsChan:
  84. logger.Infof("Received shutdown signal %s, exiting", s)
  85. cancel()
  86. }
  87. }()
  88. var group *errgroup.Group
  89. group, ctx = errgroup.WithContext(ctx)
  90. restConfig, err := getRestConfig(logger)
  91. if err != nil {
  92. return fmt.Errorf("error getting rest config: %w", err)
  93. }
  94. clientset, err := kubernetes.NewForConfig(restConfig)
  95. if err != nil {
  96. return fmt.Errorf("error creating Kubernetes clientset: %w", err)
  97. }
  98. // Load and watch config.
  99. cfgChan := make(chan *conf.Config)
  100. cfgLoader := config.NewConfigLoader(logger, clientset.CoreV1(), cfgChan)
  101. group.Go(func() error {
  102. return cfgLoader.WatchConfig(ctx, configPath)
  103. })
  104. // Get initial config.
  105. var cfg *conf.Config
  106. select {
  107. case <-ctx.Done():
  108. return group.Wait()
  109. case cfg = <-cfgChan:
  110. }
  111. if cfg.Parsed.LogLevel != nil {
  112. level, err := zapcore.ParseLevel(*cfg.Parsed.LogLevel)
  113. if err != nil {
  114. return fmt.Errorf("error parsing log level %q: %w", *cfg.Parsed.LogLevel, err)
  115. }
  116. logger = logger.WithOptions(zap.IncreaseLevel(level))
  117. }
  118. // TODO:(ChaosInTheCRD) This is a temporary workaround until we can set static endpoints using prefs
  119. if se := cfg.Parsed.StaticEndpoints; len(se) > 0 {
  120. logger.Debugf("setting static endpoints '%v' via TS_DEBUG_PRETENDPOINT environment variable", cfg.Parsed.StaticEndpoints)
  121. ses := make([]string, len(se))
  122. for i, e := range se {
  123. ses[i] = e.String()
  124. }
  125. err := os.Setenv("TS_DEBUG_PRETENDPOINT", strings.Join(ses, ","))
  126. if err != nil {
  127. return err
  128. }
  129. }
  130. if cfg.Parsed.App != nil {
  131. hostinfo.SetApp(*cfg.Parsed.App)
  132. }
  133. // TODO(tomhjp): Pass this setting directly into the store instead of using
  134. // environment variables.
  135. if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.IssueCerts.EqualBool(true) {
  136. os.Setenv("TS_CERT_SHARE_MODE", "rw")
  137. } else {
  138. os.Setenv("TS_CERT_SHARE_MODE", "ro")
  139. }
  140. st, err := getStateStore(cfg.Parsed.State, logger)
  141. if err != nil {
  142. return err
  143. }
  144. // If Pod UID unset, assume we're running outside of a cluster/not managed
  145. // by the operator, so no need to set additional state keys.
  146. if podUID != "" {
  147. if err := state.SetInitialKeys(st, podUID); err != nil {
  148. return fmt.Errorf("error setting initial state: %w", err)
  149. }
  150. }
  151. var authKey string
  152. if cfg.Parsed.AuthKey != nil {
  153. authKey = *cfg.Parsed.AuthKey
  154. }
  155. ts := &tsnet.Server{
  156. Logf: logger.Named("tsnet").Debugf,
  157. UserLogf: logger.Named("tsnet").Infof,
  158. Store: st,
  159. AuthKey: authKey,
  160. }
  161. if cfg.Parsed.ServerURL != nil {
  162. ts.ControlURL = *cfg.Parsed.ServerURL
  163. }
  164. if cfg.Parsed.Hostname != nil {
  165. ts.Hostname = *cfg.Parsed.Hostname
  166. }
  167. // Make sure we crash loop if Up doesn't complete in reasonable time.
  168. upCtx, upCancel := context.WithTimeout(ctx, time.Minute)
  169. defer upCancel()
  170. if _, err := ts.Up(upCtx); err != nil {
  171. return fmt.Errorf("error starting tailscale server: %w", err)
  172. }
  173. defer ts.Close()
  174. lc, err := ts.LocalClient()
  175. if err != nil {
  176. return fmt.Errorf("error getting local client: %w", err)
  177. }
  178. // Setup for updating state keys.
  179. if podUID != "" {
  180. group.Go(func() error {
  181. return state.KeepKeysUpdated(ctx, st, klc.New(lc))
  182. })
  183. }
  184. if cfg.Parsed.HealthCheckEnabled.EqualBool(true) || cfg.Parsed.MetricsEnabled.EqualBool(true) {
  185. addr := podIP
  186. if addr == "" {
  187. addr = cfg.GetLocalAddr()
  188. }
  189. addrPort := getLocalAddrPort(addr, cfg.GetLocalPort())
  190. mux := http.NewServeMux()
  191. localSrv := &http.Server{Addr: addrPort, Handler: mux}
  192. if cfg.Parsed.MetricsEnabled.EqualBool(true) {
  193. logger.Infof("Running metrics endpoint at %s/metrics", addrPort)
  194. metrics.RegisterMetricsHandlers(mux, lc, "")
  195. }
  196. if cfg.Parsed.HealthCheckEnabled.EqualBool(true) {
  197. ipV4, _ := ts.TailscaleIPs()
  198. hz := healthz.RegisterHealthHandlers(mux, ipV4.String(), logger.Infof)
  199. group.Go(func() error {
  200. err := hz.MonitorHealth(ctx, lc)
  201. if err == nil || errors.Is(err, context.Canceled) {
  202. return nil
  203. }
  204. return err
  205. })
  206. }
  207. group.Go(func() error {
  208. errChan := make(chan error)
  209. go func() {
  210. if err := localSrv.ListenAndServe(); err != nil {
  211. errChan <- err
  212. }
  213. close(errChan)
  214. }()
  215. select {
  216. case <-ctx.Done():
  217. sCtx, scancel := context.WithTimeout(serveCtx, 10*time.Second)
  218. defer scancel()
  219. return localSrv.Shutdown(sCtx)
  220. case err := <-errChan:
  221. return err
  222. }
  223. })
  224. }
  225. if v, ok := cfg.Parsed.AcceptRoutes.Get(); ok {
  226. _, err = lc.EditPrefs(ctx, &ipn.MaskedPrefs{
  227. RouteAllSet: true,
  228. Prefs: ipn.Prefs{RouteAll: v},
  229. })
  230. if err != nil {
  231. return fmt.Errorf("error editing prefs: %w", err)
  232. }
  233. }
  234. // TODO(tomhjp): There seems to be a bug that on restart the device does
  235. // not get reassigned it's already working Service IPs unless we clear and
  236. // reset the serve config.
  237. if err := lc.SetServeConfig(ctx, &ipn.ServeConfig{}); err != nil {
  238. return fmt.Errorf("error clearing existing ServeConfig: %w", err)
  239. }
  240. var cm *certs.CertManager
  241. if shouldIssueCerts(cfg) {
  242. logger.Infof("Will issue TLS certs for Tailscale Service")
  243. cm = certs.NewCertManager(klc.New(lc), logger.Infof)
  244. }
  245. if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
  246. return err
  247. }
  248. if cfg.Parsed.AdvertiseServices != nil {
  249. if _, err := lc.EditPrefs(ctx, &ipn.MaskedPrefs{
  250. AdvertiseServicesSet: true,
  251. Prefs: ipn.Prefs{
  252. AdvertiseServices: cfg.Parsed.AdvertiseServices,
  253. },
  254. }); err != nil {
  255. return fmt.Errorf("error setting prefs AdvertiseServices: %w", err)
  256. }
  257. }
  258. // Setup for the API server proxy.
  259. mode := kubetypes.APIServerProxyModeAuth
  260. if cfg.Parsed.APIServerProxy != nil && cfg.Parsed.APIServerProxy.Mode != nil {
  261. mode = *cfg.Parsed.APIServerProxy.Mode
  262. }
  263. ap, err := apiproxy.NewAPIServerProxy(logger.Named("apiserver-proxy"), restConfig, ts, mode, false)
  264. if err != nil {
  265. return fmt.Errorf("error creating api server proxy: %w", err)
  266. }
  267. group.Go(func() error {
  268. if err := ap.Run(serveCtx); err != nil {
  269. return fmt.Errorf("error running API server proxy: %w", err)
  270. }
  271. return nil
  272. })
  273. for {
  274. select {
  275. case <-ctx.Done():
  276. // Context cancelled, exit.
  277. logger.Info("Context cancelled, exiting")
  278. shutdownCtx, shutdownCancel := context.WithTimeout(serveCtx, 20*time.Second)
  279. unadvertiseErr := services.EnsureServicesNotAdvertised(shutdownCtx, lc, logger.Infof)
  280. shutdownCancel()
  281. serveCancel()
  282. return errors.Join(unadvertiseErr, group.Wait())
  283. case cfg = <-cfgChan:
  284. // Handle config reload.
  285. // TODO(tomhjp): Make auth mode reloadable.
  286. var prefs ipn.MaskedPrefs
  287. cfgLogger := logger
  288. currentPrefs, err := lc.GetPrefs(ctx)
  289. if err != nil {
  290. return fmt.Errorf("error getting current prefs: %w", err)
  291. }
  292. if !slices.Equal(currentPrefs.AdvertiseServices, cfg.Parsed.AdvertiseServices) {
  293. cfgLogger = cfgLogger.With("AdvertiseServices", fmt.Sprintf("%v -> %v", currentPrefs.AdvertiseServices, cfg.Parsed.AdvertiseServices))
  294. prefs.AdvertiseServicesSet = true
  295. prefs.Prefs.AdvertiseServices = cfg.Parsed.AdvertiseServices
  296. }
  297. if cfg.Parsed.Hostname != nil && *cfg.Parsed.Hostname != currentPrefs.Hostname {
  298. cfgLogger = cfgLogger.With("Hostname", fmt.Sprintf("%s -> %s", currentPrefs.Hostname, *cfg.Parsed.Hostname))
  299. prefs.HostnameSet = true
  300. prefs.Hostname = *cfg.Parsed.Hostname
  301. }
  302. if v, ok := cfg.Parsed.AcceptRoutes.Get(); ok && v != currentPrefs.RouteAll {
  303. cfgLogger = cfgLogger.With("AcceptRoutes", fmt.Sprintf("%v -> %v", currentPrefs.RouteAll, v))
  304. prefs.RouteAllSet = true
  305. prefs.Prefs.RouteAll = v
  306. }
  307. if !prefs.IsEmpty() {
  308. if _, err := lc.EditPrefs(ctx, &prefs); err != nil {
  309. return fmt.Errorf("error editing prefs: %w", err)
  310. }
  311. }
  312. if err := setServeConfig(ctx, lc, cm, apiServerProxyService(cfg)); err != nil {
  313. return fmt.Errorf("error setting serve config: %w", err)
  314. }
  315. cfgLogger.Infof("Config reloaded")
  316. }
  317. }
  318. }
  319. func getLocalAddrPort(addr string, port uint16) string {
  320. return net.JoinHostPort(addr, strconv.FormatUint(uint64(port), 10))
  321. }
  322. func getStateStore(path *string, logger *zap.SugaredLogger) (ipn.StateStore, error) {
  323. p := "mem:"
  324. if path != nil {
  325. p = *path
  326. } else {
  327. logger.Warn("No state Secret provided; using in-memory store, which will lose state on restart")
  328. }
  329. st, err := store.New(logger.Errorf, p)
  330. if err != nil {
  331. return nil, fmt.Errorf("error creating state store: %w", err)
  332. }
  333. return st, nil
  334. }
  335. func getRestConfig(logger *zap.SugaredLogger) (*rest.Config, error) {
  336. restConfig, err := rest.InClusterConfig()
  337. switch err {
  338. case nil:
  339. return restConfig, nil
  340. case rest.ErrNotInCluster:
  341. logger.Info("Not running in-cluster, falling back to kubeconfig")
  342. default:
  343. return nil, fmt.Errorf("error getting in-cluster config: %w", err)
  344. }
  345. loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
  346. clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, nil)
  347. restConfig, err = clientConfig.ClientConfig()
  348. if err != nil {
  349. return nil, fmt.Errorf("error loading kubeconfig: %w", err)
  350. }
  351. return restConfig, nil
  352. }
  353. func apiServerProxyService(cfg *conf.Config) tailcfg.ServiceName {
  354. if cfg.Parsed.APIServerProxy != nil &&
  355. cfg.Parsed.APIServerProxy.Enabled.EqualBool(true) &&
  356. cfg.Parsed.APIServerProxy.ServiceName != nil &&
  357. *cfg.Parsed.APIServerProxy.ServiceName != "" {
  358. return tailcfg.ServiceName(*cfg.Parsed.APIServerProxy.ServiceName)
  359. }
  360. return ""
  361. }
  362. func shouldIssueCerts(cfg *conf.Config) bool {
  363. return cfg.Parsed.APIServerProxy != nil &&
  364. cfg.Parsed.APIServerProxy.IssueCerts.EqualBool(true)
  365. }
  366. // setServeConfig sets up serve config such that it's serving for the passed in
  367. // Tailscale Service, and does nothing if it's already up to date.
  368. func setServeConfig(ctx context.Context, lc *local.Client, cm *certs.CertManager, name tailcfg.ServiceName) error {
  369. existingServeConfig, err := lc.GetServeConfig(ctx)
  370. if err != nil {
  371. return fmt.Errorf("error getting existing serve config: %w", err)
  372. }
  373. // Ensure serve config is cleared if no Tailscale Service.
  374. if name == "" {
  375. if reflect.DeepEqual(*existingServeConfig, ipn.ServeConfig{}) {
  376. // Already up to date.
  377. return nil
  378. }
  379. if cm != nil {
  380. cm.EnsureCertLoops(ctx, &ipn.ServeConfig{})
  381. }
  382. return lc.SetServeConfig(ctx, &ipn.ServeConfig{})
  383. }
  384. status, err := lc.StatusWithoutPeers(ctx)
  385. if err != nil {
  386. return fmt.Errorf("error getting local client status: %w", err)
  387. }
  388. serviceHostPort := ipn.HostPort(fmt.Sprintf("%s.%s:443", name.WithoutPrefix(), status.CurrentTailnet.MagicDNSSuffix))
  389. serveConfig := ipn.ServeConfig{
  390. // Configure for the Service hostname.
  391. Services: map[tailcfg.ServiceName]*ipn.ServiceConfig{
  392. name: {
  393. TCP: map[uint16]*ipn.TCPPortHandler{
  394. 443: {
  395. HTTPS: true,
  396. },
  397. },
  398. Web: map[ipn.HostPort]*ipn.WebServerConfig{
  399. serviceHostPort: {
  400. Handlers: map[string]*ipn.HTTPHandler{
  401. "/": {
  402. Proxy: "http://localhost:80",
  403. },
  404. },
  405. },
  406. },
  407. },
  408. },
  409. }
  410. if reflect.DeepEqual(*existingServeConfig, serveConfig) {
  411. // Already up to date.
  412. return nil
  413. }
  414. if cm != nil {
  415. cm.EnsureCertLoops(ctx, &serveConfig)
  416. }
  417. return lc.SetServeConfig(ctx, &serveConfig)
  418. }