浏览代码

pass service to LogConsumer

Signed-off-by: Nicolas De Loof <[email protected]>
Nicolas De Loof 4 年之前
父节点
当前提交
2b0e0f2741
共有 9 个文件被更改,包括 14 次插入14 次删除
  1. 1 1
      api/compose/api.go
  2. 1 1
      cli/cmd/compose/up.go
  3. 3 3
      cli/formatter/logs.go
  4. 1 1
      ecs/aws.go
  5. 1 1
      ecs/aws_mock.go
  6. 2 2
      ecs/sdk.go
  7. 1 1
      kube/client/client.go
  8. 1 1
      local/compose/logs.go
  9. 3 3
      utils/logs.go

+ 1 - 1
api/compose/api.go

@@ -218,7 +218,7 @@ type Stack struct {
 
 
 // LogConsumer is a callback to process log messages from services
 // LogConsumer is a callback to process log messages from services
 type LogConsumer interface {
 type LogConsumer interface {
-	Log(name, container, message string)
+	Log(name, service, container, message string)
 	Status(name, container, msg string)
 	Status(name, container, msg string)
 	Register(name string, source string)
 	Register(name string, source string)
 }
 }

+ 1 - 1
cli/cmd/compose/up.go

@@ -347,7 +347,7 @@ func (p printer) run(ctx context.Context, cascadeStop bool, exitCodeFrom string,
 			}
 			}
 		case compose.ContainerEventLog:
 		case compose.ContainerEventLog:
 			if !aborting {
 			if !aborting {
-				consumer.Log(event.Name, event.Source, event.Line)
+				consumer.Log(event.Name, event.Service, event.Source, event.Line)
 			}
 			}
 		}
 		}
 	}
 	}

+ 3 - 3
cli/formatter/logs.go

@@ -62,13 +62,13 @@ func (l *logConsumer) register(name string, id string) *presenter {
 }
 }
 
 
 // Log formats a log message as received from name/container
 // Log formats a log message as received from name/container
-func (l *logConsumer) Log(name, id, message string) {
+func (l *logConsumer) Log(name, service, container, message string) {
 	if l.ctx.Err() != nil {
 	if l.ctx.Err() != nil {
 		return
 		return
 	}
 	}
-	p, ok := l.presenters[id]
+	p, ok := l.presenters[container]
 	if !ok { // should have been registered, but ¯\_(ツ)_/¯
 	if !ok { // should have been registered, but ¯\_(ツ)_/¯
-		p = l.register(name, id)
+		p = l.register(name, container)
 	}
 	}
 	for _, line := range strings.Split(message, "\n") {
 	for _, line := range strings.Split(message, "\n") {
 		fmt.Fprintf(l.writer, "%s %s\n", p.prefix, line) // nolint:errcheck
 		fmt.Fprintf(l.writer, "%s %s\n", p.prefix, line) // nolint:errcheck

+ 1 - 1
ecs/aws.go

@@ -63,7 +63,7 @@ type API interface {
 	InspectSecret(ctx context.Context, id string) (secrets.Secret, error)
 	InspectSecret(ctx context.Context, id string) (secrets.Secret, error)
 	ListSecrets(ctx context.Context) ([]secrets.Secret, error)
 	ListSecrets(ctx context.Context) ([]secrets.Secret, error)
 	DeleteSecret(ctx context.Context, id string, recover bool) error
 	DeleteSecret(ctx context.Context, id string, recover bool) error
-	GetLogs(ctx context.Context, name string, consumer func(service string, container string, message string), follow bool) error
+	GetLogs(ctx context.Context, name string, consumer func(name string, service string, container string, message string), follow bool) error
 	DescribeService(ctx context.Context, cluster string, arn string) (compose.ServiceStatus, error)
 	DescribeService(ctx context.Context, cluster string, arn string) (compose.ServiceStatus, error)
 	DescribeServiceTasks(ctx context.Context, cluster string, project string, service string) ([]compose.ContainerSummary, error)
 	DescribeServiceTasks(ctx context.Context, cluster string, project string, service string) ([]compose.ContainerSummary, error)
 	getURLWithPortMapping(ctx context.Context, targetGroupArns []string) ([]compose.PortPublisher, error)
 	getURLWithPortMapping(ctx context.Context, targetGroupArns []string) ([]compose.PortPublisher, error)

+ 1 - 1
ecs/aws_mock.go

@@ -285,7 +285,7 @@ func (mr *MockAPIMockRecorder) GetLoadBalancerURL(arg0, arg1 interface{}) *gomoc
 }
 }
 
 
 // GetLogs mocks base method
 // GetLogs mocks base method
-func (m *MockAPI) GetLogs(arg0 context.Context, arg1 string, arg2 func(string, string, string), arg3 bool) error {
+func (m *MockAPI) GetLogs(arg0 context.Context, arg1 string, arg2 func(string, string, string, string), arg3 bool) error {
 	m.ctrl.T.Helper()
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "GetLogs", arg0, arg1, arg2, arg3)
 	ret := m.ctrl.Call(m, "GetLogs", arg0, arg1, arg2, arg3)
 	ret0, _ := ret[0].(error)
 	ret0, _ := ret[0].(error)

+ 2 - 2
ecs/sdk.go

@@ -805,7 +805,7 @@ func (s sdk) DeleteSecret(ctx context.Context, id string, recover bool) error {
 	return err
 	return err
 }
 }
 
 
-func (s sdk) GetLogs(ctx context.Context, name string, consumer func(name string, container string, message string), follow bool) error {
+func (s sdk) GetLogs(ctx context.Context, name string, consumer func(name string, service string, container string, message string), follow bool) error {
 	logGroup := fmt.Sprintf("/docker-compose/%s", name)
 	logGroup := fmt.Sprintf("/docker-compose/%s", name)
 	var startTime int64
 	var startTime int64
 	for {
 	for {
@@ -832,7 +832,7 @@ func (s sdk) GetLogs(ctx context.Context, name string, consumer func(name string
 
 
 				for _, event := range events.Events {
 				for _, event := range events.Events {
 					p := strings.Split(aws.StringValue(event.LogStreamName), "/")
 					p := strings.Split(aws.StringValue(event.LogStreamName), "/")
-					consumer(p[1], p[2], aws.StringValue(event.Message))
+					consumer(p[1], p[1], p[2], aws.StringValue(event.Message))
 					startTime = *event.IngestionTime
 					startTime = *event.IngestionTime
 				}
 				}
 			}
 			}

+ 1 - 1
kube/client/client.go

@@ -106,7 +106,7 @@ func (kc *KubeClient) GetLogs(ctx context.Context, projectName string, consumer
 		request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow})
 		request := kc.client.CoreV1().Pods(kc.namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Follow: follow})
 		service := pod.Labels[compose.ServiceTag]
 		service := pod.Labels[compose.ServiceTag]
 		w := utils.GetWriter(pod.Name, service, string(pod.UID), func(event compose.ContainerEvent) {
 		w := utils.GetWriter(pod.Name, service, string(pod.UID), func(event compose.ContainerEvent) {
-			consumer.Log(event.Name, event.Source, event.Line)
+			consumer.Log(event.Name, event.Service, event.Source, event.Line)
 		})
 		})
 
 
 		eg.Go(func() error {
 		eg.Go(func() error {

+ 1 - 1
local/compose/logs.go

@@ -77,7 +77,7 @@ func (s *composeService) Logs(ctx context.Context, projectName string, consumer
 			}
 			}
 			name := getContainerNameWithoutProject(c)
 			name := getContainerNameWithoutProject(c)
 			w := utils.GetWriter(name, service, c.ID, func(event compose.ContainerEvent) {
 			w := utils.GetWriter(name, service, c.ID, func(event compose.ContainerEvent) {
-				consumer.Log(name, event.Name, event.Line)
+				consumer.Log(name, event.Service, event.Name, event.Line)
 			})
 			})
 			if container.Config.Tty {
 			if container.Config.Tty {
 				_, err = io.Copy(w, r)
 				_, err = io.Copy(w, r)

+ 3 - 3
utils/logs.go

@@ -43,9 +43,9 @@ type allowListLogConsumer struct {
 	delegate  compose.LogConsumer
 	delegate  compose.LogConsumer
 }
 }
 
 
-func (a *allowListLogConsumer) Log(name, container, message string) {
-	if a.allowList[name] {
-		a.delegate.Log(name, container, message)
+func (a *allowListLogConsumer) Log(name, service, container, message string) {
+	if a.allowList[service] {
+		a.delegate.Log(name, service, container, message)
 	}
 	}
 }
 }