Browse Source

Implement exec command

Djordje Lukic 5 years ago
parent
commit
afca3e31b5
8 changed files with 156 additions and 63 deletions
  1. 48 59
      azure/aci.go
  2. 16 0
      azure/backend.go
  3. 62 0
      cli/cmd/exec.go
  4. 2 0
      cli/main.go
  5. 18 4
      containers/api.go
  6. 6 0
      example/backend.go
  7. 1 0
      go.mod
  8. 3 0
      go.sum

+ 48 - 59
azure/aci.go

@@ -1,15 +1,12 @@
 package azure
 
 import (
-	"bufio"
 	"context"
 	"fmt"
 	"io"
+	"io/ioutil"
 	"net/http"
 	"os"
-	"os/signal"
-	"runtime"
-	"strings"
 
 	"github.com/docker/api/context/store"
 
@@ -81,18 +78,17 @@ func createACIContainers(ctx context.Context, aciContext store.AciContext, group
 
 		containers := *containerGroup.Containers
 		container := containers[0]
-		response, err := execACIContainer(ctx, "/bin/sh", *containerGroup.Name, *container.Name, aciContext)
+		response, err := execACIContainer(ctx, aciContext, "/bin/sh", *containerGroup.Name, *container.Name)
 		if err != nil {
 			return c, err
 		}
 
-		err = execWebSocketLoopWithCmd(
+		if err = execCommands(
 			ctx,
 			*response.WebSocketURI,
 			*response.Password,
 			commands,
-			false)
-		if err != nil {
+		); err != nil {
 			return containerinstance.ContainerGroup{}, err
 		}
 	}
@@ -122,7 +118,7 @@ func listACIContainers(aciContext store.AciContext) (c []containerinstance.Conta
 	return containers, err
 }
 
-func execACIContainer(ctx context.Context, command, containerGroup string, containerName string, aciContext store.AciContext) (c containerinstance.ContainerExecResponse, err error) {
+func execACIContainer(ctx context.Context, aciContext store.AciContext, command, containerGroup string, containerName string) (c containerinstance.ContainerExecResponse, err error) {
 	containerClient := getContainerClient(aciContext.SubscriptionID)
 	rows, cols := getTermSize()
 	containerExecRequest := containerinstance.ContainerExecRequest{
@@ -132,6 +128,7 @@ func execACIContainer(ctx context.Context, command, containerGroup string, conta
 			Cols: cols,
 		},
 	}
+
 	return containerClient.ExecuteCommand(
 		ctx,
 		aciContext.ResourceGroup,
@@ -146,93 +143,85 @@ func getTermSize() (*int32, *int32) {
 	return to.Int32Ptr(int32(rows)), to.Int32Ptr(int32(cols))
 }
 
-func execWebSocketLoop(ctx context.Context, wsURL, passwd string) error {
-	return execWebSocketLoopWithCmd(ctx, wsURL, passwd, []string{}, true)
+type commandSender struct {
+	commands []string
 }
 
-func execWebSocketLoopWithCmd(ctx context.Context, wsURL, passwd string, commands []string, outputEnabled bool) error {
+func (cs commandSender) Read(p []byte) (int, error) {
+	if len(cs.commands) == 0 {
+		return 0, io.EOF
+	}
+	command := cs.commands[0]
+	cs.commands = cs.commands[1:]
+	copy(p, command)
+	return len(command), nil
+}
+
+func execCommands(ctx context.Context, address string, password string, commands []string) error {
+	writer := ioutil.Discard
+	reader := commandSender{
+		commands: commands,
+	}
+	return exec(ctx, address, password, reader, writer)
+}
+
+func exec(ctx context.Context, address string, password string, reader io.Reader, writer io.Writer) error {
 	ctx, cancel := context.WithCancel(ctx)
-	conn, _, _, err := ws.DefaultDialer.Dial(ctx, wsURL)
+	conn, _, _, err := ws.DefaultDialer.Dial(ctx, address)
 	if err != nil {
 		cancel()
 		return err
 	}
-	err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(passwd))
+	err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(password))
 	if err != nil {
 		cancel()
 		return err
 	}
-	lastCommandLen := 0
+
 	done := make(chan struct{})
+
 	go func() {
 		defer close(done)
 		for {
 			msg, _, err := wsutil.ReadServerData(conn)
 			if err != nil {
-				if err != io.EOF {
-					fmt.Printf("read error: %s\n", err)
-				}
 				return
 			}
-			lines := strings.Split(string(msg), "\n")
-			lastCommandLen = len(lines[len(lines)-1])
-			if outputEnabled {
-				fmt.Printf("%s", msg)
-			}
+			fmt.Fprint(writer, string(msg))
 		}
 	}()
-	interrupt := make(chan os.Signal, 1)
-	signal.Notify(interrupt, os.Interrupt)
-	scanner := bufio.NewScanner(os.Stdin)
-	rc := make(chan string, 10)
-	if len(commands) > 0 {
-		for _, command := range commands {
-			rc <- command
-		}
-	}
+
+	readChannel := make(chan []byte, 10)
+
 	go func() {
 		for {
-			if !scanner.Scan() {
+			// We send each byte, byte-per-byte over the
+			// websocket because the console is in raw mode
+			buffer := make([]byte, 1)
+			n, err := reader.Read(buffer)
+			if err != nil {
 				close(done)
 				cancel()
-				fmt.Println("exiting...")
 				break
 			}
-			t := scanner.Text()
-			rc <- t
-			cleanLastCommand(lastCommandLen)
+
+			if n > 0 {
+				readChannel <- buffer
+			}
 		}
 	}()
+
 	for {
 		select {
 		case <-done:
 			return nil
-		case line := <-rc:
-			err = wsutil.WriteClientMessage(conn, ws.OpText, []byte(line+"\n"))
+		case bytes := <-readChannel:
+			err := wsutil.WriteClientMessage(conn, ws.OpText, bytes)
 			if err != nil {
-				fmt.Println("write: ", err)
-				return nil
+				return err
 			}
-		case <-interrupt:
-			fmt.Println("interrupted...")
-			close(done)
-			cancel()
-			return nil
-		}
-	}
-}
-
-func cleanLastCommand(lastCommandLen int) {
-	tm.MoveCursorUp(1)
-	tm.MoveCursorForward(lastCommandLen)
-	if runtime.GOOS != "windows" {
-		for i := 0; i < tm.Width(); i++ {
-			_, _ = tm.Print(" ")
 		}
-		tm.MoveCursorUp(1)
 	}
-
-	tm.Flush()
 }
 
 func getContainerGroupsClient(subscriptionID string) (containerinstance.ContainerGroupsClient, error) {

+ 16 - 0
azure/backend.go

@@ -2,6 +2,7 @@ package azure
 
 import (
 	"context"
+	"io"
 
 	"github.com/Azure/azure-sdk-for-go/services/containerinstance/mgmt/2018-10-01/containerinstance"
 	"github.com/Azure/go-autorest/autorest/azure/auth"
@@ -121,3 +122,18 @@ func (cs *containerService) Run(ctx context.Context, r containers.ContainerConfi
 	_, err = createACIContainers(ctx, cs.ctx, groupDefinition)
 	return err
 }
+
+func (cs *containerService) Exec(ctx context.Context, name string, command string, reader io.Reader, writer io.Writer) error {
+	containerExecResponse, err := execACIContainer(ctx, cs.ctx, command, name, name)
+	if err != nil {
+		return err
+	}
+
+	return exec(
+		context.Background(),
+		*containerExecResponse.WebSocketURI,
+		*containerExecResponse.Password,
+		reader,
+		writer,
+	)
+}

+ 62 - 0
cli/cmd/exec.go

@@ -0,0 +1,62 @@
+package cmd
+
+import (
+	"context"
+	"io"
+	"os"
+	"strings"
+
+	"github.com/containerd/console"
+	"github.com/pkg/errors"
+	"github.com/spf13/cobra"
+
+	"github.com/docker/api/client"
+)
+
+type execOpts struct {
+	Tty bool
+}
+
+func ExecCommand() *cobra.Command {
+	var opts execOpts
+	cmd := &cobra.Command{
+		Use:   "exec",
+		Short: "Run a command in a running container",
+		Args:  cobra.MinimumNArgs(2),
+		RunE: func(cmd *cobra.Command, args []string) error {
+			return runExec(cmd.Context(), opts, args[0], strings.Join(args[1:], " "))
+		},
+	}
+
+	cmd.Flags().BoolVarP(&opts.Tty, "tty", "t", false, "Allocate a pseudo-TTY")
+
+	return cmd
+}
+
+func runExec(ctx context.Context, opts execOpts, name string, command string) error {
+	c, err := client.New(ctx)
+	if err != nil {
+		return errors.Wrap(err, "cannot connect to backend")
+	}
+
+	var (
+		con    console.Console
+		stdout io.Writer
+	)
+
+	stdout = os.Stdout
+
+	if opts.Tty {
+		con = console.Current()
+		if err := con.SetRaw(); err != nil {
+			return err
+		}
+		defer func() {
+			con.Reset()
+		}()
+
+		stdout = con
+	}
+
+	return c.ContainerService().Exec(ctx, name, command, os.Stdin, stdout)
+}

+ 2 - 0
cli/main.go

@@ -83,6 +83,7 @@ func main() {
 		Use:           "docker",
 		Long:          "docker for the 2020s",
 		SilenceErrors: true,
+		SilenceUsage:  true,
 		PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
 			if !isOwnCommand(cmd) {
 				execMoby(cmd.Context())
@@ -100,6 +101,7 @@ func main() {
 		cmd.ServeCommand(),
 		&cmd.ExampleCommand,
 		run.Command(),
+		cmd.ExecCommand(),
 	)
 
 	helpFunc := root.HelpFunc()

+ 18 - 4
containers/api.go

@@ -2,8 +2,10 @@ package containers
 
 import (
 	"context"
+	"io"
 )
 
+// Container represents a created container
 type Container struct {
 	ID          string
 	Status      string
@@ -17,18 +19,30 @@ type Container struct {
 	Labels      []string
 }
 
+// Port represents a published port of a container
 type Port struct {
-	Source      uint32
+	// Source is the source port
+	Source uint32
+	// Destination is the destination port
 	Destination uint32
 }
 
+// ContainerConfig contains the configuration data about a container
 type ContainerConfig struct {
-	ID    string
+	// ID uniquely identifies the container
+	ID string
+	// Image specifies the iamge reference used for a container
 	Image string
+	// Ports provide a list of published ports
 	Ports []Port
 }
 
+// ContainerService interacts with the underlying container backend
 type ContainerService interface {
-	List(context.Context) ([]Container, error)
-	Run(context.Context, ContainerConfig) error
+	// List returns all the containers
+	List(ctx context.Context) ([]Container, error)
+	// Run creates and starts a container
+	Run(ctx context.Context, config ContainerConfig) error
+	// Exec executes a command inside a running container
+	Exec(ctx context.Context, containerName string, command string, reader io.Reader, writer io.Writer) error
 }

+ 6 - 0
example/backend.go

@@ -3,6 +3,7 @@ package example
 import (
 	"context"
 	"fmt"
+	"io"
 
 	"github.com/docker/api/backend"
 	"github.com/docker/api/containers"
@@ -37,3 +38,8 @@ func (cs *containerService) Run(ctx context.Context, r containers.ContainerConfi
 	fmt.Printf("Running container %q with name %q\n", r.Image, r.ID)
 	return nil
 }
+
+func (cs *containerService) Exec(ctx context.Context, name string, command string, reader io.Reader, writer io.Writer) error {
+	fmt.Printf("Executing command %q on container %q", command, name)
+	return nil
+}

+ 1 - 0
go.mod

@@ -10,6 +10,7 @@ require (
 	github.com/Azure/go-autorest/autorest/validation v0.2.0 // indirect
 	github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129
 	github.com/compose-spec/compose-go v0.0.0-20200423124427-63dcf8c22cae
+	github.com/containerd/console v1.0.0
 	github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee // indirect
 	github.com/gobwas/pool v0.2.0 // indirect
 	github.com/gobwas/ws v1.0.3

+ 3 - 0
go.sum

@@ -53,6 +53,8 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk
 github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
 github.com/compose-spec/compose-go v0.0.0-20200423124427-63dcf8c22cae h1:5zRbbF5Gbkl7ZEJrKwYha2JMWgnfpPjSmv8+jCmkeSA=
 github.com/compose-spec/compose-go v0.0.0-20200423124427-63dcf8c22cae/go.mod h1:1PUpzRF1O/65VOqXZuwpCuYY7pJxbIq1jbAvAf62FGM=
+github.com/containerd/console v1.0.0 h1:fU3UuQapBs+zLJu82NhR11Rif1ny2zfMMAyPJzSN5tQ=
+github.com/containerd/console v1.0.0/go.mod h1:8Pf4gM6VEbTNRIT26AyyU7hxdQU3MvAvxVI0sc00XBE=
 github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
 github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
 github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
@@ -252,6 +254,7 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd h1:xhmwyvizuTgC2qz7ZlMluP20uW+C3Rm0FD/WLDX8884=
 golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=