فهرست منبع

Merge pull request #1245 from aiordache/kube_logs_cmd

Kube backend: Implement `compose logs`
Guillaume Tardif 4 سال پیش
والد
کامیت
ff5e3a5e92
6فایلهای تغییر یافته به همراه122 افزوده شده و 33 حذف شده
  1. 33 4
      kube/client/client.go
  2. 5 1
      kube/compose.go
  3. 5 0
      kube/e2e/compose_test.go
  4. 2 1
      local/compose/attach.go
  5. 2 27
      local/compose/logs.go
  6. 75 0
      utils/logconsumer.go

+ 33 - 4
kube/client/client.go

@@ -21,13 +21,15 @@ package client
 import (
 import (
 	"context"
 	"context"
 	"fmt"
 	"fmt"
+	"io"
 
 
-	v1 "k8s.io/api/core/v1"
+	"github.com/docker/compose-cli/api/compose"
+	"github.com/docker/compose-cli/utils"
+	"golang.org/x/sync/errgroup"
+	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
 	"k8s.io/cli-runtime/pkg/genericclioptions"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes"
-
-	"github.com/docker/compose-cli/api/compose"
 )
 )
 
 
 // KubeClient API to access kube objects
 // KubeClient API to access kube objects
@@ -81,7 +83,7 @@ func (kc KubeClient) GetContainers(ctx context.Context, projectName string, all
 	return result, nil
 	return result, nil
 }
 }
 
 
-func podToContainerSummary(pod v1.Pod) compose.ContainerSummary {
+func podToContainerSummary(pod corev1.Pod) compose.ContainerSummary {
 	return compose.ContainerSummary{
 	return compose.ContainerSummary{
 		ID:      pod.GetObjectMeta().GetName(),
 		ID:      pod.GetObjectMeta().GetName(),
 		Name:    pod.GetObjectMeta().GetName(),
 		Name:    pod.GetObjectMeta().GetName(),
@@ -90,3 +92,30 @@ func podToContainerSummary(pod v1.Pod) compose.ContainerSummary {
 		Project: pod.GetObjectMeta().GetLabels()[compose.ProjectTag],
 		Project: pod.GetObjectMeta().GetLabels()[compose.ProjectTag],
 	}
 	}
 }
 }
+
+// GetLogs retrieves pod logs
+func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer compose.LogConsumer, follow bool) error {
+	pods, err := kc.client.CoreV1().Pods(kc.namespace).List(ctx, metav1.ListOptions{
+		LabelSelector: fmt.Sprintf("%s=%s", compose.ProjectTag, projectName),
+	})
+	if err != nil {
+		return err
+	}
+	eg, ctx := errgroup.WithContext(ctx)
+	for _, pod := range pods.Items {
+		request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow})
+		service := pod.Labels[compose.ServiceTag]
+		w := utils.GetWriter(service, pod.Name, consumer)
+
+		eg.Go(func() error {
+			r, err := request.Stream(ctx)
+			defer r.Close() // nolint errcheck
+			if err != nil {
+				return err
+			}
+			_, err = io.Copy(w, r)
+			return err
+		})
+	}
+	return eg.Wait()
+}

+ 5 - 1
kube/compose.go

@@ -33,6 +33,7 @@ import (
 	"github.com/docker/compose-cli/kube/client"
 	"github.com/docker/compose-cli/kube/client"
 	"github.com/docker/compose-cli/kube/helm"
 	"github.com/docker/compose-cli/kube/helm"
 	"github.com/docker/compose-cli/kube/resources"
 	"github.com/docker/compose-cli/kube/resources"
+	"github.com/docker/compose-cli/utils"
 )
 )
 
 
 type composeService struct {
 type composeService struct {
@@ -154,7 +155,10 @@ func (s *composeService) Stop(ctx context.Context, project *types.Project) error
 
 
 // Logs executes the equivalent to a `compose logs`
 // Logs executes the equivalent to a `compose logs`
 func (s *composeService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer, options compose.LogOptions) error {
 func (s *composeService) Logs(ctx context.Context, projectName string, consumer compose.LogConsumer, options compose.LogOptions) error {
-	return errdefs.ErrNotImplemented
+	if len(options.Services) > 0 {
+		consumer = utils.FilteredLogConsumer(consumer, options.Services)
+	}
+	return s.client.GetLogs(ctx, projectName, consumer, options.Follow)
 }
 }
 
 
 // Ps executes the equivalent to a `compose ps`
 // Ps executes the equivalent to a `compose ps`

+ 5 - 0
kube/e2e/compose_test.go

@@ -108,6 +108,11 @@ func TestComposeUp(t *testing.T) {
 		c.WaitForCmdResult(icmd.Command("docker", "--context", "default", "exec", "e2e-control-plane", "curl", endpoint), StdoutContains(`"word":`), 3*time.Minute, 3*time.Second)
 		c.WaitForCmdResult(icmd.Command("docker", "--context", "default", "exec", "e2e-control-plane", "curl", endpoint), StdoutContains(`"word":`), 3*time.Minute, 3*time.Second)
 	})
 	})
 
 
+	t.Run("compose logs web", func(t *testing.T) {
+		res := c.RunDockerCmd("compose", "--project-name", projectName, "logs", "web")
+		assert.Assert(t, strings.Contains(res.Stdout(), "Listening on port 80"), res.Stdout())
+	})
+
 	t.Run("down", func(t *testing.T) {
 	t.Run("down", func(t *testing.T) {
 		_ = c.RunDockerCmd("compose", "--project-name", projectName, "down")
 		_ = c.RunDockerCmd("compose", "--project-name", projectName, "down")
 	})
 	})

+ 2 - 1
local/compose/attach.go

@@ -24,6 +24,7 @@ import (
 
 
 	"github.com/docker/compose-cli/api/compose"
 	"github.com/docker/compose-cli/api/compose"
 	convert "github.com/docker/compose-cli/local/moby"
 	convert "github.com/docker/compose-cli/local/moby"
+	"github.com/docker/compose-cli/utils"
 
 
 	"github.com/compose-spec/compose-go/types"
 	"github.com/compose-spec/compose-go/types"
 	moby "github.com/docker/docker/api/types"
 	moby "github.com/docker/docker/api/types"
@@ -62,7 +63,7 @@ func (s *composeService) attach(ctx context.Context, project *types.Project, con
 
 
 func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.LogConsumer, project *types.Project) error {
 func (s *composeService) attachContainer(ctx context.Context, container moby.Container, consumer compose.LogConsumer, project *types.Project) error {
 	serviceName := container.Labels[serviceLabel]
 	serviceName := container.Labels[serviceLabel]
-	w := getWriter(serviceName, getCanonicalContainerName(container), consumer)
+	w := utils.GetWriter(serviceName, getCanonicalContainerName(container), consumer)
 
 
 	service, err := project.GetService(serviceName)
 	service, err := project.GetService(serviceName)
 	if err != nil {
 	if err != nil {

+ 2 - 27
local/compose/logs.go

@@ -17,11 +17,11 @@
 package compose
 package compose
 
 
 import (
 import (
-	"bytes"
 	"context"
 	"context"
 	"io"
 	"io"
 
 
 	"github.com/docker/compose-cli/api/compose"
 	"github.com/docker/compose-cli/api/compose"
+	"github.com/docker/compose-cli/utils"
 
 
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types"
 	"github.com/docker/docker/api/types/filters"
 	"github.com/docker/docker/api/types/filters"
@@ -73,7 +73,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
-			w := getWriter(service, container.Name[1:], consumer)
+			w := utils.GetWriter(service, container.Name[1:], consumer)
 			if container.Config.Tty {
 			if container.Config.Tty {
 				_, err = io.Copy(w, r)
 				_, err = io.Copy(w, r)
 			} else {
 			} else {
@@ -84,28 +84,3 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
 	}
 	}
 	return eg.Wait()
 	return eg.Wait()
 }
 }
-
-type splitBuffer struct {
-	service   string
-	container string
-	consumer  compose.LogConsumer
-}
-
-// getWriter creates a io.Writer that will actually split by line and format by LogConsumer
-func getWriter(service, container string, l compose.LogConsumer) io.Writer {
-	return splitBuffer{
-		service:   service,
-		container: container,
-		consumer:  l,
-	}
-}
-
-func (s splitBuffer) Write(b []byte) (n int, err error) {
-	split := bytes.Split(b, []byte{'\n'})
-	for _, line := range split {
-		if len(line) != 0 {
-			s.consumer.Log(s.service, s.container, string(line))
-		}
-	}
-	return len(b), nil
-}

+ 75 - 0
utils/logconsumer.go

@@ -0,0 +1,75 @@
+/*
+   Copyright 2020 Docker Compose CLI authors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package utils
+
+import (
+	"bytes"
+	"io"
+
+	"github.com/docker/compose-cli/api/compose"
+)
+
+// GetWriter creates a io.Writer that will actually split by line and format by LogConsumer
+func GetWriter(service, container string, l compose.LogConsumer) io.Writer {
+	return splitBuffer{
+		service:   service,
+		container: container,
+		consumer:  l,
+	}
+}
+
+// FilteredLogConsumer filters logs for given services
+func FilteredLogConsumer(consumer compose.LogConsumer, services []string) compose.LogConsumer {
+	if len(services) == 0 {
+		return consumer
+	}
+	allowed := map[string]bool{}
+	for _, s := range services {
+		allowed[s] = true
+	}
+	return &allowListLogConsumer{
+		allowList: allowed,
+		delegate:  consumer,
+	}
+}
+
+type allowListLogConsumer struct {
+	allowList map[string]bool
+	delegate  compose.LogConsumer
+}
+
+func (a *allowListLogConsumer) Log(service, container, message string) {
+	if a.allowList[service] {
+		a.delegate.Log(service, container, message)
+	}
+}
+
+type splitBuffer struct {
+	service   string
+	container string
+	consumer  compose.LogConsumer
+}
+
+func (s splitBuffer) Write(b []byte) (n int, err error) {
+	split := bytes.Split(b, []byte{'\n'})
+	for _, line := range split {
+		if len(line) != 0 {
+			s.consumer.Log(s.service, s.container, string(line))
+		}
+	}
+	return len(b), nil
+}