client.go 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. // +build kube
  2. /*
  3. Copyright 2020 Docker Compose CLI authors
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package client
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "time"
  20. "github.com/docker/compose-cli/api/compose"
  21. "github.com/docker/compose-cli/utils"
  22. "golang.org/x/sync/errgroup"
  23. corev1 "k8s.io/api/core/v1"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/cli-runtime/pkg/genericclioptions"
  26. "k8s.io/client-go/kubernetes"
  27. )
  28. // KubeClient API to access kube objects
  29. type KubeClient struct {
  30. client *kubernetes.Clientset
  31. namespace string
  32. }
  33. // NewKubeClient new kubernetes client
  34. func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, error) {
  35. restConfig, err := config.ToRESTConfig()
  36. if err != nil {
  37. return nil, err
  38. }
  39. clientset, err := kubernetes.NewForConfig(restConfig)
  40. if err != nil {
  41. return nil, err
  42. }
  43. namespace, _, err := config.ToRawKubeConfigLoader().Namespace()
  44. if err != nil {
  45. return nil, err
  46. }
  47. return &KubeClient{
  48. client: clientset,
  49. namespace: namespace,
  50. }, nil
  51. }
  52. // GetContainers get containers for a given compose project
  53. func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all bool) ([]compose.ContainerSummary, error) {
  54. fieldSelector := ""
  55. if !all {
  56. fieldSelector = "status.phase=Running"
  57. }
  58. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  59. LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName),
  60. FieldSelector: fieldSelector,
  61. })
  62. if err != nil {
  63. return nil, err
  64. }
  65. result := []compose.ContainerSummary{}
  66. for _, pod := range pods.Items {
  67. result = append(result, podToContainerSummary(pod))
  68. }
  69. return result, nil
  70. }
  71. // GetLogs retrieves pod logs
  72. func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer compose.LogConsumer, follow bool) error {
  73. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  74. LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName),
  75. })
  76. if err != nil {
  77. return err
  78. }
  79. eg, ctx := errgroup.WithContext(ctx)
  80. for _, pod := range pods.Items {
  81. request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow})
  82. service := pod.Labels[compose.ServiceTag]
  83. w := utils.GetWriter(pod.Name, service, string(pod.UID), func(event compose.ContainerEvent) {
  84. consumer.Log(event.Name, event.Service, event.Source, event.Line)
  85. })
  86. eg.Go(func() error {
  87. r, err := request.Stream(ctx)
  88. if err != nil {
  89. return err
  90. }
  91. defer r.Close() // nolint errcheck
  92. _, err = io.Copy(w, r)
  93. return err
  94. })
  95. }
  96. return eg.Wait()
  97. }
  98. // WaitForRunningPodState blocks until pods are in running state
  99. func (kc KubeClient) WaitForPodState(ctx context.Context, opts WaitForStatusOptions) error {
  100. var timeout time.Duration = time.Duration(60) * time.Second
  101. if opts.Timeout != nil {
  102. timeout = *opts.Timeout
  103. }
  104. selector := fmt.Sprintf("%s=%s", compose.ProjectTag, opts.ProjectName)
  105. waitingForPhase := corev1.PodRunning
  106. switch opts.Status {
  107. case compose.STARTING:
  108. waitingForPhase = corev1.PodPending
  109. case compose.UNKNOWN:
  110. waitingForPhase = corev1.PodUnknown
  111. }
  112. errch := make(chan error, 1)
  113. done := make(chan bool)
  114. status := opts.Status
  115. go func() {
  116. for {
  117. time.Sleep(500 * time.Millisecond)
  118. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  119. LabelSelector: selector,
  120. })
  121. if err != nil {
  122. errch <- err
  123. }
  124. servicePods := map[string]string{}
  125. stateReached := true
  126. for _, pod := range pods.Items {
  127. service := pod.Labels[compose.ServiceTag]
  128. if opts.Services == nil || utils.StringContains(opts.Services, service) {
  129. servicePods[service] = pod.Status.Message
  130. }
  131. if status == compose.REMOVING {
  132. continue
  133. }
  134. if pod.Status.Phase != waitingForPhase {
  135. stateReached = false
  136. }
  137. }
  138. if status == compose.REMOVING {
  139. if len(servicePods) > 0 {
  140. stateReached = false
  141. }
  142. }
  143. if opts.Log != nil {
  144. for p, m := range servicePods {
  145. opts.Log(p, stateReached, m)
  146. }
  147. }
  148. if stateReached {
  149. done <- true
  150. }
  151. }
  152. }()
  153. select {
  154. case <-time.After(timeout):
  155. return fmt.Errorf("timeout: pods did not reach expected state")
  156. case err := <-errch:
  157. if err != nil {
  158. return err
  159. }
  160. case <-done:
  161. return nil
  162. }
  163. return nil
  164. }