client.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. // +build kube
  2. /*
  3. Copyright 2020 Docker Compose CLI authors
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package client
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "os"
  21. "strings"
  22. "time"
  23. "github.com/docker/compose-cli/pkg/api"
  24. "github.com/docker/compose-cli/pkg/utils"
  25. "golang.org/x/sync/errgroup"
  26. corev1 "k8s.io/api/core/v1"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/runtime"
  29. "k8s.io/cli-runtime/pkg/genericclioptions"
  30. "k8s.io/client-go/kubernetes"
  31. "k8s.io/client-go/rest"
  32. "k8s.io/client-go/tools/portforward"
  33. "k8s.io/client-go/tools/remotecommand"
  34. "k8s.io/client-go/transport/spdy"
  35. )
  36. // KubeClient API to access kube objects
  37. type KubeClient struct {
  38. client *kubernetes.Clientset
  39. namespace string
  40. config *rest.Config
  41. ioStreams genericclioptions.IOStreams
  42. }
  43. // NewKubeClient new kubernetes client
  44. func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, error) {
  45. restConfig, err := config.ToRESTConfig()
  46. if err != nil {
  47. return nil, err
  48. }
  49. clientset, err := kubernetes.NewForConfig(restConfig)
  50. if err != nil {
  51. return nil, fmt.Errorf("failed creating clientset. Error: %+v", err)
  52. }
  53. namespace, _, err := config.ToRawKubeConfigLoader().Namespace()
  54. if err != nil {
  55. return nil, err
  56. }
  57. return &KubeClient{
  58. client: clientset,
  59. namespace: namespace,
  60. config: restConfig,
  61. ioStreams: genericclioptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr},
  62. }, nil
  63. }
  64. // GetPod retrieves a service pod
  65. func (kc KubeClient) GetPod(ctx context.Context, projectName, serviceName string) (*corev1.Pod, error) {
  66. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  67. LabelSelector: fmt.Sprintf("%s=%s", api.ProjectLabel, projectName),
  68. })
  69. if err != nil {
  70. return nil, err
  71. }
  72. if pods == nil {
  73. return nil, nil
  74. }
  75. var pod corev1.Pod
  76. for _, p := range pods.Items {
  77. service := p.Labels[api.ServiceLabel]
  78. if service == serviceName {
  79. pod = p
  80. break
  81. }
  82. }
  83. return &pod, nil
  84. }
  85. // Exec executes a command in a container
  86. func (kc KubeClient) Exec(ctx context.Context, projectName string, opts api.RunOptions) error {
  87. pod, err := kc.GetPod(ctx, projectName, opts.Service)
  88. if err != nil || pod == nil {
  89. return err
  90. }
  91. if len(pod.Spec.Containers) == 0 {
  92. return fmt.Errorf("no containers running in pod %s", pod.Name)
  93. }
  94. // get first container in the pod
  95. container := &pod.Spec.Containers[0]
  96. containerName := container.Name
  97. req := kc.client.CoreV1().RESTClient().Post().
  98. Resource("pods").
  99. Name(pod.Name).
  100. Namespace(kc.namespace).
  101. SubResource("exec")
  102. option := &corev1.PodExecOptions{
  103. Container: containerName,
  104. Command: opts.Command,
  105. Stdin: true,
  106. Stdout: true,
  107. Stderr: true,
  108. TTY: opts.Tty,
  109. }
  110. if opts.Stdin == nil {
  111. option.Stdin = false
  112. }
  113. scheme := runtime.NewScheme()
  114. if err := corev1.AddToScheme(scheme); err != nil {
  115. return fmt.Errorf("error adding to scheme: %v", err)
  116. }
  117. parameterCodec := runtime.NewParameterCodec(scheme)
  118. req.VersionedParams(option, parameterCodec)
  119. exec, err := remotecommand.NewSPDYExecutor(kc.config, "POST", req.URL())
  120. if err != nil {
  121. return err
  122. }
  123. return exec.Stream(remotecommand.StreamOptions{
  124. Stdin: opts.Stdin,
  125. Stdout: opts.Stdout,
  126. Stderr: opts.Stdout,
  127. Tty: opts.Tty,
  128. })
  129. }
  130. // GetContainers get containers for a given compose project
  131. func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all bool) ([]api.ContainerSummary, error) {
  132. fieldSelector := ""
  133. if !all {
  134. fieldSelector = "status.phase=Running"
  135. }
  136. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  137. LabelSelector: fmt.Sprintf("%s=%s", api.ProjectLabel, projectName),
  138. FieldSelector: fieldSelector,
  139. })
  140. if err != nil {
  141. return nil, err
  142. }
  143. services := map[string][]api.PortPublisher{}
  144. result := []api.ContainerSummary{}
  145. for _, pod := range pods.Items {
  146. summary := podToContainerSummary(pod)
  147. serviceName := pod.GetObjectMeta().GetLabels()[api.ServiceLabel]
  148. ports, ok := services[serviceName]
  149. if !ok {
  150. s, err := kc.client.CoreV1().Services(kc.namespace).Get(ctx, serviceName, metav1.GetOptions{})
  151. if err != nil {
  152. if !strings.Contains(err.Error(), "not found") {
  153. return nil, err
  154. }
  155. result = append(result, summary)
  156. continue
  157. }
  158. ports = []api.PortPublisher{}
  159. if s != nil {
  160. if s.Spec.Type == corev1.ServiceTypeLoadBalancer {
  161. if len(s.Status.LoadBalancer.Ingress) > 0 {
  162. port := api.PortPublisher{URL: s.Status.LoadBalancer.Ingress[0].IP}
  163. if len(s.Spec.Ports) > 0 {
  164. port.URL = fmt.Sprintf("%s:%d", port.URL, s.Spec.Ports[0].Port)
  165. port.TargetPort = s.Spec.Ports[0].TargetPort.IntValue()
  166. port.Protocol = string(s.Spec.Ports[0].Protocol)
  167. }
  168. ports = append(ports, port)
  169. }
  170. }
  171. }
  172. services[serviceName] = ports
  173. }
  174. summary.Publishers = ports
  175. result = append(result, summary)
  176. }
  177. return result, nil
  178. }
  179. // GetLogs retrieves pod logs
  180. func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer api.LogConsumer, follow bool) error {
  181. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  182. LabelSelector: fmt.Sprintf("%s=%s", api.ProjectLabel, projectName),
  183. })
  184. if err != nil {
  185. return err
  186. }
  187. eg, ctx := errgroup.WithContext(ctx)
  188. for _, pod := range pods.Items {
  189. request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow})
  190. service := pod.Labels[api.ServiceLabel]
  191. w := utils.GetWriter(func(line string) {
  192. consumer.Log(pod.Name, service, line)
  193. })
  194. eg.Go(func() error {
  195. r, err := request.Stream(ctx)
  196. if err != nil {
  197. return err
  198. }
  199. defer r.Close() // nolint errcheck
  200. _, err = io.Copy(w, r)
  201. return err
  202. })
  203. }
  204. return eg.Wait()
  205. }
  206. // WaitForPodState blocks until pods reach desired state
  207. func (kc KubeClient) WaitForPodState(ctx context.Context, opts WaitForStatusOptions) error {
  208. var timeout = time.Minute
  209. if opts.Timeout != nil {
  210. timeout = *opts.Timeout
  211. }
  212. errch := make(chan error, 1)
  213. done := make(chan bool)
  214. go func() {
  215. for {
  216. time.Sleep(500 * time.Millisecond)
  217. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  218. LabelSelector: fmt.Sprintf("%s=%s", api.ProjectLabel, opts.ProjectName),
  219. })
  220. if err != nil {
  221. errch <- err
  222. }
  223. stateReached, servicePods, err := checkPodsState(opts.Services, pods.Items, opts.Status)
  224. if err != nil {
  225. errch <- err
  226. }
  227. if opts.Log != nil {
  228. for p, m := range servicePods {
  229. opts.Log(p, stateReached, m)
  230. }
  231. }
  232. if stateReached {
  233. done <- true
  234. }
  235. }
  236. }()
  237. select {
  238. case <-time.After(timeout):
  239. return fmt.Errorf("timeout: pods did not reach expected state")
  240. case err := <-errch:
  241. if err != nil {
  242. return err
  243. }
  244. case <-done:
  245. return nil
  246. }
  247. return nil
  248. }
  249. //MapPortsToLocalhost runs a port-forwarder daemon process
  250. func (kc KubeClient) MapPortsToLocalhost(ctx context.Context, opts PortMappingOptions) error {
  251. stopChannel := make(chan struct{}, 1)
  252. readyChannel := make(chan struct{})
  253. eg, ctx := errgroup.WithContext(ctx)
  254. for serviceName, servicePorts := range opts.Services {
  255. serviceName, servicePorts := serviceName, servicePorts
  256. pod, err := kc.GetPod(ctx, opts.ProjectName, serviceName)
  257. if err != nil {
  258. return err
  259. }
  260. eg.Go(func() error {
  261. ports := []string{}
  262. for _, p := range servicePorts {
  263. ports = append(ports, fmt.Sprintf("%d:%d", p.PublishedPort, p.TargetPort))
  264. }
  265. req := kc.client.CoreV1().RESTClient().Post().
  266. Resource("pods").
  267. Name(pod.Name).
  268. Namespace(kc.namespace).
  269. SubResource("portforward")
  270. transport, upgrader, err := spdy.RoundTripperFor(kc.config)
  271. if err != nil {
  272. return err
  273. }
  274. dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())
  275. fw, err := portforward.New(dialer, ports, stopChannel, readyChannel, os.Stdout, os.Stderr)
  276. if err != nil {
  277. return err
  278. }
  279. return fw.ForwardPorts()
  280. })
  281. }
  282. return eg.Wait()
  283. }