client.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. // +build kube
  2. /*
  3. Copyright 2020 Docker Compose CLI authors
  4. Licensed under the Apache License, Version 2.0 (the "License");
  5. you may not use this file except in compliance with the License.
  6. You may obtain a copy of the License at
  7. http://www.apache.org/licenses/LICENSE-2.0
  8. Unless required by applicable law or agreed to in writing, software
  9. distributed under the License is distributed on an "AS IS" BASIS,
  10. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  11. See the License for the specific language governing permissions and
  12. limitations under the License.
  13. */
  14. package client
  15. import (
  16. "context"
  17. "fmt"
  18. "io"
  19. "time"
  20. "github.com/docker/compose-cli/api/compose"
  21. "github.com/docker/compose-cli/utils"
  22. "golang.org/x/sync/errgroup"
  23. corev1 "k8s.io/api/core/v1"
  24. metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  25. "k8s.io/cli-runtime/pkg/genericclioptions"
  26. "k8s.io/client-go/kubernetes"
  27. )
  28. // KubeClient API to access kube objects
  29. type KubeClient struct {
  30. client *kubernetes.Clientset
  31. namespace string
  32. }
  33. // NewKubeClient new kubernetes client
  34. func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, error) {
  35. restConfig, err := config.ToRESTConfig()
  36. if err != nil {
  37. return nil, err
  38. }
  39. clientset, err := kubernetes.NewForConfig(restConfig)
  40. if err != nil {
  41. return nil, err
  42. }
  43. namespace, _, err := config.ToRawKubeConfigLoader().Namespace()
  44. if err != nil {
  45. return nil, err
  46. }
  47. return &KubeClient{
  48. client: clientset,
  49. namespace: namespace,
  50. }, nil
  51. }
  52. // GetContainers get containers for a given compose project
  53. func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all bool) ([]compose.ContainerSummary, error) {
  54. fieldSelector := ""
  55. if !all {
  56. fieldSelector = "status.phase=Running"
  57. }
  58. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  59. LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName),
  60. FieldSelector: fieldSelector,
  61. })
  62. if err != nil {
  63. return nil, err
  64. }
  65. result := []compose.ContainerSummary{}
  66. for _, pod := range pods.Items {
  67. result = append(result, podToContainerSummary(pod))
  68. }
  69. return result, nil
  70. }
  71. // GetLogs retrieves pod logs
  72. func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer compose.LogConsumer, follow bool) error {
  73. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  74. LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName),
  75. })
  76. if err != nil {
  77. return err
  78. }
  79. eg, ctx := errgroup.WithContext(ctx)
  80. for _, pod := range pods.Items {
  81. request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow})
  82. service := pod.Labels[compose.ServiceTag]
  83. w := utils.GetWriter(pod.Name, service, func(event compose.ContainerEvent) {
  84. consumer.Log(event.Container, event.Service, event.Line)
  85. })
  86. eg.Go(func() error {
  87. r, err := request.Stream(ctx)
  88. if err != nil {
  89. return err
  90. }
  91. defer r.Close() // nolint errcheck
  92. _, err = io.Copy(w, r)
  93. return err
  94. })
  95. }
  96. return eg.Wait()
  97. }
  98. // WaitForPodState blocks until pods reach desired state
  99. func (kc KubeClient) WaitForPodState(ctx context.Context, opts WaitForStatusOptions) error {
  100. var timeout time.Duration = time.Minute
  101. if opts.Timeout != nil {
  102. timeout = *opts.Timeout
  103. }
  104. errch := make(chan error, 1)
  105. done := make(chan bool)
  106. go func() {
  107. for {
  108. time.Sleep(500 * time.Millisecond)
  109. pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
  110. LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, opts.ProjectName),
  111. })
  112. if err != nil {
  113. errch <- err
  114. }
  115. stateReached, servicePods, err := checkPodsState(opts.Services, pods.Items, opts.Status)
  116. if err != nil {
  117. errch <- err
  118. }
  119. if opts.Log != nil {
  120. for p, m := range servicePods {
  121. opts.Log(p, stateReached, m)
  122. }
  123. }
  124. if stateReached {
  125. done <- true
  126. }
  127. }
  128. }()
  129. select {
  130. case <-time.After(timeout):
  131. return fmt.Errorf("timeout: pods did not reach expected state")
  132. case err := <-errch:
  133. if err != nil {
  134. return err
  135. }
  136. case <-done:
  137. return nil
  138. }
  139. return nil
  140. }