client.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. // +build kube
  2. /*
  3. Copyright 2020 Docker Compose CLI authors
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package client
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "time"
  20. "github.com/docker/compose-cli/api/compose"
  21. "github.com/docker/compose-cli/utils"
  22. "golang.org/x/sync/errgroup"
  23. corev1 "k8s.io/api/core/v1"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/cli-runtime/pkg/genericclioptions"
  26. "k8s.io/client-go/kubernetes"
  27. )
  28. // KubeClient API to access kube objects
  29. type KubeClient struct {
  30. client *kubernetes.Clientset
  31. namespace string
  32. }
  33. // NewKubeClient new kubernetes client
  34. func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, error) {
  35. restConfig, err := config.ToRESTConfig()
  36. if err != nil {
  37. return nil, err
  38. }
  39. clientset, err := kubernetes.NewForConfig(restConfig)
  40. if err != nil {
  41. return nil, err
  42. }
  43. namespace, _, err := config.ToRawKubeConfigLoader().Namespace()
  44. if err != nil {
  45. return nil, err
  46. }
  47. return &KubeClient{
  48. client: clientset,
  49. namespace: namespace,
  50. }, nil
  51. }
  52. // GetContainers get containers for a given compose project
  53. func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all bool) ([]compose.ContainerSummary, error) {
  54. fieldSelector := ""
  55. if !all {
  56. fieldSelector = "status.phase=Running"
  57. }
  58. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  59. LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName),
  60. FieldSelector: fieldSelector,
  61. })
  62. if err != nil {
  63. return nil, err
  64. }
  65. result := []compose.ContainerSummary{}
  66. for _, pod := range pods.Items {
  67. result = append(result, podToContainerSummary(pod))
  68. }
  69. return result, nil
  70. }
  71. // GetLogs retrieves pod logs
  72. func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer compose.LogConsumer, follow bool) error {
  73. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  74. LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName),
  75. })
  76. if err != nil {
  77. return err
  78. }
  79. eg, ctx := errgroup.WithContext(ctx)
  80. for _, pod := range pods.Items {
  81. request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow})
  82. service := pod.Labels[compose.ServiceTag]
  83. w := utils.GetWriter(pod.Name, service, string(pod.UID), func(event compose.ContainerEvent) {
  84. consumer.Log(event.Name, event.Service, event.Source, event.Line)
  85. })
  86. eg.Go(func() error {
  87. r, err := request.Stream(ctx)
  88. if err != nil {
  89. return err
  90. }
  91. defer r.Close() // nolint errcheck
  92. _, err = io.Copy(w, r)
  93. return err
  94. })
  95. }
  96. return eg.Wait()
  97. }
  98. // WaitForRunningPodState blocks until pods are in running state
  99. func (kc KubeClient) WaitForPodState(ctx context.Context, projectName string, services []string, status string, timeout int) error {
  100. if timeout > 0 {
  101. var t time.Duration
  102. t = time.Duration(timeout) * time.Second
  103. fmt.Println("Timeout ", t)
  104. }
  105. selector := fmt.Sprintf("%s=%s", compose.ProjectTag, projectName)
  106. waitingForPhase := corev1.PodRunning
  107. switch status {
  108. case compose.STARTING:
  109. waitingForPhase = corev1.PodPending
  110. case compose.UNKNOWN:
  111. waitingForPhase = corev1.PodUnknown
  112. }
  113. //fieldSelector := "status.phase=Running"
  114. for {
  115. time.Sleep(time.Duration(1) * time.Second)
  116. timeout = timeout - 1
  117. if timeout <= 0 {
  118. return fmt.Errorf("Deployment time out. Pods did not reach expected state.")
  119. }
  120. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  121. LabelSelector: selector,
  122. })
  123. if err != nil {
  124. return err
  125. }
  126. servicePods := 0
  127. stateReached := true
  128. for _, pod := range pods.Items {
  129. if status == compose.REMOVING {
  130. if contains(services, pod.Labels[compose.ServiceTag]) {
  131. servicePods = servicePods + 1
  132. }
  133. continue
  134. }
  135. if pod.Status.Phase != waitingForPhase {
  136. stateReached = false
  137. }
  138. }
  139. if status == compose.REMOVING {
  140. if len(pods.Items) > 0 {
  141. continue
  142. }
  143. return nil
  144. }
  145. if !stateReached {
  146. continue
  147. }
  148. return nil
  149. }
  150. }