  1. // Copyright (c) Tailscale Inc & contributors
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. //go:build !plan9
  4. package main
  5. import (
  6. "context"
  7. "encoding/json"
  8. "errors"
  9. "fmt"
  10. "net/http"
  11. "net/netip"
  12. "slices"
  13. "sort"
  14. "strings"
  15. "sync"
  16. dockerref "github.com/distribution/reference"
  17. "go.uber.org/zap"
  18. xslices "golang.org/x/exp/slices"
  19. appsv1 "k8s.io/api/apps/v1"
  20. corev1 "k8s.io/api/core/v1"
  21. rbacv1 "k8s.io/api/rbac/v1"
  22. apiequality "k8s.io/apimachinery/pkg/api/equality"
  23. apierrors "k8s.io/apimachinery/pkg/api/errors"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/apimachinery/pkg/types"
  26. "k8s.io/apimachinery/pkg/util/intstr"
  27. "k8s.io/client-go/tools/record"
  28. "sigs.k8s.io/controller-runtime/pkg/client"
  29. "sigs.k8s.io/controller-runtime/pkg/reconcile"
  30. "tailscale.com/client/tailscale"
  31. "tailscale.com/ipn"
  32. tsoperator "tailscale.com/k8s-operator"
  33. tsapi "tailscale.com/k8s-operator/apis/v1alpha1"
  34. "tailscale.com/kube/egressservices"
  35. "tailscale.com/kube/k8s-proxy/conf"
  36. "tailscale.com/kube/kubetypes"
  37. "tailscale.com/tailcfg"
  38. "tailscale.com/tstime"
  39. "tailscale.com/types/opt"
  40. "tailscale.com/types/ptr"
  41. "tailscale.com/util/clientmetric"
  42. "tailscale.com/util/mak"
  43. "tailscale.com/util/set"
  44. )
const (
	// Condition reasons posted on the ProxyGroup status conditions.
	reasonProxyGroupCreationFailed    = "ProxyGroupCreationFailed"
	reasonProxyGroupReady             = "ProxyGroupReady"
	reasonProxyGroupAvailable         = "ProxyGroupAvailable"
	reasonProxyGroupCreating          = "ProxyGroupCreating"
	reasonProxyGroupInvalid           = "ProxyGroupInvalid"
	reasonProxyGroupTailnetUnavailable = "ProxyGroupTailnetUnavailable"

	// Copied from k8s.io/apiserver/pkg/registry/generic/registry/store.go@cccad306d649184bf2a0e319ba830c53f65c445c
	optimisticLockErrorMsg = "the object has been modified; please apply your changes to the latest version and try again"

	// Maximum number of addresses recorded per static-endpoint Service.
	staticEndpointsMaxAddrs = 2

	// The minimum tailcfg.CapabilityVersion that deployed clients are expected
	// to support to be compatible with the current ProxyGroup controller.
	// If the controller needs to depend on newer client behaviour, it should
	// maintain backwards compatible logic for older capability versions for 3
	// stable releases, as per documentation on supported version drift:
	// https://tailscale.com/kb/1236/kubernetes-operator#supported-versions
	//
	// tailcfg.CurrentCapabilityVersion was 106 when the ProxyGroup controller was
	// first introduced.
	pgMinCapabilityVersion = 106
)
// Gauges counting the ProxyGroups of each type currently managed by this
// operator instance; maintained via ensureAddedToGaugeForProxyGroup and the
// corresponding removal on cleanup.
var (
	gaugeEgressProxyGroupResources    = clientmetric.NewGauge(kubetypes.MetricProxyGroupEgressCount)
	gaugeIngressProxyGroupResources   = clientmetric.NewGauge(kubetypes.MetricProxyGroupIngressCount)
	gaugeAPIServerProxyGroupResources = clientmetric.NewGauge(kubetypes.MetricProxyGroupAPIServerCount)
)
// ProxyGroupReconciler ensures cluster resources for a ProxyGroup definition.
type ProxyGroupReconciler struct {
	client.Client
	log      *zap.SugaredLogger
	recorder record.EventRecorder
	clock    tstime.Clock
	tsClient tsClient // default Tailscale API client; overridden per-ProxyGroup when Spec.Tailnet is set

	// User-specified defaults from the helm installation.
	tsNamespace       string // namespace the operator manages proxy resources in
	tsProxyImage      string // default image for tailscale-type proxies
	k8sProxyImage     string // default image for kube-apiserver-type proxies
	defaultTags       []string
	tsFirewallMode    string
	defaultProxyClass string // used when the ProxyGroup does not name its own ProxyClass
	loginServer       string

	mu                   sync.Mutex           // protects following
	egressProxyGroups    set.Slice[types.UID] // for egress proxygroups gauge
	ingressProxyGroups   set.Slice[types.UID] // for ingress proxygroups gauge
	apiServerProxyGroups set.Slice[types.UID] // for kube-apiserver proxygroups gauge
}
  91. func (r *ProxyGroupReconciler) logger(name string) *zap.SugaredLogger {
  92. return r.log.With("ProxyGroup", name)
  93. }
// Reconcile is the controller-runtime entry point for a single reconciliation
// pass over one ProxyGroup. It fetches the object, resolves the Tailscale API
// client to use, handles deletion/finalizer cleanup, and otherwise delegates
// to reconcilePG, joining any error with the result of posting status.
func (r *ProxyGroupReconciler) Reconcile(ctx context.Context, req reconcile.Request) (_ reconcile.Result, err error) {
	logger := r.logger(req.Name)
	logger.Debugf("starting reconcile")
	defer logger.Debugf("reconcile finished")

	pg := new(tsapi.ProxyGroup)
	err = r.Get(ctx, req.NamespacedName, pg)
	if apierrors.IsNotFound(err) {
		// Object already gone; nothing to reconcile.
		logger.Debugf("ProxyGroup not found, assuming it was deleted")
		return reconcile.Result{}, nil
	} else if err != nil {
		return reconcile.Result{}, fmt.Errorf("failed to get tailscale.com ProxyGroup: %w", err)
	}

	// Use a tailnet-specific API client when the ProxyGroup targets a
	// specific tailnet; otherwise the reconciler's default client.
	tailscaleClient := r.tsClient
	if pg.Spec.Tailnet != "" {
		tc, err := clientForTailnet(ctx, r.Client, r.tsNamespace, pg.Spec.Tailnet)
		if err != nil {
			// Surface the failure on the ProxyGroup status before returning.
			oldPGStatus := pg.Status.DeepCopy()
			nrr := &notReadyReason{
				reason:  reasonProxyGroupTailnetUnavailable,
				message: err.Error(),
			}
			return reconcile.Result{}, errors.Join(err, r.maybeUpdateStatus(ctx, logger, pg, oldPGStatus, nrr, make(map[string][]netip.AddrPort)))
		}
		tailscaleClient = tc
	}

	if markedForDeletion(pg) {
		logger.Debugf("ProxyGroup is being deleted, cleaning up resources")
		ix := xslices.Index(pg.Finalizers, FinalizerName)
		if ix < 0 {
			// Our finalizer was never added (or already removed): cleanup done.
			logger.Debugf("no finalizer, nothing to do")
			return reconcile.Result{}, nil
		}
		if done, err := r.maybeCleanup(ctx, tailscaleClient, pg); err != nil {
			// Optimistic lock conflicts are routine under concurrent updates;
			// swallow and let the next reconcile retry.
			if strings.Contains(err.Error(), optimisticLockErrorMsg) {
				logger.Infof("optimistic lock error, retrying: %s", err)
				return reconcile.Result{}, nil
			}
			return reconcile.Result{}, err
		} else if !done {
			logger.Debugf("ProxyGroup resource cleanup not yet finished, will retry...")
			return reconcile.Result{RequeueAfter: shortRequeue}, nil
		}
		// Cleanup complete: drop the finalizer so deletion can proceed.
		pg.Finalizers = slices.Delete(pg.Finalizers, ix, ix+1)
		if err := r.Update(ctx, pg); err != nil {
			return reconcile.Result{}, err
		}
		return reconcile.Result{}, nil
	}

	// Normal (non-deletion) path: reconcile, then post status.
	oldPGStatus := pg.Status.DeepCopy()
	staticEndpoints, nrr, err := r.reconcilePG(ctx, tailscaleClient, pg, logger)
	return reconcile.Result{}, errors.Join(err, r.maybeUpdateStatus(ctx, logger, pg, oldPGStatus, nrr, staticEndpoints))
}
// reconcilePG handles all reconciliation of a ProxyGroup that is not marked
// for deletion. It is separated out from Reconcile to make a clear separation
// between reconciling the ProxyGroup, and posting the status of its created
// resources onto the ProxyGroup status field.
//
// It returns the static endpoints discovered per replica (nil when not
// applicable), a notReadyReason when provisioning cannot complete yet, and a
// terminal error if one occurred.
func (r *ProxyGroupReconciler) reconcilePG(ctx context.Context, tailscaleClient tsClient, pg *tsapi.ProxyGroup, logger *zap.SugaredLogger) (map[string][]netip.AddrPort, *notReadyReason, error) {
	if !slices.Contains(pg.Finalizers, FinalizerName) {
		// This log line is printed exactly once during initial provisioning,
		// because once the finalizer is in place this block gets skipped. So,
		// this is a nice place to log that the high level, multi-reconcile
		// operation is underway.
		logger.Infof("ensuring ProxyGroup is set up")
		pg.Finalizers = append(pg.Finalizers, FinalizerName)
		if err := r.Update(ctx, pg); err != nil {
			return r.notReadyErrf(pg, logger, "error adding finalizer: %w", err)
		}
	}

	// Resolve the effective ProxyClass: the ProxyGroup's own setting wins
	// over the operator-wide default.
	proxyClassName := r.defaultProxyClass
	if pg.Spec.ProxyClass != "" {
		proxyClassName = pg.Spec.ProxyClass
	}
	var proxyClass *tsapi.ProxyClass
	if proxyClassName != "" {
		proxyClass = new(tsapi.ProxyClass)
		err := r.Get(ctx, types.NamespacedName{Name: proxyClassName}, proxyClass)
		if apierrors.IsNotFound(err) {
			// Not an error: wait for the ProxyClass to appear.
			msg := fmt.Sprintf("the ProxyGroup's ProxyClass %q does not (yet) exist", proxyClassName)
			logger.Info(msg)
			return notReady(reasonProxyGroupCreating, msg)
		}
		if err != nil {
			return r.notReadyErrf(pg, logger, "error getting ProxyGroup's ProxyClass %q: %w", proxyClassName, err)
		}
		if !tsoperator.ProxyClassIsReady(proxyClass) {
			msg := fmt.Sprintf("the ProxyGroup's ProxyClass %q is not yet in a ready state, waiting...", proxyClassName)
			logger.Info(msg)
			return notReady(reasonProxyGroupCreating, msg)
		}
	}

	if err := r.validate(ctx, pg, proxyClass, logger); err != nil {
		return notReady(reasonProxyGroupInvalid, fmt.Sprintf("invalid ProxyGroup spec: %v", err))
	}

	staticEndpoints, nrr, err := r.maybeProvision(ctx, tailscaleClient, pg, proxyClass)
	if err != nil {
		return nil, nrr, err
	}
	return staticEndpoints, nrr, nil
}
  193. func (r *ProxyGroupReconciler) validate(ctx context.Context, pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass, logger *zap.SugaredLogger) error {
  194. // Our custom logic for ensuring minimum downtime ProxyGroup update rollouts relies on the local health check
  195. // beig accessible on the replica Pod IP:9002. This address can also be modified by users, via
  196. // TS_LOCAL_ADDR_PORT env var.
  197. //
  198. // Currently TS_LOCAL_ADDR_PORT controls Pod's health check and metrics address. _Probably_ there is no need for
  199. // users to set this to a custom value. Users who want to consume metrics, should integrate with the metrics
  200. // Service and/or ServiceMonitor, rather than Pods directly. The health check is likely not useful to integrate
  201. // directly with for operator proxies (and we should aim for unified lifecycle logic in the operator, users
  202. // shouldn't need to set their own).
  203. //
  204. // TODO(irbekrm): maybe disallow configuring this env var in future (in Tailscale 1.84 or later).
  205. if pg.Spec.Type == tsapi.ProxyGroupTypeEgress && hasLocalAddrPortSet(pc) {
  206. msg := fmt.Sprintf("ProxyClass %s applied to an egress ProxyGroup has TS_LOCAL_ADDR_PORT env var set to a custom value."+
  207. "This will disable the ProxyGroup graceful failover mechanism, so you might experience downtime when ProxyGroup pods are restarted."+
  208. "In future we will remove the ability to set custom TS_LOCAL_ADDR_PORT for egress ProxyGroups."+
  209. "Please raise an issue if you expect that this will cause issues for your workflow.", pc.Name)
  210. logger.Warn(msg)
  211. }
  212. // image is the value of pc.Spec.StatefulSet.Pod.TailscaleContainer.Image or ""
  213. // imagePath is a slash-delimited path ending with the image name, e.g.
  214. // "tailscale/tailscale" or maybe "k8s-proxy" if hosted at example.com/k8s-proxy.
  215. var image, imagePath string
  216. if pc != nil &&
  217. pc.Spec.StatefulSet != nil &&
  218. pc.Spec.StatefulSet.Pod != nil &&
  219. pc.Spec.StatefulSet.Pod.TailscaleContainer != nil &&
  220. pc.Spec.StatefulSet.Pod.TailscaleContainer.Image != "" {
  221. image, err := dockerref.ParseNormalizedNamed(pc.Spec.StatefulSet.Pod.TailscaleContainer.Image)
  222. if err != nil {
  223. // Shouldn't be possible as the ProxyClass won't be marked ready
  224. // without successfully parsing the image.
  225. return fmt.Errorf("error parsing %q as a container image reference: %w", pc.Spec.StatefulSet.Pod.TailscaleContainer.Image, err)
  226. }
  227. imagePath = dockerref.Path(image)
  228. }
  229. var errs []error
  230. if isAuthAPIServerProxy(pg) {
  231. // Validate that the static ServiceAccount already exists.
  232. sa := &corev1.ServiceAccount{}
  233. if err := r.Get(ctx, types.NamespacedName{Namespace: r.tsNamespace, Name: authAPIServerProxySAName}, sa); err != nil {
  234. if !apierrors.IsNotFound(err) {
  235. return fmt.Errorf("error validating that ServiceAccount %q exists: %w", authAPIServerProxySAName, err)
  236. }
  237. errs = append(errs, fmt.Errorf("the ServiceAccount %q used for the API server proxy in auth mode does not exist but "+
  238. "should have been created during operator installation; use apiServerProxyConfig.allowImpersonation=true "+
  239. "in the helm chart, or authproxy-rbac.yaml from the static manifests", authAPIServerProxySAName))
  240. }
  241. } else {
  242. // Validate that the ServiceAccount we create won't overwrite the static one.
  243. // TODO(tomhjp): This doesn't cover other controllers that could create a
  244. // ServiceAccount. Perhaps should have some guards to ensure that an update
  245. // would never change the ownership of a resource we expect to already be owned.
  246. if pgServiceAccountName(pg) == authAPIServerProxySAName {
  247. errs = append(errs, fmt.Errorf("the name of the ProxyGroup %q conflicts with the static ServiceAccount used for the API server proxy in auth mode", pg.Name))
  248. }
  249. }
  250. if pg.Spec.Type == tsapi.ProxyGroupTypeKubernetesAPIServer {
  251. if strings.HasSuffix(imagePath, "tailscale") {
  252. errs = append(errs, fmt.Errorf("the configured ProxyClass %q specifies to use image %q but expected a %q image for ProxyGroup of type %q", pc.Name, image, "k8s-proxy", pg.Spec.Type))
  253. }
  254. if pc != nil && pc.Spec.StatefulSet != nil && pc.Spec.StatefulSet.Pod != nil && pc.Spec.StatefulSet.Pod.TailscaleInitContainer != nil {
  255. errs = append(errs, fmt.Errorf("the configured ProxyClass %q specifies Tailscale init container config, but ProxyGroups of type %q do not use init containers", pc.Name, pg.Spec.Type))
  256. }
  257. } else {
  258. if strings.HasSuffix(imagePath, "k8s-proxy") {
  259. errs = append(errs, fmt.Errorf("the configured ProxyClass %q specifies to use image %q but expected a %q image for ProxyGroup of type %q", pc.Name, image, "tailscale", pg.Spec.Type))
  260. }
  261. }
  262. return errors.Join(errs...)
  263. }
// maybeProvision creates or updates all cluster resources backing a
// ProxyGroup, in dependency order: NodePort Services for static endpoints
// (when the ProxyClass configures them), config Secrets, state Secrets, RBAC
// (ServiceAccount/Role/RoleBinding), egress or ingress ConfigMaps, the
// StatefulSet, and metrics resources, then removes any dangling resources
// left over from a replica-count reduction. It returns the static endpoints
// per replica, or a notReadyReason/error when provisioning cannot complete.
func (r *ProxyGroupReconciler) maybeProvision(ctx context.Context, tailscaleClient tsClient, pg *tsapi.ProxyGroup, proxyClass *tsapi.ProxyClass) (map[string][]netip.AddrPort, *notReadyReason, error) {
	logger := r.logger(pg.Name)

	// Count this ProxyGroup in the per-type gauge (mu guards the gauge sets).
	r.mu.Lock()
	r.ensureAddedToGaugeForProxyGroup(pg)
	r.mu.Unlock()

	svcToNodePorts := make(map[string]uint16)
	var tailscaledPort *uint16
	if proxyClass != nil && proxyClass.Spec.StaticEndpoints != nil {
		var err error
		svcToNodePorts, tailscaledPort, err = r.ensureNodePortServiceCreated(ctx, pg, proxyClass)
		if err != nil {
			// Port-exhaustion is a user-fixable config problem: record an
			// event and mark not-ready rather than returning a hard error.
			var allocatePortErr *allocatePortsErr
			if errors.As(err, &allocatePortErr) {
				reason := reasonProxyGroupCreationFailed
				msg := fmt.Sprintf("error provisioning NodePort Services for static endpoints: %v", err)
				r.recorder.Event(pg, corev1.EventTypeWarning, reason, msg)
				return notReady(reason, msg)
			}
			return r.notReadyErrf(pg, logger, "error provisioning NodePort Services for static endpoints: %w", err)
		}
	}

	staticEndpoints, err := r.ensureConfigSecretsCreated(ctx, tailscaleClient, pg, proxyClass, svcToNodePorts)
	if err != nil {
		// A static-endpoint selector failure is likewise user-fixable.
		var selectorErr *FindStaticEndpointErr
		if errors.As(err, &selectorErr) {
			reason := reasonProxyGroupCreationFailed
			msg := fmt.Sprintf("error provisioning config Secrets: %v", err)
			r.recorder.Event(pg, corev1.EventTypeWarning, reason, msg)
			return notReady(reason, msg)
		}
		return r.notReadyErrf(pg, logger, "error provisioning config Secrets: %w", err)
	}

	// State secrets are precreated so we can use the ProxyGroup CR as their owner ref.
	stateSecrets := pgStateSecrets(pg, r.tsNamespace)
	for _, sec := range stateSecrets {
		if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, sec, func(s *corev1.Secret) {
			s.ObjectMeta.Labels = sec.ObjectMeta.Labels
			s.ObjectMeta.Annotations = sec.ObjectMeta.Annotations
			s.ObjectMeta.OwnerReferences = sec.ObjectMeta.OwnerReferences
		}); err != nil {
			return r.notReadyErrf(pg, logger, "error provisioning state Secrets: %w", err)
		}
	}

	// auth mode kube-apiserver ProxyGroups use a statically created
	// ServiceAccount to keep ClusterRole creation permissions limited to the
	// helm chart installer.
	if !isAuthAPIServerProxy(pg) {
		sa := pgServiceAccount(pg, r.tsNamespace)
		if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, sa, func(s *corev1.ServiceAccount) {
			s.ObjectMeta.Labels = sa.ObjectMeta.Labels
			s.ObjectMeta.Annotations = sa.ObjectMeta.Annotations
			s.ObjectMeta.OwnerReferences = sa.ObjectMeta.OwnerReferences
		}); err != nil {
			return r.notReadyErrf(pg, logger, "error provisioning ServiceAccount: %w", err)
		}
	}

	role := pgRole(pg, r.tsNamespace)
	if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, role, func(r *rbacv1.Role) {
		r.ObjectMeta.Labels = role.ObjectMeta.Labels
		r.ObjectMeta.Annotations = role.ObjectMeta.Annotations
		r.ObjectMeta.OwnerReferences = role.ObjectMeta.OwnerReferences
		r.Rules = role.Rules
	}); err != nil {
		return r.notReadyErrf(pg, logger, "error provisioning Role: %w", err)
	}

	roleBinding := pgRoleBinding(pg, r.tsNamespace)
	if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, roleBinding, func(r *rbacv1.RoleBinding) {
		r.ObjectMeta.Labels = roleBinding.ObjectMeta.Labels
		r.ObjectMeta.Annotations = roleBinding.ObjectMeta.Annotations
		r.ObjectMeta.OwnerReferences = roleBinding.ObjectMeta.OwnerReferences
		r.RoleRef = roleBinding.RoleRef
		r.Subjects = roleBinding.Subjects
	}); err != nil {
		return r.notReadyErrf(pg, logger, "error provisioning RoleBinding: %w", err)
	}

	if pg.Spec.Type == tsapi.ProxyGroupTypeEgress {
		cm, hp := pgEgressCM(pg, r.tsNamespace)
		if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, cm, func(existing *corev1.ConfigMap) {
			existing.ObjectMeta.Labels = cm.ObjectMeta.Labels
			existing.ObjectMeta.OwnerReferences = cm.ObjectMeta.OwnerReferences
			mak.Set(&existing.BinaryData, egressservices.KeyHEPPings, hp)
		}); err != nil {
			return r.notReadyErrf(pg, logger, "error provisioning egress ConfigMap %q: %w", cm.Name, err)
		}
	}

	if pg.Spec.Type == tsapi.ProxyGroupTypeIngress {
		cm := pgIngressCM(pg, r.tsNamespace)
		if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, cm, func(existing *corev1.ConfigMap) {
			existing.ObjectMeta.Labels = cm.ObjectMeta.Labels
			existing.ObjectMeta.OwnerReferences = cm.ObjectMeta.OwnerReferences
		}); err != nil {
			return r.notReadyErrf(pg, logger, "error provisioning ingress ConfigMap %q: %w", cm.Name, err)
		}
	}

	// kube-apiserver ProxyGroups use the k8s-proxy image; everything else the
	// regular tailscale proxy image.
	defaultImage := r.tsProxyImage
	if pg.Spec.Type == tsapi.ProxyGroupTypeKubernetesAPIServer {
		defaultImage = r.k8sProxyImage
	}
	ss, err := pgStatefulSet(pg, r.tsNamespace, defaultImage, r.tsFirewallMode, tailscaledPort, proxyClass)
	if err != nil {
		return r.notReadyErrf(pg, logger, "error generating StatefulSet spec: %w", err)
	}
	cfg := &tailscaleSTSConfig{
		proxyType: string(pg.Spec.Type),
	}
	ss = applyProxyClassToStatefulSet(proxyClass, ss, cfg, logger)
	if _, err := createOrUpdate(ctx, r.Client, r.tsNamespace, ss, func(s *appsv1.StatefulSet) {
		s.Spec = ss.Spec
		s.ObjectMeta.Labels = ss.ObjectMeta.Labels
		s.ObjectMeta.Annotations = ss.ObjectMeta.Annotations
		s.ObjectMeta.OwnerReferences = ss.ObjectMeta.OwnerReferences
	}); err != nil {
		return r.notReadyErrf(pg, logger, "error provisioning StatefulSet: %w", err)
	}

	mo := &metricsOpts{
		tsNamespace:  r.tsNamespace,
		proxyStsName: pg.Name,
		proxyLabels:  pgLabels(pg.Name, nil),
		proxyType:    "proxygroup",
	}
	if err := reconcileMetricsResources(ctx, logger, mo, proxyClass, r.Client); err != nil {
		return r.notReadyErrf(pg, logger, "error reconciling metrics resources: %w", err)
	}

	if err := r.cleanupDanglingResources(ctx, tailscaleClient, pg, proxyClass); err != nil {
		return r.notReadyErrf(pg, logger, "error cleaning up dangling resources: %w", err)
	}

	logger.Info("ProxyGroup resources synced")
	return staticEndpoints, nil, nil
}
// maybeUpdateStatus recomputes the ProxyGroup's status — the list of running
// proxy devices and the ProxyGroupAvailable/ProxyGroupReady conditions — and
// posts it to the API server only if it differs from oldPGStatus. A non-nil
// nrr records why the ProxyGroup is not ready and takes precedence over the
// other readiness checks.
func (r *ProxyGroupReconciler) maybeUpdateStatus(ctx context.Context, logger *zap.SugaredLogger, pg *tsapi.ProxyGroup, oldPGStatus *tsapi.ProxyGroupStatus, nrr *notReadyReason, endpoints map[string][]netip.AddrPort) (err error) {
	// Post the status in a defer so that whatever was computed gets written
	// even if listing running proxies below fails.
	defer func() {
		if !apiequality.Semantic.DeepEqual(*oldPGStatus, pg.Status) {
			if updateErr := r.Client.Status().Update(ctx, pg); updateErr != nil {
				// Optimistic lock conflicts are expected; drop them and let a
				// later reconcile retry.
				if strings.Contains(updateErr.Error(), optimisticLockErrorMsg) {
					logger.Infof("optimistic lock error updating status, retrying: %s", updateErr)
					updateErr = nil
				}
				err = errors.Join(err, updateErr)
			}
		}
	}()

	devices, err := r.getRunningProxies(ctx, pg, endpoints)
	if err != nil {
		return fmt.Errorf("failed to list running proxies: %w", err)
	}
	pg.Status.Devices = devices
	desiredReplicas := int(pgReplicas(pg))

	// Set ProxyGroupAvailable condition: True as soon as any replica is
	// running; reason upgrades to Available once all replicas are up.
	status := metav1.ConditionFalse
	reason := reasonProxyGroupCreating
	message := fmt.Sprintf("%d/%d ProxyGroup pods running", len(devices), desiredReplicas)
	if len(devices) > 0 {
		status = metav1.ConditionTrue
		if len(devices) == desiredReplicas {
			reason = reasonProxyGroupAvailable
		}
	}
	tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupAvailable, status, reason, message, 0, r.clock, logger)

	// Set ProxyGroupReady condition.
	tsSvcValid, tsSvcSet := tsoperator.KubeAPIServerProxyValid(pg)
	status = metav1.ConditionFalse
	reason = reasonProxyGroupCreating
	switch {
	case nrr != nil:
		// If we failed earlier, that reason takes precedence.
		reason = nrr.reason
		message = nrr.message
	case pg.Spec.Type == tsapi.ProxyGroupTypeKubernetesAPIServer && tsSvcSet && !tsSvcValid:
		reason = reasonProxyGroupInvalid
		message = "waiting for config in spec.kubeAPIServer to be marked valid"
	case len(devices) < desiredReplicas:
		// Still scaling up: keep the "x/y pods running" message from above.
	case len(devices) > desiredReplicas:
		message = fmt.Sprintf("waiting for %d ProxyGroup pods to shut down", len(devices)-desiredReplicas)
	case pg.Spec.Type == tsapi.ProxyGroupTypeKubernetesAPIServer && !tsoperator.KubeAPIServerProxyConfigured(pg):
		reason = reasonProxyGroupCreating
		message = "waiting for proxies to start advertising the kube-apiserver proxy's hostname"
	default:
		status = metav1.ConditionTrue
		reason = reasonProxyGroupReady
		message = reasonProxyGroupReady
	}
	tsoperator.SetProxyGroupCondition(pg, tsapi.ProxyGroupReady, status, reason, message, pg.Generation, r.clock, logger)

	return nil
}
  448. // getServicePortsForProxyGroups returns a map of ProxyGroup Service names to their NodePorts,
  449. // and a set of all allocated NodePorts for quick occupancy checking.
  450. func getServicePortsForProxyGroups(ctx context.Context, c client.Client, namespace string, portRanges tsapi.PortRanges) (map[string]uint16, set.Set[uint16], error) {
  451. svcs := new(corev1.ServiceList)
  452. matchingLabels := client.MatchingLabels(map[string]string{
  453. LabelParentType: "proxygroup",
  454. })
  455. err := c.List(ctx, svcs, matchingLabels, client.InNamespace(namespace))
  456. if err != nil {
  457. return nil, nil, fmt.Errorf("failed to list ProxyGroup Services: %w", err)
  458. }
  459. svcToNodePorts := map[string]uint16{}
  460. usedPorts := set.Set[uint16]{}
  461. for _, svc := range svcs.Items {
  462. if len(svc.Spec.Ports) == 1 && svc.Spec.Ports[0].NodePort != 0 {
  463. p := uint16(svc.Spec.Ports[0].NodePort)
  464. if portRanges.Contains(p) {
  465. svcToNodePorts[svc.Name] = p
  466. usedPorts.Add(p)
  467. }
  468. }
  469. }
  470. return svcToNodePorts, usedPorts, nil
  471. }
// allocatePortsErr is the error type returned when NodePorts cannot be
// allocated for ProxyGroup replicas; callers detect it with errors.As so the
// failure can be surfaced as a condition/event rather than a hard error.
type allocatePortsErr struct {
	msg string
}

// Error implements the error interface.
func (e *allocatePortsErr) Error() string {
	return e.msg
}
  478. func (r *ProxyGroupReconciler) allocatePorts(ctx context.Context, pg *tsapi.ProxyGroup, proxyClassName string, portRanges tsapi.PortRanges) (map[string]uint16, error) {
  479. replicaCount := int(pgReplicas(pg))
  480. svcToNodePorts, usedPorts, err := getServicePortsForProxyGroups(ctx, r.Client, r.tsNamespace, portRanges)
  481. if err != nil {
  482. return nil, &allocatePortsErr{msg: fmt.Sprintf("failed to find ports for existing ProxyGroup NodePort Services: %s", err.Error())}
  483. }
  484. replicasAllocated := 0
  485. for i := range pgReplicas(pg) {
  486. if _, ok := svcToNodePorts[pgNodePortServiceName(pg.Name, i)]; !ok {
  487. svcToNodePorts[pgNodePortServiceName(pg.Name, i)] = 0
  488. } else {
  489. replicasAllocated++
  490. }
  491. }
  492. for replica, port := range svcToNodePorts {
  493. if port == 0 {
  494. for p := range portRanges.All() {
  495. if !usedPorts.Contains(p) {
  496. svcToNodePorts[replica] = p
  497. usedPorts.Add(p)
  498. replicasAllocated++
  499. break
  500. }
  501. }
  502. }
  503. }
  504. if replicasAllocated < replicaCount {
  505. return nil, &allocatePortsErr{msg: fmt.Sprintf("not enough available ports to allocate all replicas (needed %d, got %d). Field 'spec.staticEndpoints.nodePort.ports' on ProxyClass %q must have bigger range allocated", replicaCount, usedPorts.Len(), proxyClassName)}
  506. }
  507. return svcToNodePorts, nil
  508. }
// ensureNodePortServiceCreated ensures one NodePort Service exists per
// ProxyGroup replica for static endpoints, reusing the previously chosen
// tailscaled port and NodePorts where possible and allocating new NodePorts
// from the ProxyClass's configured ranges otherwise. It returns the map of
// Service name to NodePort and the single tailscaled port shared by all
// replicas.
func (r *ProxyGroupReconciler) ensureNodePortServiceCreated(ctx context.Context, pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass) (map[string]uint16, *uint16, error) {
	// NOTE: (ChaosInTheCRD) we want the same TargetPort for every static endpoint NodePort Service for the ProxyGroup
	tailscaledPort := getRandomPort()
	svcs := []*corev1.Service{}
	for i := range pgReplicas(pg) {
		nodePortSvcName := pgNodePortServiceName(pg.Name, i)
		svc := &corev1.Service{}
		err := r.Get(ctx, types.NamespacedName{Name: nodePortSvcName, Namespace: r.tsNamespace}, svc)
		if err != nil && !apierrors.IsNotFound(err) {
			return nil, nil, fmt.Errorf("error getting Kubernetes Service %q: %w", nodePortSvcName, err)
		}
		if apierrors.IsNotFound(err) {
			// No existing Service: build a fresh one for this replica.
			svcs = append(svcs, pgNodePortService(pg, nodePortSvcName, r.tsNamespace))
		} else {
			// NOTE: if we can we want to recover the random port used for tailscaled,
			// as well as the NodePort previously used for that Service
			if len(svc.Spec.Ports) == 1 {
				if svc.Spec.Ports[0].Port != 0 {
					tailscaledPort = uint16(svc.Spec.Ports[0].Port)
				}
			}
			svcs = append(svcs, svc)
		}
	}

	svcToNodePorts, err := r.allocatePorts(ctx, pg, pc.Name, pc.Spec.StaticEndpoints.NodePort.Ports)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to allocate NodePorts to ProxyGroup Services: %w", err)
	}

	for _, svc := range svcs {
		// NOTE: we know that every service is going to have 1 port here
		svc.Spec.Ports[0].Port = int32(tailscaledPort)
		svc.Spec.Ports[0].TargetPort = intstr.FromInt(int(tailscaledPort))
		svc.Spec.Ports[0].NodePort = int32(svcToNodePorts[svc.Name])
		_, err = createOrUpdate(ctx, r.Client, r.tsNamespace, svc, func(s *corev1.Service) {
			s.ObjectMeta.Labels = svc.ObjectMeta.Labels
			s.ObjectMeta.Annotations = svc.ObjectMeta.Annotations
			s.ObjectMeta.OwnerReferences = svc.ObjectMeta.OwnerReferences
			s.Spec.Selector = svc.Spec.Selector
			s.Spec.Ports = svc.Spec.Ports
		})
		if err != nil {
			return nil, nil, fmt.Errorf("error creating/updating Kubernetes NodePort Service %q: %w", svc.Name, err)
		}
	}
	return svcToNodePorts, ptr.To(tailscaledPort), nil
}
  555. // cleanupDanglingResources ensures we don't leak config secrets, state secrets, and
  556. // tailnet devices when the number of replicas specified is reduced.
  557. func (r *ProxyGroupReconciler) cleanupDanglingResources(ctx context.Context, tailscaleClient tsClient, pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass) error {
  558. logger := r.logger(pg.Name)
  559. metadata, err := r.getNodeMetadata(ctx, pg)
  560. if err != nil {
  561. return err
  562. }
  563. for _, m := range metadata {
  564. if m.ordinal+1 <= int(pgReplicas(pg)) {
  565. continue
  566. }
  567. // Dangling resource, delete the config + state Secrets, as well as
  568. // deleting the device from the tailnet.
  569. if err := r.deleteTailnetDevice(ctx, tailscaleClient, m.tsID, logger); err != nil {
  570. return err
  571. }
  572. if err := r.Delete(ctx, m.stateSecret); err != nil && !apierrors.IsNotFound(err) {
  573. return fmt.Errorf("error deleting state Secret %q: %w", m.stateSecret.Name, err)
  574. }
  575. configSecret := m.stateSecret.DeepCopy()
  576. configSecret.Name += "-config"
  577. if err := r.Delete(ctx, configSecret); err != nil && !apierrors.IsNotFound(err) {
  578. return fmt.Errorf("error deleting config Secret %q: %w", configSecret.Name, err)
  579. }
  580. // NOTE(ChaosInTheCRD): we shouldn't need to get the service first, checking for a not found error should be enough
  581. svc := &corev1.Service{
  582. ObjectMeta: metav1.ObjectMeta{
  583. Name: fmt.Sprintf("%s-nodeport", m.stateSecret.Name),
  584. Namespace: m.stateSecret.Namespace,
  585. },
  586. }
  587. if err := r.Delete(ctx, svc); err != nil {
  588. if !apierrors.IsNotFound(err) {
  589. return fmt.Errorf("error deleting static endpoints Kubernetes Service %q: %w", svc.Name, err)
  590. }
  591. }
  592. }
  593. // If the ProxyClass has its StaticEndpoints config removed, we want to remove all of the NodePort Services
  594. if pc != nil && pc.Spec.StaticEndpoints == nil {
  595. labels := map[string]string{
  596. kubetypes.LabelManaged: "true",
  597. LabelParentType: proxyTypeProxyGroup,
  598. LabelParentName: pg.Name,
  599. }
  600. if err := r.DeleteAllOf(ctx, &corev1.Service{}, client.InNamespace(r.tsNamespace), client.MatchingLabels(labels)); err != nil {
  601. return fmt.Errorf("error deleting Kubernetes Services for static endpoints: %w", err)
  602. }
  603. }
  604. return nil
  605. }
  606. // maybeCleanup just deletes the device from the tailnet. All the kubernetes
  607. // resources linked to a ProxyGroup will get cleaned up via owner references
  608. // (which we can use because they are all in the same namespace).
  609. func (r *ProxyGroupReconciler) maybeCleanup(ctx context.Context, tailscaleClient tsClient, pg *tsapi.ProxyGroup) (bool, error) {
  610. logger := r.logger(pg.Name)
  611. metadata, err := r.getNodeMetadata(ctx, pg)
  612. if err != nil {
  613. return false, err
  614. }
  615. for _, m := range metadata {
  616. if err := r.deleteTailnetDevice(ctx, tailscaleClient, m.tsID, logger); err != nil {
  617. return false, err
  618. }
  619. }
  620. mo := &metricsOpts{
  621. proxyLabels: pgLabels(pg.Name, nil),
  622. tsNamespace: r.tsNamespace,
  623. proxyType: "proxygroup",
  624. }
  625. if err := maybeCleanupMetricsResources(ctx, mo, r.Client); err != nil {
  626. return false, fmt.Errorf("error cleaning up metrics resources: %w", err)
  627. }
  628. logger.Infof("cleaned up ProxyGroup resources")
  629. r.mu.Lock()
  630. r.ensureRemovedFromGaugeForProxyGroup(pg)
  631. r.mu.Unlock()
  632. return true, nil
  633. }
  634. func (r *ProxyGroupReconciler) deleteTailnetDevice(ctx context.Context, tailscaleClient tsClient, id tailcfg.StableNodeID, logger *zap.SugaredLogger) error {
  635. logger.Debugf("deleting device %s from control", string(id))
  636. if err := tailscaleClient.DeleteDevice(ctx, string(id)); err != nil {
  637. errResp := &tailscale.ErrResponse{}
  638. if ok := errors.As(err, errResp); ok && errResp.Status == http.StatusNotFound {
  639. logger.Debugf("device %s not found, likely because it has already been deleted from control", string(id))
  640. } else {
  641. return fmt.Errorf("error deleting device: %w", err)
  642. }
  643. } else {
  644. logger.Debugf("device %s deleted from control", string(id))
  645. }
  646. return nil
  647. }
// ensureConfigSecretsCreated ensures a config Secret exists for every replica
// of the ProxyGroup, creating or updating each as needed.
//
// It returns the static endpoints selected for each replica, keyed by the
// replica's NodePort Service name. Entries are only populated when
// svcToNodePorts is non-empty (i.e. static endpoints are configured).
func (r *ProxyGroupReconciler) ensureConfigSecretsCreated(
	ctx context.Context,
	tailscaleClient tsClient,
	pg *tsapi.ProxyGroup,
	proxyClass *tsapi.ProxyClass,
	svcToNodePorts map[string]uint16,
) (endpoints map[string][]netip.AddrPort, err error) {
	logger := r.logger(pg.Name)
	endpoints = make(map[string][]netip.AddrPort, pgReplicas(pg)) // keyed by Service name.
	for i := range pgReplicas(pg) {
		cfgSecret := &corev1.Secret{
			ObjectMeta: metav1.ObjectMeta{
				Name:            pgConfigSecretName(pg.Name, i),
				Namespace:       r.tsNamespace,
				Labels:          pgSecretLabels(pg.Name, kubetypes.LabelSecretTypeConfig),
				OwnerReferences: pgOwnerReference(pg),
			},
		}

		var existingCfgSecret *corev1.Secret // unmodified copy of secret
		if err = r.Get(ctx, client.ObjectKeyFromObject(cfgSecret), cfgSecret); err == nil {
			logger.Debugf("Secret %s/%s already exists", cfgSecret.GetNamespace(), cfgSecret.GetName())
			existingCfgSecret = cfgSecret.DeepCopy()
		} else if !apierrors.IsNotFound(err) {
			return nil, err
		}

		// Only mint a fresh auth key for brand-new replicas (no existing
		// config Secret). Tags come from the ProxyGroup spec, falling back to
		// the operator's defaults.
		var authKey *string
		if existingCfgSecret == nil {
			logger.Debugf("Creating authkey for new ProxyGroup proxy")
			tags := pg.Spec.Tags.Stringify()
			if len(tags) == 0 {
				tags = r.defaultTags
			}
			key, err := newAuthKey(ctx, tailscaleClient, tags)
			if err != nil {
				return nil, err
			}
			authKey = &key
		}

		if authKey == nil {
			// Get state Secret to check if it's already authed.
			stateSecret := &corev1.Secret{
				ObjectMeta: metav1.ObjectMeta{
					Name:      pgStateSecretName(pg.Name, i),
					Namespace: r.tsNamespace,
				},
			}
			if err = r.Get(ctx, client.ObjectKeyFromObject(stateSecret), stateSecret); err != nil && !apierrors.IsNotFound(err) {
				return nil, err
			}

			// Carry over the previous auth key from the existing config
			// Secret when shouldRetainAuthKey says the state warrants it
			// (presumably a login that has not completed yet — see
			// shouldRetainAuthKey for the exact conditions).
			if shouldRetainAuthKey(stateSecret) && existingCfgSecret != nil {
				authKey, err = authKeyFromSecret(existingCfgSecret)
				if err != nil {
					return nil, fmt.Errorf("error retrieving auth key from existing config Secret: %w", err)
				}
			}
		}

		nodePortSvcName := pgNodePortServiceName(pg.Name, i)
		if len(svcToNodePorts) > 0 {
			replicaName := fmt.Sprintf("%s-%d", pg.Name, i)
			port, ok := svcToNodePorts[nodePortSvcName]
			if !ok {
				return nil, fmt.Errorf("could not find configured NodePort for ProxyGroup replica %q", replicaName)
			}

			endpoints[nodePortSvcName], err = r.findStaticEndpoints(ctx, existingCfgSecret, proxyClass, port, logger)
			if err != nil {
				return nil, fmt.Errorf("could not find static endpoints for replica %q: %w", replicaName, err)
			}
		}

		if pg.Spec.Type == tsapi.ProxyGroupTypeKubernetesAPIServer {
			// kube-apiserver proxies are configured via a single k8s-proxy
			// config file rather than per-capability tailscaled config files
			// (see the else branch below).
			hostname := pgHostname(pg, i)

			// If the device for this replica hasn't shown up in status yet,
			// reuse any auth key still present in the existing config so a
			// pending login can complete.
			if authKey == nil && existingCfgSecret != nil {
				deviceAuthed := false
				for _, d := range pg.Status.Devices {
					if d.Hostname == hostname {
						deviceAuthed = true
						break
					}
				}
				if !deviceAuthed {
					existingCfg := conf.ConfigV1Alpha1{}
					if err := json.Unmarshal(existingCfgSecret.Data[kubetypes.KubeAPIServerConfigFile], &existingCfg); err != nil {
						return nil, fmt.Errorf("error unmarshalling existing config: %w", err)
					}
					if existingCfg.AuthKey != nil {
						authKey = existingCfg.AuthKey
					}
				}
			}

			mode := kubetypes.APIServerProxyModeAuth
			if !isAuthAPIServerProxy(pg) {
				mode = kubetypes.APIServerProxyModeNoAuth
			}
			cfg := conf.VersionedConfig{
				Version: "v1alpha1",
				ConfigV1Alpha1: &conf.ConfigV1Alpha1{
					AuthKey:  authKey,
					State:    ptr.To(fmt.Sprintf("kube:%s", pgPodName(pg.Name, i))),
					App:      ptr.To(kubetypes.AppProxyGroupKubeAPIServer),
					LogLevel: ptr.To(logger.Level().String()),

					// Reloadable fields.
					Hostname: &hostname,
					APIServerProxy: &conf.APIServerProxyConfig{
						Enabled: opt.NewBool(true),
						Mode:    &mode,
						// The first replica is elected as the cert issuer, same
						// as containerboot does for ingress-pg-reconciler.
						IssueCerts: opt.NewBool(i == 0),
					},
					LocalPort:          ptr.To(uint16(9002)),
					HealthCheckEnabled: opt.NewBool(true),
				},
			}

			// Copy over config that the apiserver-proxy-service-reconciler sets.
			// NOTE(review): cfgSecret.Data is still the as-fetched data here,
			// identical to existingCfgSecret.Data at this point.
			if existingCfgSecret != nil {
				if k8sProxyCfg, ok := cfgSecret.Data[kubetypes.KubeAPIServerConfigFile]; ok {
					k8sCfg := &conf.ConfigV1Alpha1{}
					if err := json.Unmarshal(k8sProxyCfg, k8sCfg); err != nil {
						return nil, fmt.Errorf("failed to unmarshal kube-apiserver config: %w", err)
					}
					cfg.AdvertiseServices = k8sCfg.AdvertiseServices
					if k8sCfg.APIServerProxy != nil {
						cfg.APIServerProxy.ServiceName = k8sCfg.APIServerProxy.ServiceName
					}
				}
			}

			if r.loginServer != "" {
				cfg.ServerURL = &r.loginServer
			}

			if proxyClass != nil && proxyClass.Spec.TailscaleConfig != nil {
				cfg.AcceptRoutes = opt.NewBool(proxyClass.Spec.TailscaleConfig.AcceptRoutes)
			}

			if proxyClass != nil && proxyClass.Spec.Metrics != nil {
				cfg.MetricsEnabled = opt.NewBool(proxyClass.Spec.Metrics.Enable)
			}

			if len(endpoints[nodePortSvcName]) > 0 {
				cfg.StaticEndpoints = endpoints[nodePortSvcName]
			}

			cfgB, err := json.Marshal(cfg)
			if err != nil {
				return nil, fmt.Errorf("error marshalling k8s-proxy config: %w", err)
			}
			mak.Set(&cfgSecret.Data, kubetypes.KubeAPIServerConfigFile, cfgB)
		} else {
			// AdvertiseServices config is set by ingress-pg-reconciler, so make sure we
			// don't overwrite it if already set.
			existingAdvertiseServices, err := extractAdvertiseServicesConfig(existingCfgSecret)
			if err != nil {
				return nil, err
			}

			configs, err := pgTailscaledConfig(pg, proxyClass, i, authKey, endpoints[nodePortSvcName], existingAdvertiseServices, r.loginServer)
			if err != nil {
				return nil, fmt.Errorf("error creating tailscaled config: %w", err)
			}

			// Write one config file per capability version returned.
			for cap, cfg := range configs {
				cfgJSON, err := json.Marshal(cfg)
				if err != nil {
					return nil, fmt.Errorf("error marshalling tailscaled config: %w", err)
				}
				mak.Set(&cfgSecret.Data, tsoperator.TailscaledConfigFileName(cap), cfgJSON)
			}
		}

		// Only issue an API write when the Secret actually changed.
		if existingCfgSecret != nil {
			if !apiequality.Semantic.DeepEqual(existingCfgSecret, cfgSecret) {
				logger.Debugf("Updating the existing ProxyGroup config Secret %s", cfgSecret.Name)
				if err := r.Update(ctx, cfgSecret); err != nil {
					return nil, err
				}
			}
		} else {
			logger.Debugf("Creating a new config Secret %s for the ProxyGroup", cfgSecret.Name)
			if err := r.Create(ctx, cfgSecret); err != nil {
				return nil, err
			}
		}
	}

	return endpoints, nil
}
// FindStaticEndpointErr is returned by findStaticEndpoints when no usable
// static endpoints could be derived from the Nodes matching the ProxyClass
// selectors (no matching Nodes, or no parseable ExternalIP addresses).
type FindStaticEndpointErr struct {
	msg string // human-readable description of why endpoint discovery failed.
}
// Error implements the error interface.
func (e *FindStaticEndpointErr) Error() string {
	return e.msg
}
  831. // findStaticEndpoints returns up to two `netip.AddrPort` entries, derived from the ExternalIPs of Nodes that
  832. // match the `proxyClass`'s selector within the StaticEndpoints configuration. The port is set to the replica's NodePort Service Port.
  833. func (r *ProxyGroupReconciler) findStaticEndpoints(ctx context.Context, existingCfgSecret *corev1.Secret, proxyClass *tsapi.ProxyClass, port uint16, logger *zap.SugaredLogger) ([]netip.AddrPort, error) {
  834. var currAddrs []netip.AddrPort
  835. if existingCfgSecret != nil {
  836. oldConfB := existingCfgSecret.Data[tsoperator.TailscaledConfigFileName(106)]
  837. if len(oldConfB) > 0 {
  838. var oldConf ipn.ConfigVAlpha
  839. if err := json.Unmarshal(oldConfB, &oldConf); err == nil {
  840. currAddrs = oldConf.StaticEndpoints
  841. } else {
  842. logger.Debugf("failed to unmarshal tailscaled config from secret %q: %v", existingCfgSecret.Name, err)
  843. }
  844. } else {
  845. logger.Debugf("failed to get tailscaled config from secret %q: empty data", existingCfgSecret.Name)
  846. }
  847. }
  848. nodes := new(corev1.NodeList)
  849. selectors := client.MatchingLabels(proxyClass.Spec.StaticEndpoints.NodePort.Selector)
  850. err := r.List(ctx, nodes, selectors)
  851. if err != nil {
  852. return nil, fmt.Errorf("failed to list nodes: %w", err)
  853. }
  854. if len(nodes.Items) == 0 {
  855. return nil, &FindStaticEndpointErr{msg: fmt.Sprintf("failed to match nodes to configured Selectors on `spec.staticEndpoints.nodePort.selectors` field for ProxyClass %q", proxyClass.Name)}
  856. }
  857. endpoints := []netip.AddrPort{}
  858. // NOTE(ChaosInTheCRD): Setting a hard limit of two static endpoints.
  859. newAddrs := []netip.AddrPort{}
  860. for _, n := range nodes.Items {
  861. for _, a := range n.Status.Addresses {
  862. if a.Type == corev1.NodeExternalIP {
  863. addr := getStaticEndpointAddress(&a, port)
  864. if addr == nil {
  865. logger.Debugf("failed to parse %q address on node %q: %q", corev1.NodeExternalIP, n.Name, a.Address)
  866. continue
  867. }
  868. // we want to add the currently used IPs first before
  869. // adding new ones.
  870. if currAddrs != nil && slices.Contains(currAddrs, *addr) {
  871. endpoints = append(endpoints, *addr)
  872. } else {
  873. newAddrs = append(newAddrs, *addr)
  874. }
  875. }
  876. if len(endpoints) == 2 {
  877. break
  878. }
  879. }
  880. }
  881. // if the 2 endpoints limit hasn't been reached, we
  882. // can start adding newIPs.
  883. if len(endpoints) < 2 {
  884. for _, a := range newAddrs {
  885. endpoints = append(endpoints, a)
  886. if len(endpoints) == 2 {
  887. break
  888. }
  889. }
  890. }
  891. if len(endpoints) == 0 {
  892. return nil, &FindStaticEndpointErr{msg: fmt.Sprintf("failed to find any `status.addresses` of type %q on nodes using configured Selectors on `spec.staticEndpoints.nodePort.selectors` for ProxyClass %q", corev1.NodeExternalIP, proxyClass.Name)}
  893. }
  894. return endpoints, nil
  895. }
  896. func getStaticEndpointAddress(a *corev1.NodeAddress, port uint16) *netip.AddrPort {
  897. addr, err := netip.ParseAddr(a.Address)
  898. if err != nil {
  899. return nil
  900. }
  901. return ptr.To(netip.AddrPortFrom(addr, port))
  902. }
  903. // ensureAddedToGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource is updated when the ProxyGroup
  904. // is created. r.mu must be held.
  905. func (r *ProxyGroupReconciler) ensureAddedToGaugeForProxyGroup(pg *tsapi.ProxyGroup) {
  906. switch pg.Spec.Type {
  907. case tsapi.ProxyGroupTypeEgress:
  908. r.egressProxyGroups.Add(pg.UID)
  909. case tsapi.ProxyGroupTypeIngress:
  910. r.ingressProxyGroups.Add(pg.UID)
  911. case tsapi.ProxyGroupTypeKubernetesAPIServer:
  912. r.apiServerProxyGroups.Add(pg.UID)
  913. }
  914. gaugeEgressProxyGroupResources.Set(int64(r.egressProxyGroups.Len()))
  915. gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
  916. gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len()))
  917. }
  918. // ensureRemovedFromGaugeForProxyGroup ensures the gauge metric for the ProxyGroup resource type is updated when the
  919. // ProxyGroup is deleted. r.mu must be held.
  920. func (r *ProxyGroupReconciler) ensureRemovedFromGaugeForProxyGroup(pg *tsapi.ProxyGroup) {
  921. switch pg.Spec.Type {
  922. case tsapi.ProxyGroupTypeEgress:
  923. r.egressProxyGroups.Remove(pg.UID)
  924. case tsapi.ProxyGroupTypeIngress:
  925. r.ingressProxyGroups.Remove(pg.UID)
  926. case tsapi.ProxyGroupTypeKubernetesAPIServer:
  927. r.apiServerProxyGroups.Remove(pg.UID)
  928. }
  929. gaugeEgressProxyGroupResources.Set(int64(r.egressProxyGroups.Len()))
  930. gaugeIngressProxyGroupResources.Set(int64(r.ingressProxyGroups.Len()))
  931. gaugeAPIServerProxyGroupResources.Set(int64(r.apiServerProxyGroups.Len()))
  932. }
  933. func pgTailscaledConfig(pg *tsapi.ProxyGroup, pc *tsapi.ProxyClass, idx int32, authKey *string, staticEndpoints []netip.AddrPort, oldAdvertiseServices []string, loginServer string) (tailscaledConfigs, error) {
  934. conf := &ipn.ConfigVAlpha{
  935. Version: "alpha0",
  936. AcceptDNS: "false",
  937. AcceptRoutes: "false", // AcceptRoutes defaults to true
  938. Locked: "false",
  939. Hostname: ptr.To(pgHostname(pg, idx)),
  940. AdvertiseServices: oldAdvertiseServices,
  941. AuthKey: authKey,
  942. }
  943. if loginServer != "" {
  944. conf.ServerURL = &loginServer
  945. }
  946. if shouldAcceptRoutes(pc) {
  947. conf.AcceptRoutes = "true"
  948. }
  949. if len(staticEndpoints) > 0 {
  950. conf.StaticEndpoints = staticEndpoints
  951. }
  952. return map[tailcfg.CapabilityVersion]ipn.ConfigVAlpha{
  953. pgMinCapabilityVersion: *conf,
  954. }, nil
  955. }
  956. func extractAdvertiseServicesConfig(cfgSecret *corev1.Secret) ([]string, error) {
  957. if cfgSecret == nil {
  958. return nil, nil
  959. }
  960. cfg, err := latestConfigFromSecret(cfgSecret)
  961. if err != nil {
  962. return nil, err
  963. }
  964. if cfg == nil {
  965. return nil, nil
  966. }
  967. return cfg.AdvertiseServices, nil
  968. }
  969. // getNodeMetadata gets metadata for all the pods owned by this ProxyGroup by
  970. // querying their state Secrets. It may not return the same number of items as
  971. // specified in the ProxyGroup spec if e.g. it is getting scaled up or down, or
  972. // some pods have failed to write state.
  973. //
  974. // The returned metadata will contain an entry for each state Secret that exists.
  975. func (r *ProxyGroupReconciler) getNodeMetadata(ctx context.Context, pg *tsapi.ProxyGroup) (metadata []nodeMetadata, _ error) {
  976. // List all state Secrets owned by this ProxyGroup.
  977. secrets := &corev1.SecretList{}
  978. if err := r.List(ctx, secrets, client.InNamespace(r.tsNamespace), client.MatchingLabels(pgSecretLabels(pg.Name, kubetypes.LabelSecretTypeState))); err != nil {
  979. return nil, fmt.Errorf("failed to list state Secrets: %w", err)
  980. }
  981. for _, secret := range secrets.Items {
  982. var ordinal int
  983. if _, err := fmt.Sscanf(secret.Name, pg.Name+"-%d", &ordinal); err != nil {
  984. return nil, fmt.Errorf("unexpected secret %s was labelled as owned by the ProxyGroup %s: %w", secret.Name, pg.Name, err)
  985. }
  986. nm := nodeMetadata{
  987. ordinal: ordinal,
  988. stateSecret: &secret,
  989. }
  990. prefs, ok, err := getDevicePrefs(&secret)
  991. if err != nil {
  992. return nil, err
  993. }
  994. if ok {
  995. nm.tsID = prefs.Config.NodeID
  996. nm.dnsName = prefs.Config.UserProfile.LoginName
  997. }
  998. pod := &corev1.Pod{}
  999. if err := r.Get(ctx, client.ObjectKey{Namespace: r.tsNamespace, Name: fmt.Sprintf("%s-%d", pg.Name, ordinal)}, pod); err != nil && !apierrors.IsNotFound(err) {
  1000. return nil, err
  1001. } else if err == nil {
  1002. nm.podUID = string(pod.UID)
  1003. }
  1004. metadata = append(metadata, nm)
  1005. }
  1006. // Sort for predictable ordering and status.
  1007. sort.Slice(metadata, func(i, j int) bool {
  1008. return metadata[i].ordinal < metadata[j].ordinal
  1009. })
  1010. return metadata, nil
  1011. }
  1012. // getRunningProxies will return status for all proxy Pods whose state Secret
  1013. // has an up to date Pod UID and at least a hostname.
  1014. func (r *ProxyGroupReconciler) getRunningProxies(ctx context.Context, pg *tsapi.ProxyGroup, staticEndpoints map[string][]netip.AddrPort) (devices []tsapi.TailnetDevice, _ error) {
  1015. metadata, err := r.getNodeMetadata(ctx, pg)
  1016. if err != nil {
  1017. return nil, err
  1018. }
  1019. for i, m := range metadata {
  1020. if m.podUID == "" || !strings.EqualFold(string(m.stateSecret.Data[kubetypes.KeyPodUID]), m.podUID) {
  1021. // Current Pod has not yet written its UID to the state Secret, data may
  1022. // be stale.
  1023. continue
  1024. }
  1025. device := tsapi.TailnetDevice{}
  1026. if hostname, _, ok := strings.Cut(string(m.stateSecret.Data[kubetypes.KeyDeviceFQDN]), "."); ok {
  1027. device.Hostname = hostname
  1028. } else {
  1029. continue
  1030. }
  1031. if ipsB := m.stateSecret.Data[kubetypes.KeyDeviceIPs]; len(ipsB) > 0 {
  1032. ips := []string{}
  1033. if err := json.Unmarshal(ipsB, &ips); err != nil {
  1034. return nil, fmt.Errorf("failed to extract device IPs from state Secret %q: %w", m.stateSecret.Name, err)
  1035. }
  1036. device.TailnetIPs = ips
  1037. }
  1038. // TODO(tomhjp): This is our input to the proxy, but we should instead
  1039. // read this back from the proxy's state in some way to more accurately
  1040. // reflect its status.
  1041. if ep, ok := staticEndpoints[pgNodePortServiceName(pg.Name, int32(i))]; ok && len(ep) > 0 {
  1042. eps := make([]string, 0, len(ep))
  1043. for _, e := range ep {
  1044. eps = append(eps, e.String())
  1045. }
  1046. device.StaticEndpoints = eps
  1047. }
  1048. devices = append(devices, device)
  1049. }
  1050. return devices, nil
  1051. }
// nodeMetadata is the per-replica state for a ProxyGroup, derived from the
// replica's state Secret (and, when it still exists, its Pod).
type nodeMetadata struct {
	ordinal     int            // replica index parsed from the state Secret name ("<pg-name>-<ordinal>").
	stateSecret *corev1.Secret // the replica's state Secret, as listed.
	podUID      string         // or empty if the Pod no longer exists.
	tsID        tailcfg.StableNodeID // from device prefs in the state Secret; zero if not yet authed.
	dnsName     string               // login name from device prefs; empty if not yet authed.
}
  1059. func notReady(reason, msg string) (map[string][]netip.AddrPort, *notReadyReason, error) {
  1060. return nil, &notReadyReason{
  1061. reason: reason,
  1062. message: msg,
  1063. }, nil
  1064. }
  1065. func (r *ProxyGroupReconciler) notReadyErrf(pg *tsapi.ProxyGroup, logger *zap.SugaredLogger, format string, a ...any) (map[string][]netip.AddrPort, *notReadyReason, error) {
  1066. err := fmt.Errorf(format, a...)
  1067. if strings.Contains(err.Error(), optimisticLockErrorMsg) {
  1068. msg := fmt.Sprintf("optimistic lock error, retrying: %s", err.Error())
  1069. logger.Info(msg)
  1070. return notReady(reasonProxyGroupCreating, msg)
  1071. }
  1072. r.recorder.Event(pg, corev1.EventTypeWarning, reasonProxyGroupCreationFailed, err.Error())
  1073. return nil, &notReadyReason{
  1074. reason: reasonProxyGroupCreationFailed,
  1075. message: err.Error(),
  1076. }, err
  1077. }
// notReadyReason pairs a machine-readable reason with a human-readable
// message explaining why a ProxyGroup is not (yet) ready; produced by
// notReady and notReadyErrf.
type notReadyReason struct {
	reason  string // condition/event reason constant, e.g. reasonProxyGroupCreating.
	message string // human-readable detail for the reason.
}