Browse Source

cmd/natc,tsconsensus: add cluster config admin

Add the ability for operators of natc in consensus mode to remove
servers from the raft cluster config, without losing other state.

Updates #14667

Signed-off-by: Fran Bull <[email protected]>
Fran Bull 6 months ago
parent
commit
b48d2de6ab
3 changed files with 75 additions and 0 deletions
  1. 17 0
      cmd/natc/ippool/consensusippool.go
  2. 38 0
      cmd/natc/natc.go
  3. 20 0
      tsconsensus/tsconsensus.go

+ 17 - 0
cmd/natc/ippool/consensusippool.go

@@ -30,6 +30,7 @@ type ConsensusIPPool struct {
 	IPSet                 *netipx.IPSet
 	perPeerMap            *syncs.Map[tailcfg.NodeID, *consensusPerPeerState]
 	consensus             commandExecutor
+	clusterController     clusterController
 	unusedAddressLifetime time.Duration
 }
 
@@ -168,6 +169,7 @@ func (ipp *ConsensusIPPool) StartConsensus(ctx context.Context, ts *tsnet.Server
 		return err
 	}
 	ipp.consensus = cns
+	ipp.clusterController = cns
 	return nil
 }
 
@@ -442,3 +444,18 @@ func (ipp *ConsensusIPPool) Apply(l *raft.Log) any {
 type commandExecutor interface {
 	ExecuteCommand(tsconsensus.Command) (tsconsensus.CommandResult, error)
 }
+
+// clusterController is the part of the consensus implementation that
+// ConsensusIPPool uses to inspect and change cluster membership. It is kept
+// separate from commandExecutor, which covers executing commands.
+type clusterController interface {
+	// DeleteClusterServer removes the server with the given ID from the
+	// cluster configuration and returns the index reported for that change.
+	DeleteClusterServer(id raft.ServerID) (uint64, error)
+	// GetClusterConfiguration returns the current cluster configuration.
+	GetClusterConfiguration() (raft.Configuration, error)
+}
+
+// GetClusterConfiguration returns the cluster configuration reported by the
+// pool's cluster controller. StartConsensus sets that controller; calling this
+// method while the controller is still nil panics.
+func (ipp *ConsensusIPPool) GetClusterConfiguration() (raft.Configuration, error) {
+	cfg, err := ipp.clusterController.GetClusterConfiguration()
+	return cfg, err
+}
+
+// DeleteClusterServer asks the pool's cluster controller to remove the server
+// with the given ID from the cluster configuration, and passes the
+// controller's index and error straight back to the caller. StartConsensus
+// sets that controller; calling this method while it is still nil panics.
+func (ipp *ConsensusIPPool) DeleteClusterServer(id raft.ServerID) (uint64, error) {
+	idx, err := ipp.clusterController.DeleteClusterServer(id)
+	return idx, err
+}

+ 38 - 0
cmd/natc/natc.go

@@ -8,6 +8,7 @@ package main
 
 import (
 	"context"
+	"encoding/json"
 	"errors"
 	"expvar"
 	"flag"
@@ -23,6 +24,7 @@ import (
 	"time"
 
 	"github.com/gaissmai/bart"
+	"github.com/hashicorp/raft"
 	"github.com/inetaf/tcpproxy"
 	"github.com/peterbourgon/ff/v3"
 	"go4.org/netipx"
@@ -63,6 +65,7 @@ func main() {
 		server            = fs.String("login-server", ipn.DefaultControlURL, "the base URL of control server")
 		stateDir          = fs.String("state-dir", "", "path to directory in which to store app state")
 		clusterFollowOnly = fs.Bool("follow-only", false, "Try to find a leader with the cluster tag or exit.")
+		clusterAdminPort  = fs.Int("cluster-admin-port", 8081, "Port on localhost for the cluster admin HTTP API")
 	)
 	ff.Parse(fs, os.Args[1:], ff.WithEnvVarPrefix("TS_NATC"))
 
@@ -179,6 +182,12 @@ func main() {
 			}
 		}()
 		ipp = cipp
+
+		go func() {
+			// This listens on localhost only, so that only those with access to the host machine
+			// can remove servers from the cluster config.
+			log.Print(http.ListenAndServe(fmt.Sprintf("127.0.0.1:%d", *clusterAdminPort), httpClusterAdmin(cipp)))
+		}()
 	} else {
 		ipp = &ippool.SingleMachineIPPool{IPSet: addrPool}
 	}
@@ -633,3 +642,32 @@ func getClusterStatePath(stateDirFlag string) (string, error) {
 
 	return dirPath, nil
 }
+
+// httpClusterAdmin builds the HTTP handler for the cluster admin API.
+//
+// It serves two routes:
+//   - GET / responds with the JSON encoding of the cluster configuration
+//     returned by ipp.GetClusterConfiguration.
+//   - DELETE /{id} asks ipp.DeleteClusterServer to remove the server whose ID
+//     is the {id} path segment, and responds with the JSON encoding of the
+//     returned index.
+//
+// A failure of either ipp call is logged and answered with a 500 status. The
+// handler performs no authentication of its own.
+func httpClusterAdmin(ipp *ippool.ConsensusIPPool) http.Handler {
+	mux := http.NewServeMux()
+	mux.HandleFunc("GET /{$}", func(w http.ResponseWriter, r *http.Request) {
+		c, err := ipp.GetClusterConfiguration()
+		if err != nil {
+			// The error detail goes to the log only; the client gets an empty body.
+			log.Printf("cluster admin http: error getClusterConfig: %v", err)
+			http.Error(w, "", http.StatusInternalServerError)
+			return
+		}
+		// Without an explicit Content-Type, net/http content-sniffs the body
+		// and JSON is reported as text/plain.
+		w.Header().Set("Content-Type", "application/json")
+		if err := json.NewEncoder(w).Encode(c); err != nil {
+			log.Printf("cluster admin http: error encoding raft configuration: %v", err)
+		}
+	})
+	mux.HandleFunc("DELETE /{id}", func(w http.ResponseWriter, r *http.Request) {
+		id := raft.ServerID(r.PathValue("id"))
+		idx, err := ipp.DeleteClusterServer(id)
+		if err != nil {
+			// Log the failure, as the GET route does, so that failed removals
+			// are visible on the server and not only to the caller.
+			log.Printf("cluster admin http: error deleting server %q: %v", id, err)
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+		w.Header().Set("Content-Type", "application/json")
+		if err := json.NewEncoder(w).Encode(idx); err != nil {
+			log.Printf("cluster admin http: error encoding delete index: %v", err)
+		}
+	})
+	return mux
+}

+ 20 - 0
tsconsensus/tsconsensus.go

@@ -525,3 +525,23 @@ func (c *Consensus) raftAddr(host netip.Addr) string {
 func (c *Consensus) commandAddr(host netip.Addr) string {
 	return netip.AddrPortFrom(host, c.config.CommandPort).String()
 }
+
+// GetClusterConfiguration requests the configuration from the underlying raft
+// instance via GetConfiguration. If the resulting future reports an error,
+// that error is returned together with a zero raft.Configuration; otherwise
+// the future's configuration is returned.
+func (c *Consensus) GetClusterConfiguration() (raft.Configuration, error) {
+	cfgFuture := c.raft.GetConfiguration()
+	if err := cfgFuture.Error(); err != nil {
+		return raft.Configuration{}, err
+	}
+	return cfgFuture.Configuration(), nil
+}
+
+// DeleteClusterServer asks the underlying raft instance to remove the server
+// with the given ID by calling RemoveServer, passing 0 and one second as the
+// remaining arguments (prevIndex and timeout in hashicorp/raft's signature).
+// If the resulting future reports an error, 0 and that error are returned;
+// otherwise the future's index is returned.
+func (c *Consensus) DeleteClusterServer(id raft.ServerID) (uint64, error) {
+	removal := c.raft.RemoveServer(id, 0, time.Second)
+	if err := removal.Error(); err != nil {
+		return 0, err
+	}
+	return removal.Index(), nil
+}