client.go 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318
  1. // +build kube
  2. /*
  3. Copyright 2020 Docker Compose CLI authors
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package client
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "net/http"
  20. "os"
  21. "strings"
  22. "time"
  23. "github.com/docker/compose-cli/pkg/api"
  24. "github.com/docker/compose-cli/pkg/utils"
  25. "golang.org/x/sync/errgroup"
  26. corev1 "k8s.io/api/core/v1"
  27. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  28. "k8s.io/apimachinery/pkg/runtime"
  29. "k8s.io/cli-runtime/pkg/genericclioptions"
  30. "k8s.io/client-go/kubernetes"
  31. "k8s.io/client-go/rest"
  32. "k8s.io/client-go/tools/portforward"
  33. "k8s.io/client-go/tools/remotecommand"
  34. "k8s.io/client-go/transport/spdy"
  35. )
  36. // KubeClient API to access kube objects
  37. type KubeClient struct {
  38. client *kubernetes.Clientset
  39. namespace string
  40. config *rest.Config
  41. ioStreams genericclioptions.IOStreams
  42. }
  43. // NewKubeClient new kubernetes client
  44. func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, error) {
  45. restConfig, err := config.ToRESTConfig()
  46. if err != nil {
  47. return nil, err
  48. }
  49. clientset, err := kubernetes.NewForConfig(restConfig)
  50. if err != nil {
  51. return nil, fmt.Errorf("failed creating clientset. Error: %+v", err)
  52. }
  53. namespace, _, err := config.ToRawKubeConfigLoader().Namespace()
  54. if err != nil {
  55. return nil, err
  56. }
  57. return &KubeClient{
  58. client: clientset,
  59. namespace: namespace,
  60. config: restConfig,
  61. ioStreams: genericclioptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr},
  62. }, nil
  63. }
  64. // GetPod retrieves a service pod
  65. func (kc KubeClient) GetPod(ctx context.Context, projectName, serviceName string) (*corev1.Pod, error) {
  66. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  67. LabelSelector: fmt.Sprintf("%s=%s", api.ProjectLabel, projectName),
  68. })
  69. if err != nil {
  70. return nil, err
  71. }
  72. if pods == nil {
  73. return nil, nil
  74. }
  75. var pod corev1.Pod
  76. for _, p := range pods.Items {
  77. service := p.Labels[api.ServiceLabel]
  78. if service == serviceName {
  79. pod = p
  80. break
  81. }
  82. }
  83. return &pod, nil
  84. }
  85. // Exec executes a command in a container
  86. func (kc KubeClient) Exec(ctx context.Context, projectName string, opts api.RunOptions) error {
  87. pod, err := kc.GetPod(ctx, projectName, opts.Service)
  88. if err != nil || pod == nil {
  89. return err
  90. }
  91. if len(pod.Spec.Containers) == 0 {
  92. return fmt.Errorf("no containers running in pod %s", pod.Name)
  93. }
  94. // get first container in the pod
  95. container := &pod.Spec.Containers[0]
  96. containerName := container.Name
  97. req := kc.client.CoreV1().RESTClient().Post().
  98. Resource("pods").
  99. Name(pod.Name).
  100. Namespace(kc.namespace).
  101. SubResource("exec")
  102. option := &corev1.PodExecOptions{
  103. Container: containerName,
  104. Command: opts.Command,
  105. Stdin: true,
  106. Stdout: true,
  107. Stderr: true,
  108. TTY: opts.Tty,
  109. }
  110. if opts.Stdin == nil {
  111. option.Stdin = false
  112. }
  113. scheme := runtime.NewScheme()
  114. if err := corev1.AddToScheme(scheme); err != nil {
  115. return fmt.Errorf("error adding to scheme: %v", err)
  116. }
  117. parameterCodec := runtime.NewParameterCodec(scheme)
  118. req.VersionedParams(option, parameterCodec)
  119. exec, err := remotecommand.NewSPDYExecutor(kc.config, "POST", req.URL())
  120. if err != nil {
  121. return err
  122. }
  123. return exec.Stream(remotecommand.StreamOptions{
  124. Stdin: opts.Stdin,
  125. Stdout: opts.Stdout,
  126. Stderr: opts.Stdout,
  127. Tty: opts.Tty,
  128. })
  129. }
  130. // GetContainers get containers for a given compose project
  131. func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all bool) ([]api.ContainerSummary, error) {
  132. fieldSelector := ""
  133. if !all {
  134. fieldSelector = "status.phase=Running"
  135. }
  136. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  137. LabelSelector: fmt.Sprintf("%s=%s", api.ProjectLabel, projectName),
  138. FieldSelector: fieldSelector,
  139. })
  140. if err != nil {
  141. return nil, err
  142. }
  143. services := map[string][]api.PortPublisher{}
  144. result := []api.ContainerSummary{}
  145. for _, pod := range pods.Items {
  146. summary := podToContainerSummary(pod)
  147. serviceName := pod.GetObjectMeta().GetLabels()[api.ServiceLabel]
  148. ports, ok := services[serviceName]
  149. if !ok {
  150. s, err := kc.client.CoreV1().Services(kc.namespace).Get(ctx, serviceName, metav1.GetOptions{})
  151. if err != nil {
  152. if !strings.Contains(err.Error(), "not found") {
  153. return nil, err
  154. }
  155. result = append(result, summary)
  156. continue
  157. }
  158. ports = []api.PortPublisher{}
  159. if s != nil {
  160. if s.Spec.Type == corev1.ServiceTypeLoadBalancer {
  161. if len(s.Status.LoadBalancer.Ingress) > 0 {
  162. port := api.PortPublisher{URL: s.Status.LoadBalancer.Ingress[0].IP}
  163. if len(s.Spec.Ports) > 0 {
  164. port.URL = fmt.Sprintf("%s:%d", port.URL, s.Spec.Ports[0].Port)
  165. port.TargetPort = s.Spec.Ports[0].TargetPort.IntValue()
  166. port.Protocol = string(s.Spec.Ports[0].Protocol)
  167. }
  168. ports = append(ports, port)
  169. }
  170. }
  171. }
  172. services[serviceName] = ports
  173. }
  174. summary.Publishers = ports
  175. result = append(result, summary)
  176. }
  177. return result, nil
  178. }
  179. // GetLogs retrieves pod logs
  180. func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer api.LogConsumer, follow bool) error {
  181. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  182. LabelSelector: fmt.Sprintf("%s=%s", api.ProjectLabel, projectName),
  183. })
  184. if err != nil {
  185. return err
  186. }
  187. eg, ctx := errgroup.WithContext(ctx)
  188. for _, pod := range pods.Items {
  189. podName := pod.Name
  190. request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(podName, &corev1.PodLogOptions{Follow: follow})
  191. service := pod.Labels[api.ServiceLabel]
  192. w := utils.GetWriter(func(line string) {
  193. consumer.Log(podName, service, line)
  194. })
  195. eg.Go(func() error {
  196. r, err := request.Stream(ctx)
  197. if err != nil {
  198. return err
  199. }
  200. defer r.Close() // nolint errcheck
  201. _, err = io.Copy(w, r)
  202. return err
  203. })
  204. }
  205. return eg.Wait()
  206. }
  207. // WaitForPodState blocks until pods reach desired state
  208. func (kc KubeClient) WaitForPodState(ctx context.Context, opts WaitForStatusOptions) error {
  209. var timeout = time.Minute
  210. if opts.Timeout != nil {
  211. timeout = *opts.Timeout
  212. }
  213. errch := make(chan error, 1)
  214. done := make(chan bool)
  215. go func() {
  216. for {
  217. time.Sleep(500 * time.Millisecond)
  218. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  219. LabelSelector: fmt.Sprintf("%s=%s", api.ProjectLabel, opts.ProjectName),
  220. })
  221. if err != nil {
  222. errch <- err
  223. }
  224. stateReached, servicePods, err := checkPodsState(opts.Services, pods.Items, opts.Status)
  225. if err != nil {
  226. errch <- err
  227. }
  228. if opts.Log != nil {
  229. for p, m := range servicePods {
  230. opts.Log(p, stateReached, m)
  231. }
  232. }
  233. if stateReached {
  234. done <- true
  235. }
  236. }
  237. }()
  238. select {
  239. case <-time.After(timeout):
  240. return fmt.Errorf("timeout: pods did not reach expected state")
  241. case err := <-errch:
  242. if err != nil {
  243. return err
  244. }
  245. case <-done:
  246. return nil
  247. }
  248. return nil
  249. }
  250. //MapPortsToLocalhost runs a port-forwarder daemon process
  251. func (kc KubeClient) MapPortsToLocalhost(ctx context.Context, opts PortMappingOptions) error {
  252. stopChannel := make(chan struct{}, 1)
  253. readyChannel := make(chan struct{})
  254. eg, ctx := errgroup.WithContext(ctx)
  255. for serviceName, servicePorts := range opts.Services {
  256. serviceName, servicePorts := serviceName, servicePorts
  257. pod, err := kc.GetPod(ctx, opts.ProjectName, serviceName)
  258. if err != nil {
  259. return err
  260. }
  261. eg.Go(func() error {
  262. ports := []string{}
  263. for _, p := range servicePorts {
  264. ports = append(ports, fmt.Sprintf("%d:%d", p.PublishedPort, p.TargetPort))
  265. }
  266. req := kc.client.CoreV1().RESTClient().Post().
  267. Resource("pods").
  268. Name(pod.Name).
  269. Namespace(kc.namespace).
  270. SubResource("portforward")
  271. transport, upgrader, err := spdy.RoundTripperFor(kc.config)
  272. if err != nil {
  273. return err
  274. }
  275. dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())
  276. fw, err := portforward.New(dialer, ports, stopChannel, readyChannel, os.Stdout, os.Stderr)
  277. if err != nil {
  278. return err
  279. }
  280. return fw.ForwardPorts()
  281. })
  282. }
  283. return eg.Wait()
  284. }