Explorar o código

Merge pull request #1934 from ndeloof/control_char_run

Nicolas De loof %!s(int64=4) %!d(string=hai) anos
pai
achega
1b76c746fe
Modificáronse 3 ficheiros con 113 adicións e 56 borrados
  1. 14 15
      pkg/compose/attach.go
  2. 2 2
      pkg/compose/exec.go
  3. 97 39
      pkg/compose/run.go

+ 14 - 15
pkg/compose/attach.go

@@ -24,11 +24,11 @@ import (
 
 	"github.com/compose-spec/compose-go/types"
 	"github.com/docker/cli/cli/streams"
-	"github.com/docker/compose-cli/pkg/api"
 	moby "github.com/docker/docker/api/types"
 	"github.com/docker/docker/pkg/stdcopy"
 	"github.com/moby/term"
 
+	"github.com/docker/compose-cli/pkg/api"
 	"github.com/docker/compose-cli/pkg/utils"
 )
 
@@ -85,11 +85,19 @@ func (s *composeService) attachContainer(ctx context.Context, container moby.Con
 func (s *composeService) attachContainerStreams(ctx context.Context, container string, tty bool, stdin io.ReadCloser, stdout, stderr io.Writer) (func(), chan bool, error) {
 	detached := make(chan bool)
 	var (
-		in      *streams.In
 		restore = func() { /* noop */ }
 	)
 	if stdin != nil {
-		in = streams.NewIn(stdin)
+		in := streams.NewIn(stdin)
+		if in.IsTerminal() {
+			state, err := term.SetRawTerminal(in.FD())
+			if err != nil {
+				return restore, detached, err
+			}
+			restore = func() {
+				term.RestoreTerminal(in.FD(), state) //nolint:errcheck
+			}
+		}
 	}
 
 	streamIn, streamOut, err := s.getContainerStreams(ctx, container)
@@ -99,22 +107,13 @@ func (s *composeService) attachContainerStreams(ctx context.Context, container s
 
 	go func() {
 		<-ctx.Done()
-		if in != nil {
-			in.Close() //nolint:errcheck
+		if stdin != nil {
+			stdin.Close() //nolint:errcheck
 		}
 		streamOut.Close() //nolint:errcheck
 	}()
 
-	if in != nil && streamIn != nil {
-		if in.IsTerminal() {
-			state, err := term.SetRawTerminal(in.FD())
-			if err != nil {
-				return restore, detached, err
-			}
-			restore = func() {
-				term.RestoreTerminal(in.FD(), state) //nolint:errcheck
-			}
-		}
+	if streamIn != nil && stdin != nil {
 		go func() {
 			_, err := io.Copy(streamIn, stdin)
 			if _, ok := err.(term.EscapeError); ok {

+ 2 - 2
pkg/compose/exec.go

@@ -118,13 +118,13 @@ func (s *composeService) interactiveExec(ctx context.Context, opts api.RunOption
 			_, err := stdcopy.StdCopy(opts.Stdout, opts.Stderr, stdout)
 			outputDone <- err
 		}
-		defer stdout.Close() //nolint:errcheck
+		stdout.Close() //nolint:errcheck
 	}()
 
 	go func() {
 		_, err := io.Copy(stdin, r)
 		inputDone <- err
-		defer stdin.Close() //nolint:errcheck
+		stdin.Close() //nolint:errcheck
 	}()
 
 	for {

+ 97 - 39
pkg/compose/run.go

@@ -24,9 +24,10 @@ import (
 	"github.com/docker/compose-cli/pkg/api"
 
 	"github.com/compose-spec/compose-go/types"
+	"github.com/docker/cli/cli/streams"
 	moby "github.com/docker/docker/api/types"
-	"github.com/docker/docker/api/types/container"
 	"github.com/docker/docker/pkg/ioutils"
+	"github.com/docker/docker/pkg/stdcopy"
 	"github.com/docker/docker/pkg/stringid"
 	"github.com/moby/term"
 )
@@ -37,38 +38,11 @@ func (s *composeService) RunOneOffContainer(ctx context.Context, project *types.
 		return 0, err
 	}
 
-	service, err := project.GetService(opts.Service)
+	containerID, err := s.prepareRun(ctx, project, observedState, opts)
 	if err != nil {
 		return 0, err
 	}
 
-	applyRunOptions(project, &service, opts)
-
-	slug := stringid.GenerateRandomID()
-	if service.ContainerName == "" {
-		service.ContainerName = fmt.Sprintf("%s_%s_run_%s", project.Name, service.Name, stringid.TruncateID(slug))
-	}
-	service.Scale = 1
-	service.StdinOpen = true
-	service.Restart = ""
-	if service.Deploy != nil {
-		service.Deploy.RestartPolicy = nil
-	}
-	service.Labels = service.Labels.Add(api.SlugLabel, slug)
-	service.Labels = service.Labels.Add(api.OneoffLabel, "True")
-
-	if err := s.ensureImagesExists(ctx, project, observedState, false); err != nil { // all dependencies already checked, but might miss service img
-		return 0, err
-	}
-	if err := s.waitDependencies(ctx, project, service); err != nil {
-		return 0, err
-	}
-	created, err := s.createContainer(ctx, project, service, service.ContainerName, 1, opts.AutoRemove, opts.UseNetworkAliases)
-	if err != nil {
-		return 0, err
-	}
-	containerID := created.ID
-
 	if opts.Detach {
 		err := s.apiClient.ContainerStart(ctx, containerID, moby.ContainerStartOptions{})
 		if err != nil {
@@ -78,17 +52,48 @@ func (s *composeService) RunOneOffContainer(ctx context.Context, project *types.
 		return 0, nil
 	}
 
+	return s.runInteractive(ctx, containerID, opts)
+}
+
+func (s *composeService) runInteractive(ctx context.Context, containerID string, opts api.RunOptions) (int, error) {
 	r, err := s.getEscapeKeyProxy(opts.Stdin)
 	if err != nil {
 		return 0, err
 	}
-	restore, detachC, err := s.attachContainerStreams(ctx, containerID, service.Tty, r, opts.Stdout, opts.Stderr)
+
+	stdin, stdout, err := s.getContainerStreams(ctx, containerID)
 	if err != nil {
 		return 0, err
 	}
-	defer restore()
 
-	statusC, errC := s.apiClient.ContainerWait(context.Background(), containerID, container.WaitConditionNextExit)
+	in := streams.NewIn(opts.Stdin)
+	if in.IsTerminal() {
+		state, err := term.SetRawTerminal(in.FD())
+		if err != nil {
+			return 0, err
+		}
+		defer term.RestoreTerminal(in.FD(), state) //nolint:errcheck
+	}
+
+	outputDone := make(chan error)
+	inputDone := make(chan error)
+
+	go func() {
+		if opts.Tty {
+			_, err := io.Copy(opts.Stdout, stdout) //nolint:errcheck
+			outputDone <- err
+		} else {
+			_, err := stdcopy.StdCopy(opts.Stdout, opts.Stderr, stdout) //nolint:errcheck
+			outputDone <- err
+		}
+		stdout.Close() //nolint:errcheck
+	}()
+
+	go func() {
+		_, err := io.Copy(stdin, r)
+		inputDone <- err
+		stdin.Close() //nolint:errcheck
+	}()
 
 	err = s.apiClient.ContainerStart(ctx, containerID, moby.ContainerStartOptions{})
 	if err != nil {
@@ -97,15 +102,68 @@ func (s *composeService) RunOneOffContainer(ctx context.Context, project *types.
 
 	s.monitorTTySize(ctx, containerID, s.apiClient.ContainerResize)
 
-	select {
-	case status := <-statusC:
-		return int(status.StatusCode), nil
-	case <-detachC:
-		return 0, nil
-	case err := <-errC:
-		return 0, err
+	for {
+		select {
+		case err := <-outputDone:
+			if err != nil {
+				return 0, err
+			}
+			inspect, err := s.apiClient.ContainerInspect(ctx, containerID)
+			if err != nil {
+				return 0, err
+			}
+			exitCode := 0
+			if inspect.State != nil {
+				exitCode = inspect.State.ExitCode
+			}
+			return exitCode, nil
+		case err := <-inputDone:
+			if _, ok := err.(term.EscapeError); ok {
+				return 0, nil
+			}
+			if err != nil {
+				return 0, err
+			}
+			// Wait for output to complete streaming
+		case <-ctx.Done():
+			return 0, ctx.Err()
+		}
+	}
+}
+
+func (s *composeService) prepareRun(ctx context.Context, project *types.Project, observedState Containers, opts api.RunOptions) (string, error) {
+	service, err := project.GetService(opts.Service)
+	if err != nil {
+		return "", err
 	}
 
+	applyRunOptions(project, &service, opts)
+
+	slug := stringid.GenerateRandomID()
+	if service.ContainerName == "" {
+		service.ContainerName = fmt.Sprintf("%s_%s_run_%s", project.Name, service.Name, stringid.TruncateID(slug))
+	}
+	service.Scale = 1
+	service.StdinOpen = true
+	service.Restart = ""
+	if service.Deploy != nil {
+		service.Deploy.RestartPolicy = nil
+	}
+	service.Labels = service.Labels.Add(api.SlugLabel, slug)
+	service.Labels = service.Labels.Add(api.OneoffLabel, "True")
+
+	if err := s.ensureImagesExists(ctx, project, observedState, false); err != nil { // all dependencies already checked, but might miss service img
+		return "", err
+	}
+	if err := s.waitDependencies(ctx, project, service); err != nil {
+		return "", err
+	}
+	created, err := s.createContainer(ctx, project, service, service.ContainerName, 1, opts.AutoRemove, opts.UseNetworkAliases)
+	if err != nil {
+		return "", err
+	}
+	containerID := created.ID
+	return containerID, nil
 }
 
 func (s *composeService) getEscapeKeyProxy(r io.ReadCloser) (io.ReadCloser, error) {