Merge pull request #1758 from aiordache/kube_hack_april

Kube backend updates
Nicolas De loof, 4 years ago
commit 25c3df6c86
5 changed files with 229 additions and 15 deletions
  1. kube/client/client.go (+156, -2)
  2. kube/client/utils.go (+34, -2)
  3. kube/compose.go (+24, -9)
  4. kube/helm/helm.go (+13, -0)
  5. kube/resources/kube.go (+2, -2)

kube/client/client.go (+156, -2)

@@ -22,6 +22,9 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"net/http"
+	"os"
+	"strings"
 	"time"
 
 	"github.com/docker/compose-cli/api/compose"
@@ -29,14 +32,21 @@ import (
 	"golang.org/x/sync/errgroup"
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/portforward"
+	"k8s.io/client-go/tools/remotecommand"
+	"k8s.io/client-go/transport/spdy"
 )
 
 // KubeClient API to access kube objects
 type KubeClient struct {
 	client    *kubernetes.Clientset
 	namespace string
+	config    *rest.Config
+	ioStreams genericclioptions.IOStreams
 }
 
 // NewKubeClient new kubernetes client
@@ -48,7 +58,7 @@ func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, erro
 
 	clientset, err := kubernetes.NewForConfig(restConfig)
 	if err != nil {
-		return nil, err
+		return nil, fmt.Errorf("failed creating clientset. Error: %+v", err)
 	}
 
 	namespace, _, err := config.ToRawKubeConfigLoader().Namespace()
@@ -59,9 +69,84 @@ func NewKubeClient(config genericclioptions.RESTClientGetter) (*KubeClient, erro
 	return &KubeClient{
 		client:    clientset,
 		namespace: namespace,
+		config:    restConfig,
+		ioStreams: genericclioptions.IOStreams{In: os.Stdin, Out: os.Stdout, ErrOut: os.Stderr},
 	}, nil
 }
 
+// GetPod retrieves a service pod
+func (kc KubeClient) GetPod(ctx context.Context, projectName, serviceName string) (*corev1.Pod, error) {
+	pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
+		LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName),
+	})
+	if err != nil {
+		return nil, err
+	}
+	if pods == nil {
+		return nil, nil
+	}
+	var pod corev1.Pod
+	for _, p := range pods.Items {
+		service := p.Labels[compose.ServiceTag]
+		if service == serviceName {
+			pod = p
+			break
+		}
+	}
+	return &pod, nil
+}
+
+// Exec executes a command in a container
+func (kc KubeClient) Exec(ctx context.Context, projectName string, opts compose.RunOptions) error {
+	pod, err := kc.GetPod(ctx, projectName, opts.Service)
+	if err != nil || pod == nil {
+		return err
+	}
+	if len(pod.Spec.Containers) == 0 {
+		return fmt.Errorf("no containers running in pod %s", pod.Name)
+	}
+	// get first container in the pod
+	container := &pod.Spec.Containers[0]
+	containerName := container.Name
+
+	req := kc.client.CoreV1().RESTClient().Post().
+		Resource("pods").
+		Name(pod.Name).
+		Namespace(kc.namespace).
+		SubResource("exec")
+
+	option := &corev1.PodExecOptions{
+		Container: containerName,
+		Command:   opts.Command,
+		Stdin:     true,
+		Stdout:    true,
+		Stderr:    true,
+		TTY:       opts.Tty,
+	}
+
+	if opts.Reader == nil {
+		option.Stdin = false
+	}
+
+	scheme := runtime.NewScheme()
+	if err := corev1.AddToScheme(scheme); err != nil {
+		return fmt.Errorf("error adding to scheme: %v", err)
+	}
+	parameterCodec := runtime.NewParameterCodec(scheme)
+	req.VersionedParams(option, parameterCodec)
+
+	exec, err := remotecommand.NewSPDYExecutor(kc.config, "POST", req.URL())
+	if err != nil {
+		return err
+	}
+	return exec.Stream(remotecommand.StreamOptions{
+		Stdin:  opts.Reader,
+		Stdout: opts.Writer,
+		Stderr: opts.Writer,
+		Tty:    opts.Tty,
+	})
+}
+
 // GetContainers get containers for a given compose project
 func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all bool) ([]compose.ContainerSummary, error) {
 	fieldSelector := ""
@@ -76,9 +161,39 @@ func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all
 	if err != nil {
 		return nil, err
 	}
+	services := map[string][]compose.PortPublisher{}
 	result := []compose.ContainerSummary{}
 	for _, pod := range pods.Items {
-		result = append(result, podToContainerSummary(pod))
+		summary := podToContainerSummary(pod)
+		serviceName := pod.GetObjectMeta().GetLabels()[compose.ServiceTag]
+		ports, ok := services[serviceName]
+		if !ok {
+			s, err := kc.client.CoreV1().Services(kc.namespace).Get(ctx, serviceName, metav1.GetOptions{})
+			if err != nil {
+				if !strings.Contains(err.Error(), "not found") {
+					return nil, err
+				}
+				result = append(result, summary)
+				continue
+			}
+			ports = []compose.PortPublisher{}
+			if s != nil {
+				if s.Spec.Type == corev1.ServiceTypeLoadBalancer {
+					if len(s.Status.LoadBalancer.Ingress) > 0 {
+						port := compose.PortPublisher{URL: s.Status.LoadBalancer.Ingress[0].IP}
+						if len(s.Spec.Ports) > 0 {
+							port.URL = fmt.Sprintf("%s:%d", port.URL, s.Spec.Ports[0].Port)
+							port.TargetPort = s.Spec.Ports[0].TargetPort.IntValue()
+							port.Protocol = string(s.Spec.Ports[0].Protocol)
+						}
+						ports = append(ports, port)
+					}
+				}
+			}
+			services[serviceName] = ports
+		}
+		summary.Publishers = ports
+		result = append(result, summary)
 	}
 
 	return result, nil
@@ -161,3 +276,42 @@ func (kc KubeClient) WaitForPodState(ctx context.Context, opts WaitForStatusOpti
 	}
 	return nil
 }
+
+//MapPortsToLocalhost runs a port-forwarder daemon process
+func (kc KubeClient) MapPortsToLocalhost(ctx context.Context, opts PortMappingOptions) error {
+	stopChannel := make(chan struct{}, 1)
+	readyChannel := make(chan struct{})
+
+	eg, ctx := errgroup.WithContext(ctx)
+	for serviceName, servicePorts := range opts.Services {
+		serviceName := serviceName
+		servicePorts := servicePorts
+		pod, err := kc.GetPod(ctx, opts.ProjectName, serviceName)
+		if err != nil {
+			return err
+		}
+		eg.Go(func() error {
+			ports := []string{}
+			for _, p := range servicePorts {
+				ports = append(ports, fmt.Sprintf("%d:%d", p.PublishedPort, p.TargetPort))
+			}
+
+			req := kc.client.CoreV1().RESTClient().Post().
+				Resource("pods").
+				Name(pod.Name).
+				Namespace(kc.namespace).
+				SubResource("portforward")
+			transport, upgrader, err := spdy.RoundTripperFor(kc.config)
+			if err != nil {
+				return err
+			}
+			dialer := spdy.NewDialer(upgrader, &http.Client{Transport: transport}, "POST", req.URL())
+			fw, err := portforward.New(dialer, ports, stopChannel, readyChannel, os.Stdout, os.Stderr)
+			if err != nil {
+				return err
+			}
+			return fw.ForwardPorts()
+		})
+	}
+	return eg.Wait()
+}
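
A minimal sketch of how the two new entry points might be driven from a kubeconfig. The import paths (github.com/docker/compose-cli/kube/client, .../api/compose), the project name "myproject" and the service "web" are assumptions for illustration; the RunOptions and PortMappingOptions fields are the ones used in the diff above.

package main

import (
	"context"
	"os"

	"github.com/docker/compose-cli/api/compose"
	"github.com/docker/compose-cli/kube/client" // assumed import path for the package changed above
	"k8s.io/cli-runtime/pkg/genericclioptions"
)

func main() {
	// ConfigFlags implements RESTClientGetter and reads the usual kubeconfig/flags.
	kc, err := client.NewKubeClient(genericclioptions.NewConfigFlags(true))
	if err != nil {
		panic(err)
	}
	ctx := context.Background()

	// Stream `ls /` into the first container of the "web" service pod (hypothetical names).
	if err := kc.Exec(ctx, "myproject", compose.RunOptions{
		Service: "web",
		Command: []string{"ls", "/"},
		Tty:     false,
		Reader:  os.Stdin,
		Writer:  os.Stdout,
	}); err != nil {
		panic(err)
	}

	// Forward localhost:8080 to port 80 of the same service pod; ForwardPorts blocks until stopped.
	if err := kc.MapPortsToLocalhost(ctx, client.PortMappingOptions{
		ProjectName: "myproject",
		Services: map[string]client.Ports{
			"web": {{PublishedPort: 8080, TargetPort: 80}},
		},
	}); err != nil {
		panic(err)
	}
}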

kube/client/utils.go (+34, -2)

@@ -28,11 +28,27 @@ import (
 )
 
 func podToContainerSummary(pod corev1.Pod) compose.ContainerSummary {
+	state := compose.RUNNING
+
+	if pod.DeletionTimestamp != nil {
+		state = compose.REMOVING
+	} else {
+		for _, container := range pod.Status.ContainerStatuses {
+			if container.State.Waiting != nil || container.State.Terminated != nil {
+				state = compose.UPDATING
+				break
+			}
+		}
+		if state == compose.RUNNING && pod.Status.Phase != corev1.PodRunning {
+			state = string(pod.Status.Phase)
+		}
+	}
+
 	return compose.ContainerSummary{
 		ID:      pod.GetObjectMeta().GetName(),
 		Name:    pod.GetObjectMeta().GetName(),
 		Service: pod.GetObjectMeta().GetLabels()[compose.ServiceTag],
-		State:   string(pod.Status.Phase),
+		State:   state,
 		Project: pod.GetObjectMeta().GetLabels()[compose.ProjectTag],
 	}
 }
@@ -46,6 +62,13 @@ func checkPodsState(services []string, pods []corev1.Pod, status string) (bool,
 		if len(services) > 0 && !utils.StringContains(services, service) {
 			continue
 		}
+		containersRunning := true
+		for _, container := range pod.Status.ContainerStatuses {
+			if container.State.Running == nil {
+				containersRunning = false
+				break
+			}
+		}
 		servicePods[service] = pod.Status.Message
 
 		if status == compose.REMOVING {
@@ -54,7 +77,7 @@ func checkPodsState(services []string, pods []corev1.Pod, status string) (bool,
 		if pod.Status.Phase == corev1.PodFailed {
 			return false, servicePods, fmt.Errorf(pod.Status.Reason)
 		}
-		if status == compose.RUNNING && pod.Status.Phase != corev1.PodRunning {
+		if status == compose.RUNNING && (pod.Status.Phase != corev1.PodRunning || !containersRunning) {
 			stateReached = false
 		}
 	}
@@ -75,3 +98,12 @@ type WaitForStatusOptions struct {
 	Timeout     *time.Duration
 	Log         LogFunc
 }
+
+// Ports holds published ports data
+type Ports []compose.PortPublisher
+
+// PortMappingOptions holds the port mapping for project services
+type PortMappingOptions struct {
+	ProjectName string
+	Services    map[string]Ports
+}
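
The state mapping above is easy to pin down with a small unit test in the same package. This is an illustrative sketch (the test itself is not part of this change); it uses the compose state constants already referenced in the diff.

package client

import (
	"testing"

	"github.com/docker/compose-cli/api/compose"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func TestPodToContainerSummaryState(t *testing.T) {
	// A pod carrying a deletion timestamp maps to REMOVING.
	now := metav1.Now()
	deleting := corev1.Pod{}
	deleting.DeletionTimestamp = &now
	if got := podToContainerSummary(deleting).State; got != compose.REMOVING {
		t.Errorf("want %s, got %s", compose.REMOVING, got)
	}

	// A running pod with a container still waiting maps to UPDATING.
	updating := corev1.Pod{
		Status: corev1.PodStatus{
			Phase: corev1.PodRunning,
			ContainerStatuses: []corev1.ContainerStatus{
				{State: corev1.ContainerState{Waiting: &corev1.ContainerStateWaiting{Reason: "ContainerCreating"}}},
			},
		},
	}
	if got := podToContainerSummary(updating).State; got != compose.UPDATING {
		t.Errorf("want %s, got %s", compose.UPDATING, got)
	}
}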

kube/compose.go (+24, -9)

@@ -74,7 +74,7 @@ func NewComposeService() (compose.Service, error) {
 func (s *composeService) Up(ctx context.Context, project *types.Project, options compose.UpOptions) error {
 	w := progress.ContextWriter(ctx)
 
-	eventName := "Convert to Helm charts"
+	eventName := "Convert Compose file to Helm charts"
 	w.Event(progress.CreatingEvent(eventName))
 
 	chart, err := helm.GetChartInMemory(project)
@@ -83,16 +83,31 @@ func (s *composeService) Up(ctx context.Context, project *types.Project, options
 	}
 	w.Event(progress.NewEvent(eventName, progress.Done, ""))
 
-	eventName = "Install Helm charts"
-	w.Event(progress.CreatingEvent(eventName))
-
-	err = s.sdk.InstallChart(project.Name, chart, func(format string, v ...interface{}) {
-		message := fmt.Sprintf(format, v...)
-		w.Event(progress.NewEvent(eventName, progress.Done, message))
-	})
+	stack, err := s.sdk.Get(project.Name)
+	if err != nil || stack == nil {
+		// install stack
+		eventName = "Install Compose stack"
+		w.Event(progress.CreatingEvent(eventName))
+
+		err = s.sdk.InstallChart(project.Name, chart, func(format string, v ...interface{}) {
+			message := fmt.Sprintf(format, v...)
+			w.Event(progress.NewEvent(eventName, progress.Done, message))
+		})
+
+	} else {
+		//update stack
+		eventName = "Updating Compose stack"
+		w.Event(progress.CreatingEvent(eventName))
+
+		err = s.sdk.UpdateChart(project.Name, chart, func(format string, v ...interface{}) {
+			message := fmt.Sprintf(format, v...)
+			w.Event(progress.NewEvent(eventName, progress.Done, message))
+		})
+	}
 	if err != nil {
 		return err
 	}
+
 	w.Event(progress.NewEvent(eventName, progress.Done, ""))
 
 	return s.client.WaitForPodState(ctx, client.WaitForStatusOptions{
@@ -266,7 +281,7 @@ func (s *composeService) Remove(ctx context.Context, project *types.Project, opt
 
 // Exec executes a command in a running service container
 func (s *composeService) Exec(ctx context.Context, project *types.Project, opts compose.RunOptions) (int, error) {
-	return 0, errdefs.ErrNotImplemented
+	return 0, s.client.Exec(ctx, project.Name, opts)
 }
 
 func (s *composeService) Pause(ctx context.Context, project string, options compose.PauseOptions) error {
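
With the install-or-upgrade branch above, calling Up twice for the same project should now upgrade the existing release instead of failing, and Exec is wired through to the Kubernetes backend. A hedged usage sketch; the kube and compose-go import paths, the loaded project value and the service/command names are assumptions for illustration.

package example

import (
	"context"
	"os"

	"github.com/compose-spec/compose-go/types"
	"github.com/docker/compose-cli/api/compose"
	"github.com/docker/compose-cli/kube"
)

func runProject(ctx context.Context, project *types.Project) error {
	srv, err := kube.NewComposeService()
	if err != nil {
		return err
	}

	// First Up installs the chart; a second Up for the same project now upgrades it.
	if err := srv.Up(ctx, project, compose.UpOptions{}); err != nil {
		return err
	}
	if err := srv.Up(ctx, project, compose.UpOptions{}); err != nil {
		return err
	}

	// Exec no longer returns ErrNotImplemented; it streams into the service pod.
	_, err = srv.Exec(ctx, project, compose.RunOptions{
		Service: "web", // hypothetical service name
		Command: []string{"sh"},
		Tty:     true,
		Reader:  os.Stdin,
		Writer:  os.Stdout,
	})
	return err
}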

kube/helm/helm.go (+13, -0)

@@ -84,6 +84,19 @@ func (hc *Actions) InstallChart(name string, chart *chart.Chart, logger func(for
 	return err
 }
 
+// UpdateChart upgrades chart
+func (hc *Actions) UpdateChart(name string, chart *chart.Chart, logger func(format string, v ...interface{})) error {
+	err := hc.initialize(logger)
+	if err != nil {
+		return err
+	}
+
+	actUpgrade := action.NewUpgrade(hc.Config)
+	actUpgrade.Namespace = hc.Namespace
+	_, err = actUpgrade.Run(name, chart, map[string]interface{}{})
+	return err
+}
+
 // Uninstall uninstall chart
 func (hc *Actions) Uninstall(name string, logger func(format string, v ...interface{})) error {
 	err := hc.initialize(logger)
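
Paired with InstallChart, the new UpdateChart lets callers build an idempotent deploy, which is what (*composeService).Up now does. A minimal sketch, assuming the Actions value also exposes the Get used in kube/compose.go and that Get returns a nil release (or an error) when nothing is installed yet.

package example

import (
	"github.com/docker/compose-cli/kube/helm" // assumed import path for the Actions type changed above
	"helm.sh/helm/v3/pkg/chart"
)

// installOrUpgrade deploys a chart for the first time or upgrades the existing release.
func installOrUpgrade(hc *helm.Actions, name string, c *chart.Chart, logger func(format string, v ...interface{})) error {
	release, err := hc.Get(name) // assumed signature: returns the existing release, nil if not installed
	if err != nil || release == nil {
		return hc.InstallChart(name, c, logger)
	}
	return hc.UpdateChart(name, c, logger)
}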

kube/resources/kube.go (+2, -2)

@@ -94,8 +94,8 @@ func mapToService(project *types.Project, service types.ServiceConfig) *core.Ser
 		}
 		ports = append(ports,
 			core.ServicePort{
-				Name:       fmt.Sprintf("%d-%s", p.Target, strings.ToLower(p.Protocol)),
-				Port:       int32(p.Target),
+				Name:       fmt.Sprintf("%d-%s", p.Published, strings.ToLower(p.Protocol)),
+				Port:       int32(p.Published),
 				TargetPort: intstr.FromInt(int(p.Target)),
 				Protocol:   toProtocol(p.Protocol),
 			})
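
The effect is easiest to see with a concrete mapping: for a compose port entry with Published=8080 and Target=80 over TCP, the generated ServicePort now exposes the published port while still targeting the container port. An illustrative sketch; the function and package names are hypothetical, the field values follow the code above.

package resources

import (
	core "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
)

// exampleServicePort shows the port produced for Published=8080, Target=80, Protocol "tcp".
func exampleServicePort() core.ServicePort {
	return core.ServicePort{
		Name:       "8080-tcp",         // fmt.Sprintf("%d-%s", p.Published, "tcp")
		Port:       8080,               // now the published port (previously the target port, 80)
		TargetPort: intstr.FromInt(80), // still the container port
		Protocol:   core.ProtocolTCP,
	}
}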