client.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316
  1. // Copyright (c) Tailscale Inc & AUTHORS
  2. // SPDX-License-Identifier: BSD-3-Clause
  3. // Package kube provides a client to interact with Kubernetes.
  4. // This package is Tailscale-internal and not meant for external consumption.
  5. // Further, the API should not be considered stable.
  6. package kube
  7. import (
  8. "bytes"
  9. "context"
  10. "crypto/tls"
  11. "crypto/x509"
  12. "encoding/json"
  13. "fmt"
  14. "io"
  15. "log"
  16. "net/http"
  17. "net/url"
  18. "os"
  19. "path/filepath"
  20. "sync"
  21. "time"
  22. "tailscale.com/util/multierr"
  23. )
  24. const (
  25. saPath = "/var/run/secrets/kubernetes.io/serviceaccount"
  26. defaultURL = "https://kubernetes.default.svc"
  27. )
  28. // rootPathForTests is set by tests to override the root path to the
  29. // service account directory.
  30. var rootPathForTests string
  31. // SetRootPathForTesting sets the path to the service account directory.
  32. func SetRootPathForTesting(p string) {
  33. rootPathForTests = p
  34. }
  35. func readFile(n string) ([]byte, error) {
  36. if rootPathForTests != "" {
  37. return os.ReadFile(filepath.Join(rootPathForTests, saPath, n))
  38. }
  39. return os.ReadFile(filepath.Join(saPath, n))
  40. }
  41. // Client handles connections to Kubernetes.
  42. // It expects to be run inside a cluster.
  43. type Client struct {
  44. mu sync.Mutex
  45. url string
  46. ns string
  47. client *http.Client
  48. token string
  49. tokenExpiry time.Time
  50. }
  51. // New returns a new client
  52. func New() (*Client, error) {
  53. ns, err := readFile("namespace")
  54. if err != nil {
  55. return nil, err
  56. }
  57. caCert, err := readFile("ca.crt")
  58. if err != nil {
  59. return nil, err
  60. }
  61. cp := x509.NewCertPool()
  62. if ok := cp.AppendCertsFromPEM(caCert); !ok {
  63. return nil, fmt.Errorf("kube: error in creating root cert pool")
  64. }
  65. return &Client{
  66. url: defaultURL,
  67. ns: string(ns),
  68. client: &http.Client{
  69. Transport: &http.Transport{
  70. TLSClientConfig: &tls.Config{
  71. RootCAs: cp,
  72. },
  73. },
  74. },
  75. }, nil
  76. }
  77. // SetURL sets the URL to use for the Kubernetes API.
  78. // This is used only for testing.
  79. func (c *Client) SetURL(url string) {
  80. c.url = url
  81. }
  82. func (c *Client) expireToken() {
  83. c.mu.Lock()
  84. defer c.mu.Unlock()
  85. c.tokenExpiry = time.Now()
  86. }
  87. func (c *Client) getOrRenewToken() (string, error) {
  88. c.mu.Lock()
  89. defer c.mu.Unlock()
  90. tk, te := c.token, c.tokenExpiry
  91. if time.Now().Before(te) {
  92. return tk, nil
  93. }
  94. tkb, err := readFile("token")
  95. if err != nil {
  96. return "", err
  97. }
  98. c.token = string(tkb)
  99. c.tokenExpiry = time.Now().Add(30 * time.Minute)
  100. return c.token, nil
  101. }
  102. func (c *Client) secretURL(name string) string {
  103. if name == "" {
  104. return fmt.Sprintf("%s/api/v1/namespaces/%s/secrets", c.url, c.ns)
  105. }
  106. return fmt.Sprintf("%s/api/v1/namespaces/%s/secrets/%s", c.url, c.ns, name)
  107. }
  108. func getError(resp *http.Response) error {
  109. if resp.StatusCode == 200 || resp.StatusCode == 201 {
  110. // These are the only success codes returned by the Kubernetes API.
  111. // https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#http-status-codes
  112. return nil
  113. }
  114. st := &Status{}
  115. if err := json.NewDecoder(resp.Body).Decode(st); err != nil {
  116. return err
  117. }
  118. return st
  119. }
  120. func setHeader(key, value string) func(*http.Request) {
  121. return func(req *http.Request) {
  122. req.Header.Set(key, value)
  123. }
  124. }
  125. // doRequest performs an HTTP request to the Kubernetes API.
  126. // If in is not nil, it is expected to be a JSON-encodable object and will be
  127. // sent as the request body.
  128. // If out is not nil, it is expected to be a pointer to an object that can be
  129. // decoded from JSON.
  130. // If the request fails with a 401, the token is expired and a new one is
  131. // requested.
  132. func (c *Client) doRequest(ctx context.Context, method, url string, in, out any, opts ...func(*http.Request)) error {
  133. req, err := c.newRequest(ctx, method, url, in)
  134. if err != nil {
  135. return err
  136. }
  137. for _, opt := range opts {
  138. opt(req)
  139. }
  140. resp, err := c.client.Do(req)
  141. if err != nil {
  142. return err
  143. }
  144. defer resp.Body.Close()
  145. if err := getError(resp); err != nil {
  146. if st, ok := err.(*Status); ok && st.Code == 401 {
  147. c.expireToken()
  148. }
  149. return err
  150. }
  151. if out != nil {
  152. return json.NewDecoder(resp.Body).Decode(out)
  153. }
  154. return nil
  155. }
  156. func (c *Client) newRequest(ctx context.Context, method, url string, in any) (*http.Request, error) {
  157. tk, err := c.getOrRenewToken()
  158. if err != nil {
  159. return nil, err
  160. }
  161. var body io.Reader
  162. if in != nil {
  163. switch in := in.(type) {
  164. case []byte:
  165. body = bytes.NewReader(in)
  166. default:
  167. var b bytes.Buffer
  168. if err := json.NewEncoder(&b).Encode(in); err != nil {
  169. return nil, err
  170. }
  171. body = &b
  172. }
  173. }
  174. req, err := http.NewRequestWithContext(ctx, method, url, body)
  175. if err != nil {
  176. return nil, err
  177. }
  178. if body != nil {
  179. req.Header.Add("Content-Type", "application/json")
  180. }
  181. req.Header.Add("Accept", "application/json")
  182. req.Header.Add("Authorization", "Bearer "+tk)
  183. return req, nil
  184. }
  185. // GetSecret fetches the secret from the Kubernetes API.
  186. func (c *Client) GetSecret(ctx context.Context, name string) (*Secret, error) {
  187. s := &Secret{Data: make(map[string][]byte)}
  188. if err := c.doRequest(ctx, "GET", c.secretURL(name), nil, s); err != nil {
  189. return nil, err
  190. }
  191. return s, nil
  192. }
  193. // CreateSecret creates a secret in the Kubernetes API.
  194. func (c *Client) CreateSecret(ctx context.Context, s *Secret) error {
  195. s.Namespace = c.ns
  196. return c.doRequest(ctx, "POST", c.secretURL(""), s, nil)
  197. }
  198. // UpdateSecret updates a secret in the Kubernetes API.
  199. func (c *Client) UpdateSecret(ctx context.Context, s *Secret) error {
  200. return c.doRequest(ctx, "PUT", c.secretURL(s.Name), s, nil)
  201. }
  202. // JSONPatch is a JSON patch operation.
  203. // It currently (2023-03-02) only supports the "remove" operation.
  204. //
  205. // https://tools.ietf.org/html/rfc6902
  206. type JSONPatch struct {
  207. Op string `json:"op"`
  208. Path string `json:"path"`
  209. }
  210. // JSONPatchSecret updates a secret in the Kubernetes API using a JSON patch.
  211. // It currently (2023-03-02) only supports the "remove" operation.
  212. func (c *Client) JSONPatchSecret(ctx context.Context, name string, patch []JSONPatch) error {
  213. for _, p := range patch {
  214. if p.Op != "remove" {
  215. panic(fmt.Errorf("unsupported JSON patch operation: %q", p.Op))
  216. }
  217. }
  218. return c.doRequest(ctx, "PATCH", c.secretURL(name), patch, nil, setHeader("Content-Type", "application/json-patch+json"))
  219. }
  220. // StrategicMergePatchSecret updates a secret in the Kubernetes API using a
  221. // strategic merge patch.
  222. // If a fieldManager is provided, it will be used to track the patch.
  223. func (c *Client) StrategicMergePatchSecret(ctx context.Context, name string, s *Secret, fieldManager string) error {
  224. surl := c.secretURL(name)
  225. if fieldManager != "" {
  226. uv := url.Values{
  227. "fieldManager": {fieldManager},
  228. }
  229. surl += "?" + uv.Encode()
  230. }
  231. s.Namespace = c.ns
  232. s.Name = name
  233. return c.doRequest(ctx, "PATCH", surl, s, nil, setHeader("Content-Type", "application/strategic-merge-patch+json"))
  234. }
  235. // CheckSecretPermissions checks the secret access permissions of the current
  236. // pod. It returns an error if the basic permissions tailscale needs are
  237. // missing, and reports whether the patch permission is additionally present.
  238. //
  239. // Errors encountered during the access checking process are logged, but ignored
  240. // so that the pod tries to fail alive if the permissions exist and there's just
  241. // something wrong with SelfSubjectAccessReviews. There shouldn't be, pods
  242. // should always be able to use SSARs to assess their own permissions, but since
  243. // we didn't use to check permissions this way we'll be cautious in case some
  244. // old version of k8s deviates from the current behavior.
  245. func (c *Client) CheckSecretPermissions(ctx context.Context, secretName string) (canPatch bool, err error) {
  246. var errs []error
  247. for _, verb := range []string{"get", "update"} {
  248. ok, err := c.checkPermission(ctx, verb, secretName)
  249. if err != nil {
  250. log.Printf("error checking %s permission on secret %s: %v", verb, secretName, err)
  251. } else if !ok {
  252. errs = append(errs, fmt.Errorf("missing %s permission on secret %q", verb, secretName))
  253. }
  254. }
  255. if len(errs) > 0 {
  256. return false, multierr.New(errs...)
  257. }
  258. ok, err := c.checkPermission(ctx, "patch", secretName)
  259. if err != nil {
  260. log.Printf("error checking patch permission on secret %s: %v", secretName, err)
  261. return false, nil
  262. }
  263. return ok, nil
  264. }
  265. // checkPermission reports whether the current pod has permission to use the
  266. // given verb (e.g. get, update, patch) on secretName.
  267. func (c *Client) checkPermission(ctx context.Context, verb, secretName string) (bool, error) {
  268. sar := map[string]any{
  269. "apiVersion": "authorization.k8s.io/v1",
  270. "kind": "SelfSubjectAccessReview",
  271. "spec": map[string]any{
  272. "resourceAttributes": map[string]any{
  273. "namespace": c.ns,
  274. "verb": verb,
  275. "resource": "secrets",
  276. "name": secretName,
  277. },
  278. },
  279. }
  280. var res struct {
  281. Status struct {
  282. Allowed bool `json:"allowed"`
  283. } `json:"status"`
  284. }
  285. url := c.url + "/apis/authorization.k8s.io/v1/selfsubjectaccessreviews"
  286. if err := c.doRequest(ctx, "POST", url, sar, &res); err != nil {
  287. return false, err
  288. }
  289. return res.Status.Allowed, nil
  290. }