|
|
@@ -13,7 +13,8 @@ import (
|
|
|
"sync"
|
|
|
"time"
|
|
|
|
|
|
- "github.com/pkg/errors"
|
|
|
+ "errors"
|
|
|
+
|
|
|
"go.uber.org/zap"
|
|
|
xslices "golang.org/x/exp/slices"
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
|
@@ -58,6 +59,7 @@ type ConnectorReconciler struct {
|
|
|
|
|
|
subnetRouters set.Slice[types.UID] // for subnet routers gauge
|
|
|
exitNodes set.Slice[types.UID] // for exit nodes gauge
|
|
|
+ appConnectors set.Slice[types.UID] // for app connectors gauge
|
|
|
}
|
|
|
|
|
|
var (
|
|
|
@@ -67,6 +69,8 @@ var (
|
|
|
gaugeConnectorSubnetRouterResources = clientmetric.NewGauge(kubetypes.MetricConnectorWithSubnetRouterCount)
|
|
|
// gaugeConnectorExitNodeResources tracks the number of Connectors currently managed by this operator instance that are exit nodes.
|
|
|
gaugeConnectorExitNodeResources = clientmetric.NewGauge(kubetypes.MetricConnectorWithExitNodeCount)
|
|
|
+ // gaugeConnectorAppConnectorResources tracks the number of Connectors currently managed by this operator instance that are app connectors.
|
|
|
+ gaugeConnectorAppConnectorResources = clientmetric.NewGauge(kubetypes.MetricConnectorWithAppConnectorCount)
|
|
|
)
|
|
|
|
|
|
func (a *ConnectorReconciler) Reconcile(ctx context.Context, req reconcile.Request) (res reconcile.Result, err error) {
|
|
|
@@ -108,13 +112,12 @@ func (a *ConnectorReconciler) Reconcile(ctx context.Context, req reconcile.Reque
|
|
|
oldCnStatus := cn.Status.DeepCopy()
|
|
|
setStatus := func(cn *tsapi.Connector, _ tsapi.ConditionType, status metav1.ConditionStatus, reason, message string) (reconcile.Result, error) {
|
|
|
tsoperator.SetConnectorCondition(cn, tsapi.ConnectorReady, status, reason, message, cn.Generation, a.clock, logger)
|
|
|
+ var updateErr error
|
|
|
if !apiequality.Semantic.DeepEqual(oldCnStatus, cn.Status) {
|
|
|
// An error encountered here should get returned by the Reconcile function.
|
|
|
- if updateErr := a.Client.Status().Update(ctx, cn); updateErr != nil {
|
|
|
- err = errors.Wrap(err, updateErr.Error())
|
|
|
- }
|
|
|
+ updateErr = a.Client.Status().Update(ctx, cn)
|
|
|
}
|
|
|
- return res, err
|
|
|
+ return res, errors.Join(err, updateErr)
|
|
|
}
|
|
|
|
|
|
if !slices.Contains(cn.Finalizers, FinalizerName) {
|
|
|
@@ -150,6 +153,9 @@ func (a *ConnectorReconciler) Reconcile(ctx context.Context, req reconcile.Reque
|
|
|
cn.Status.SubnetRoutes = cn.Spec.SubnetRouter.AdvertiseRoutes.Stringify()
|
|
|
return setStatus(cn, tsapi.ConnectorReady, metav1.ConditionTrue, reasonConnectorCreated, reasonConnectorCreated)
|
|
|
}
|
|
|
+ if cn.Spec.AppConnector != nil {
|
|
|
+ cn.Status.IsAppConnector = true
|
|
|
+ }
|
|
|
cn.Status.SubnetRoutes = ""
|
|
|
return setStatus(cn, tsapi.ConnectorReady, metav1.ConditionTrue, reasonConnectorCreated, reasonConnectorCreated)
|
|
|
}
|
|
|
@@ -189,23 +195,37 @@ func (a *ConnectorReconciler) maybeProvisionConnector(ctx context.Context, logge
|
|
|
sts.Connector.routes = cn.Spec.SubnetRouter.AdvertiseRoutes.Stringify()
|
|
|
}
|
|
|
|
|
|
+ if cn.Spec.AppConnector != nil {
|
|
|
+ sts.Connector.isAppConnector = true
|
|
|
+ if len(cn.Spec.AppConnector.Routes) != 0 {
|
|
|
+ sts.Connector.routes = cn.Spec.AppConnector.Routes.Stringify()
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
a.mu.Lock()
|
|
|
- if sts.Connector.isExitNode {
|
|
|
+ if cn.Spec.ExitNode {
|
|
|
a.exitNodes.Add(cn.UID)
|
|
|
} else {
|
|
|
a.exitNodes.Remove(cn.UID)
|
|
|
}
|
|
|
- if sts.Connector.routes != "" {
|
|
|
+ if cn.Spec.SubnetRouter != nil {
|
|
|
a.subnetRouters.Add(cn.GetUID())
|
|
|
} else {
|
|
|
a.subnetRouters.Remove(cn.GetUID())
|
|
|
}
|
|
|
+ if cn.Spec.AppConnector != nil {
|
|
|
+ a.appConnectors.Add(cn.GetUID())
|
|
|
+ } else {
|
|
|
+ a.appConnectors.Remove(cn.GetUID())
|
|
|
+ }
|
|
|
a.mu.Unlock()
|
|
|
gaugeConnectorSubnetRouterResources.Set(int64(a.subnetRouters.Len()))
|
|
|
gaugeConnectorExitNodeResources.Set(int64(a.exitNodes.Len()))
|
|
|
+ gaugeConnectorAppConnectorResources.Set(int64(a.appConnectors.Len()))
|
|
|
var connectors set.Slice[types.UID]
|
|
|
connectors.AddSlice(a.exitNodes.Slice())
|
|
|
connectors.AddSlice(a.subnetRouters.Slice())
|
|
|
+ connectors.AddSlice(a.appConnectors.Slice())
|
|
|
gaugeConnectorResources.Set(int64(connectors.Len()))
|
|
|
|
|
|
_, err := a.ssr.Provision(ctx, logger, sts)
|
|
|
@@ -248,12 +268,15 @@ func (a *ConnectorReconciler) maybeCleanupConnector(ctx context.Context, logger
|
|
|
a.mu.Lock()
|
|
|
a.subnetRouters.Remove(cn.UID)
|
|
|
a.exitNodes.Remove(cn.UID)
|
|
|
+ a.appConnectors.Remove(cn.UID)
|
|
|
a.mu.Unlock()
|
|
|
gaugeConnectorExitNodeResources.Set(int64(a.exitNodes.Len()))
|
|
|
gaugeConnectorSubnetRouterResources.Set(int64(a.subnetRouters.Len()))
|
|
|
+ gaugeConnectorAppConnectorResources.Set(int64(a.appConnectors.Len()))
|
|
|
var connectors set.Slice[types.UID]
|
|
|
connectors.AddSlice(a.exitNodes.Slice())
|
|
|
connectors.AddSlice(a.subnetRouters.Slice())
|
|
|
+ connectors.AddSlice(a.appConnectors.Slice())
|
|
|
gaugeConnectorResources.Set(int64(connectors.Len()))
|
|
|
return true, nil
|
|
|
}
|
|
|
@@ -262,8 +285,14 @@ func (a *ConnectorReconciler) validate(cn *tsapi.Connector) error {
|
|
|
// Connector fields are already validated at apply time with CEL validation
|
|
|
// on custom resource fields. The checks here are a backup in case the
|
|
|
// CEL validation breaks without us noticing.
|
|
|
- if !(cn.Spec.SubnetRouter != nil || cn.Spec.ExitNode) {
|
|
|
- return errors.New("invalid spec: a Connector must expose subnet routes or act as an exit node (or both)")
|
|
|
+ if cn.Spec.SubnetRouter == nil && !cn.Spec.ExitNode && cn.Spec.AppConnector == nil {
|
|
|
+ return errors.New("invalid spec: a Connector must be configured as at least one of subnet router, exit node or app connector")
|
|
|
+ }
|
|
|
+ if (cn.Spec.SubnetRouter != nil || cn.Spec.ExitNode) && cn.Spec.AppConnector != nil {
|
|
|
+ return errors.New("invalid spec: a Connector that is configured as an app connector must not be also configured as a subnet router or exit node")
|
|
|
+ }
|
|
|
+ if cn.Spec.AppConnector != nil {
|
|
|
+ return validateAppConnector(cn.Spec.AppConnector)
|
|
|
}
|
|
|
if cn.Spec.SubnetRouter == nil {
|
|
|
return nil
|
|
|
@@ -272,19 +301,27 @@ func (a *ConnectorReconciler) validate(cn *tsapi.Connector) error {
|
|
|
}
|
|
|
|
|
|
func validateSubnetRouter(sb *tsapi.SubnetRouter) error {
|
|
|
- if len(sb.AdvertiseRoutes) < 1 {
|
|
|
+ if len(sb.AdvertiseRoutes) == 0 {
|
|
|
return errors.New("invalid subnet router spec: no routes defined")
|
|
|
}
|
|
|
- var err error
|
|
|
- for _, route := range sb.AdvertiseRoutes {
|
|
|
+ return validateRoutes(sb.AdvertiseRoutes)
|
|
|
+}
|
|
|
+
|
|
|
+func validateAppConnector(ac *tsapi.AppConnector) error {
|
|
|
+ return validateRoutes(ac.Routes)
|
|
|
+}
|
|
|
+
|
|
|
+func validateRoutes(routes tsapi.Routes) error {
|
|
|
+ var errs []error
|
|
|
+ for _, route := range routes {
|
|
|
pfx, e := netip.ParsePrefix(string(route))
|
|
|
if e != nil {
|
|
|
- err = errors.Wrap(err, fmt.Sprintf("route %s is invalid: %v", route, err))
|
|
|
+ errs = append(errs, fmt.Errorf("route %v is invalid: %v", route, e))
|
|
|
continue
|
|
|
}
|
|
|
if pfx.Masked() != pfx {
|
|
|
- err = errors.Wrap(err, fmt.Sprintf("route %s has non-address bits set; expected %s", pfx, pfx.Masked()))
|
|
|
+ errs = append(errs, fmt.Errorf("route %s has non-address bits set; expected %s", pfx, pfx.Masked()))
|
|
|
}
|
|
|
}
|
|
|
- return err
|
|
|
+ return errors.Join(errs...)
|
|
|
}
|