瀏覽代碼

Merge pull request #179 from rumpl/feat-stream

Feat stream
Djordje Lukic 5 年之前
父節點
當前提交
67fb49c98c

+ 2 - 1
Dockerfile

@@ -8,7 +8,8 @@ ENV GO111MODULE=on
 RUN apk add --no-cache \
     docker \
     make \
-    protoc
+    protoc \
+    protobuf-dev
 COPY go.* .
 RUN go mod download
 

+ 2 - 12
cli/cmd/exec.go

@@ -3,8 +3,6 @@ package cmd
 import (
 	"context"
 	"fmt"
-	"io"
-	"os"
 	"strings"
 
 	"github.com/containerd/console"
@@ -41,15 +39,9 @@ func runExec(ctx context.Context, opts execOpts, name string, command string) er
 		return errors.Wrap(err, "cannot connect to backend")
 	}
 
-	var (
-		con    console.Console
-		stdout io.Writer
-	)
-
-	stdout = os.Stdout
+	con := console.Current()
 
 	if opts.Tty {
-		con = console.Current()
 		if err := con.SetRaw(); err != nil {
 			return err
 		}
@@ -58,9 +50,7 @@ func runExec(ctx context.Context, opts execOpts, name string, command string) er
 				fmt.Println("Unable to close the console")
 			}
 		}()
-
-		stdout = con
 	}
 
-	return c.ContainerService().Exec(ctx, name, command, os.Stdin, stdout)
+	return c.ContainerService().Exec(ctx, name, command, con, con)
 }

+ 3 - 1
cli/cmd/serve.go

@@ -9,6 +9,7 @@ import (
 
 	containersv1 "github.com/docker/api/protos/containers/v1"
 	contextsv1 "github.com/docker/api/protos/contexts/v1"
+	streamsv1 "github.com/docker/api/protos/streams/v1"
 	"github.com/docker/api/server"
 	"github.com/docker/api/server/proxy"
 )
@@ -44,10 +45,11 @@ func runServe(ctx context.Context, opts serveOpts) error {
 	// nolint errcheck
 	defer listener.Close()
 
-	p := proxy.NewContainerAPI()
+	p := proxy.New()
 	contextsService := server.NewContexts()
 
 	containersv1.RegisterContainersServer(s, p)
+	streamsv1.RegisterStreamingServer(s, p)
 	contextsv1.RegisterContextsServer(s, contextsService)
 
 	go func() {

+ 1 - 1
go.mod

@@ -18,7 +18,7 @@ require (
 	github.com/buger/goterm v0.0.0-20200322175922-2f3e71b85129
 	github.com/compose-spec/compose-go v0.0.0-20200423124427-63dcf8c22cae
 	github.com/containerd/console v1.0.0
-	github.com/containerd/containerd v1.3.4 // indirect
+	github.com/containerd/containerd v1.3.4
 	github.com/containerd/continuity v0.0.0-20200413184840-d3ef23f19fbb // indirect
 	github.com/docker/cli v0.0.0-20200528204125-dd360c7c0de8
 	github.com/docker/distribution v2.7.1+incompatible // indirect

+ 5 - 2
moby/backend.go

@@ -138,18 +138,21 @@ func (ms *mobyService) Exec(ctx context.Context, name string, command string, re
 	cec, err := ms.apiClient.ContainerExecCreate(ctx, name, types.ExecConfig{
 		Cmd:          []string{command},
 		Tty:          true,
-		AttachStderr: true,
 		AttachStdin:  true,
 		AttachStdout: true,
+		AttachStderr: true,
 	})
 	if err != nil {
 		return err
 	}
-	resp, err := ms.apiClient.ContainerExecAttach(ctx, cec.ID, types.ExecStartCheck{})
+	resp, err := ms.apiClient.ContainerExecAttach(ctx, cec.ID, types.ExecStartCheck{
+		Tty: true,
+	})
 	if err != nil {
 		return err
 	}
 	defer resp.Close()
+
 	readChannel := make(chan error, 10)
 	writeChannel := make(chan error, 10)
 

文件差異過大導致無法顯示
+ 220 - 690
protos/containers/v1/containers.pb.go


+ 18 - 65
protos/containers/v1/containers.proto

@@ -33,14 +33,11 @@ option go_package = "github.com/docker/api/protos/containers/v1;v1";
 
 service Containers {
 	rpc List(ListRequest) returns (ListResponse);
-	rpc Create(CreateRequest) returns (CreateResponse);
-	rpc Start(StartRequest) returns (StartResponse);
 	rpc Stop(StopRequest) returns (StopResponse);
-	rpc Kill(KillRequest) returns (KillResponse);
-	rpc Delete(DeleteRequest) returns (DeleteResponse);
-	rpc Update(UpdateRequest) returns (UpdateResponse);
+	rpc Run(RunRequest) returns (RunResponse);
 	rpc Exec(ExecRequest) returns (ExecResponse);
 	rpc Logs(LogsRequest) returns (stream LogsResponse);
+	rpc Delete(DeleteRequest) returns (DeleteResponse);
 }
 
 message Port {
@@ -64,43 +61,6 @@ message Container {
 	repeated Port ports = 11;
 }
 
-message CreateRequest {
-	string id = 1;
-	repeated string args = 2;
-	repeated string env = 3;
-	bool tty = 4;
-	string image = 5;
-	string hostname = 6;
-	repeated string networks = 7;
-	string snapshotter = 8;
-	string logger = 9;
-	string profile = 10;
-	string restart_status = 11;
-	repeated Mount mounts = 12;
-	string working_dir = 13;
-	map<string, string> labels = 14;
-}
-
-message Mount {
-	string type = 1;
-	string source = 2;
-	string destination = 3;
-	repeated string options = 4;
-}
-
-message CreateResponse {
-
-}
-
-message UpdateRequest {
-	string id = 1;
-	string image = 2;
-}
-
-message UpdateResponse {
-
-}
-
 message DeleteRequest {
 	string id = 1;
 	bool force = 2;
@@ -109,15 +69,6 @@ message DeleteRequest {
 message DeleteResponse {
 }
 
-message StartRequest {
-	string id = 1;
-	string stream_id = 2;
-}
-
-message StartResponse {
-
-}
-
 message StopRequest {
 	string id = 1;
 	uint32 timeout = 2;
@@ -126,26 +77,28 @@ message StopRequest {
 message StopResponse {
 }
 
-
-message ExecRequest {
+message RunRequest {
 	string id = 1;
-	string stream_id = 2;
-	repeated string args = 3;
-	repeated string env = 4;
-	bool tty = 5;
-	string working_dir = 6;
+	string image = 2;
+	repeated Port ports = 3;
+	map<string, string> labels = 4;
+	repeated string volumes = 5;
 }
 
-message ExecResponse {
-
+message RunResponse {
 }
 
-message KillRequest {
+message ExecRequest {
 	string id = 1;
-	int64 signal = 2;
+	string command = 2;
+	string stream_id = 3;
+	repeated string args = 4;
+	repeated string env = 5;
+	bool tty = 6;
 }
 
-message KillResponse {
+message ExecResponse {
+	bytes output = 1;
 }
 
 message ListRequest {
@@ -158,9 +111,9 @@ message ListResponse {
 
 message LogsRequest {
 	string container_id = 1;
-	bool follow = 2;
+	bool follow = 3;
 }
 
 message LogsResponse {
-	bytes logs = 1;
+	bytes value = 1;
 }

+ 480 - 0
protos/streams/v1/streams.pb.go

@@ -0,0 +1,480 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// 	protoc-gen-go v1.22.0
+// 	protoc        v3.11.2
+// source: protos/streams/v1/streams.proto
+
+package v1
+
+import (
+	context "context"
+	proto "github.com/golang/protobuf/proto"
+	any "github.com/golang/protobuf/ptypes/any"
+	grpc "google.golang.org/grpc"
+	codes "google.golang.org/grpc/codes"
+	status "google.golang.org/grpc/status"
+	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+	reflect "reflect"
+	sync "sync"
+)
+
+const (
+	// Verify that this generated code is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+	// Verify that runtime/protoimpl is sufficiently up-to-date.
+	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+// This is a compile-time assertion that a sufficiently up-to-date version
+// of the legacy proto package is being used.
+const _ = proto.ProtoPackageIsVersion4
+
+type IOStream int32
+
+const (
+	IOStream_STDIN  IOStream = 0
+	IOStream_STDOUT IOStream = 1
+	IOStream_STDERR IOStream = 2
+)
+
+// Enum value maps for IOStream.
+var (
+	IOStream_name = map[int32]string{
+		0: "STDIN",
+		1: "STDOUT",
+		2: "STDERR",
+	}
+	IOStream_value = map[string]int32{
+		"STDIN":  0,
+		"STDOUT": 1,
+		"STDERR": 2,
+	}
+)
+
+func (x IOStream) Enum() *IOStream {
+	p := new(IOStream)
+	*p = x
+	return p
+}
+
+func (x IOStream) String() string {
+	return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (IOStream) Descriptor() protoreflect.EnumDescriptor {
+	return file_protos_streams_v1_streams_proto_enumTypes[0].Descriptor()
+}
+
+func (IOStream) Type() protoreflect.EnumType {
+	return &file_protos_streams_v1_streams_proto_enumTypes[0]
+}
+
+func (x IOStream) Number() protoreflect.EnumNumber {
+	return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use IOStream.Descriptor instead.
+func (IOStream) EnumDescriptor() ([]byte, []int) {
+	return file_protos_streams_v1_streams_proto_rawDescGZIP(), []int{0}
+}
+
+type BytesMessage struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Type  IOStream `protobuf:"varint,1,opt,name=type,proto3,enum=com.docker.api.protos.streams.v1.IOStream" json:"type,omitempty"`
+	Value []byte   `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
+}
+
+func (x *BytesMessage) Reset() {
+	*x = BytesMessage{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_protos_streams_v1_streams_proto_msgTypes[0]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *BytesMessage) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*BytesMessage) ProtoMessage() {}
+
+func (x *BytesMessage) ProtoReflect() protoreflect.Message {
+	mi := &file_protos_streams_v1_streams_proto_msgTypes[0]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use BytesMessage.ProtoReflect.Descriptor instead.
+func (*BytesMessage) Descriptor() ([]byte, []int) {
+	return file_protos_streams_v1_streams_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *BytesMessage) GetType() IOStream {
+	if x != nil {
+		return x.Type
+	}
+	return IOStream_STDIN
+}
+
+func (x *BytesMessage) GetValue() []byte {
+	if x != nil {
+		return x.Value
+	}
+	return nil
+}
+
+type ResizeMessage struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Width  uint32 `protobuf:"varint,1,opt,name=width,proto3" json:"width,omitempty"`
+	Height uint32 `protobuf:"varint,2,opt,name=height,proto3" json:"height,omitempty"`
+}
+
+func (x *ResizeMessage) Reset() {
+	*x = ResizeMessage{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_protos_streams_v1_streams_proto_msgTypes[1]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *ResizeMessage) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ResizeMessage) ProtoMessage() {}
+
+func (x *ResizeMessage) ProtoReflect() protoreflect.Message {
+	mi := &file_protos_streams_v1_streams_proto_msgTypes[1]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use ResizeMessage.ProtoReflect.Descriptor instead.
+func (*ResizeMessage) Descriptor() ([]byte, []int) {
+	return file_protos_streams_v1_streams_proto_rawDescGZIP(), []int{1}
+}
+
+func (x *ResizeMessage) GetWidth() uint32 {
+	if x != nil {
+		return x.Width
+	}
+	return 0
+}
+
+func (x *ResizeMessage) GetHeight() uint32 {
+	if x != nil {
+		return x.Height
+	}
+	return 0
+}
+
+type ExitMessage struct {
+	state         protoimpl.MessageState
+	sizeCache     protoimpl.SizeCache
+	unknownFields protoimpl.UnknownFields
+
+	Status uint32 `protobuf:"varint,1,opt,name=status,proto3" json:"status,omitempty"`
+}
+
+func (x *ExitMessage) Reset() {
+	*x = ExitMessage{}
+	if protoimpl.UnsafeEnabled {
+		mi := &file_protos_streams_v1_streams_proto_msgTypes[2]
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		ms.StoreMessageInfo(mi)
+	}
+}
+
+func (x *ExitMessage) String() string {
+	return protoimpl.X.MessageStringOf(x)
+}
+
+func (*ExitMessage) ProtoMessage() {}
+
+func (x *ExitMessage) ProtoReflect() protoreflect.Message {
+	mi := &file_protos_streams_v1_streams_proto_msgTypes[2]
+	if protoimpl.UnsafeEnabled && x != nil {
+		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+		if ms.LoadMessageInfo() == nil {
+			ms.StoreMessageInfo(mi)
+		}
+		return ms
+	}
+	return mi.MessageOf(x)
+}
+
+// Deprecated: Use ExitMessage.ProtoReflect.Descriptor instead.
+func (*ExitMessage) Descriptor() ([]byte, []int) {
+	return file_protos_streams_v1_streams_proto_rawDescGZIP(), []int{2}
+}
+
+func (x *ExitMessage) GetStatus() uint32 {
+	if x != nil {
+		return x.Status
+	}
+	return 0
+}
+
+var File_protos_streams_v1_streams_proto protoreflect.FileDescriptor
+
+var file_protos_streams_v1_streams_proto_rawDesc = []byte{
+	0x0a, 0x1f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73,
+	0x2f, 0x76, 0x31, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74,
+	0x6f, 0x12, 0x20, 0x63, 0x6f, 0x6d, 0x2e, 0x64, 0x6f, 0x63, 0x6b, 0x65, 0x72, 0x2e, 0x61, 0x70,
+	0x69, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73,
+	0x2e, 0x76, 0x31, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74,
+	0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x64,
+	0x0a, 0x0c, 0x42, 0x79, 0x74, 0x65, 0x73, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x3e,
+	0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x2a, 0x2e, 0x63,
+	0x6f, 0x6d, 0x2e, 0x64, 0x6f, 0x63, 0x6b, 0x65, 0x72, 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x70, 0x72,
+	0x6f, 0x74, 0x6f, 0x73, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2e, 0x76, 0x31, 0x2e,
+	0x49, 0x4f, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x14,
+	0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x76,
+	0x61, 0x6c, 0x75, 0x65, 0x22, 0x3d, 0x0a, 0x0d, 0x52, 0x65, 0x73, 0x69, 0x7a, 0x65, 0x4d, 0x65,
+	0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x77, 0x69, 0x64, 0x74, 0x68, 0x18, 0x01,
+	0x20, 0x01, 0x28, 0x0d, 0x52, 0x05, 0x77, 0x69, 0x64, 0x74, 0x68, 0x12, 0x16, 0x0a, 0x06, 0x68,
+	0x65, 0x69, 0x67, 0x68, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x06, 0x68, 0x65, 0x69,
+	0x67, 0x68, 0x74, 0x22, 0x25, 0x0a, 0x0b, 0x45, 0x78, 0x69, 0x74, 0x4d, 0x65, 0x73, 0x73, 0x61,
+	0x67, 0x65, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x01,
+	0x28, 0x0d, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x2a, 0x2d, 0x0a, 0x08, 0x49, 0x4f,
+	0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x09, 0x0a, 0x05, 0x53, 0x54, 0x44, 0x49, 0x4e, 0x10,
+	0x00, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x54, 0x44, 0x4f, 0x55, 0x54, 0x10, 0x01, 0x12, 0x0a, 0x0a,
+	0x06, 0x53, 0x54, 0x44, 0x45, 0x52, 0x52, 0x10, 0x02, 0x32, 0x48, 0x0a, 0x09, 0x53, 0x74, 0x72,
+	0x65, 0x61, 0x6d, 0x69, 0x6e, 0x67, 0x12, 0x3b, 0x0a, 0x09, 0x4e, 0x65, 0x77, 0x53, 0x74, 0x72,
+	0x65, 0x61, 0x6d, 0x12, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
+	0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x1a, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67,
+	0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x28,
+	0x01, 0x30, 0x01, 0x42, 0x2c, 0x5a, 0x2a, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
+	0x6d, 0x2f, 0x64, 0x6f, 0x63, 0x6b, 0x65, 0x72, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x70, 0x72, 0x6f,
+	0x74, 0x6f, 0x73, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x2f, 0x76, 0x31, 0x3b, 0x76,
+	0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+	file_protos_streams_v1_streams_proto_rawDescOnce sync.Once
+	file_protos_streams_v1_streams_proto_rawDescData = file_protos_streams_v1_streams_proto_rawDesc
+)
+
+func file_protos_streams_v1_streams_proto_rawDescGZIP() []byte {
+	file_protos_streams_v1_streams_proto_rawDescOnce.Do(func() {
+		file_protos_streams_v1_streams_proto_rawDescData = protoimpl.X.CompressGZIP(file_protos_streams_v1_streams_proto_rawDescData)
+	})
+	return file_protos_streams_v1_streams_proto_rawDescData
+}
+
+var file_protos_streams_v1_streams_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
+var file_protos_streams_v1_streams_proto_msgTypes = make([]protoimpl.MessageInfo, 3)
+var file_protos_streams_v1_streams_proto_goTypes = []interface{}{
+	(IOStream)(0),         // 0: com.docker.api.protos.streams.v1.IOStream
+	(*BytesMessage)(nil),  // 1: com.docker.api.protos.streams.v1.BytesMessage
+	(*ResizeMessage)(nil), // 2: com.docker.api.protos.streams.v1.ResizeMessage
+	(*ExitMessage)(nil),   // 3: com.docker.api.protos.streams.v1.ExitMessage
+	(*any.Any)(nil),       // 4: google.protobuf.Any
+}
+var file_protos_streams_v1_streams_proto_depIdxs = []int32{
+	0, // 0: com.docker.api.protos.streams.v1.BytesMessage.type:type_name -> com.docker.api.protos.streams.v1.IOStream
+	4, // 1: com.docker.api.protos.streams.v1.Streaming.NewStream:input_type -> google.protobuf.Any
+	4, // 2: com.docker.api.protos.streams.v1.Streaming.NewStream:output_type -> google.protobuf.Any
+	2, // [2:3] is the sub-list for method output_type
+	1, // [1:2] is the sub-list for method input_type
+	1, // [1:1] is the sub-list for extension type_name
+	1, // [1:1] is the sub-list for extension extendee
+	0, // [0:1] is the sub-list for field type_name
+}
+
+func init() { file_protos_streams_v1_streams_proto_init() }
+func file_protos_streams_v1_streams_proto_init() {
+	if File_protos_streams_v1_streams_proto != nil {
+		return
+	}
+	if !protoimpl.UnsafeEnabled {
+		file_protos_streams_v1_streams_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*BytesMessage); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_protos_streams_v1_streams_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*ResizeMessage); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+		file_protos_streams_v1_streams_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
+			switch v := v.(*ExitMessage); i {
+			case 0:
+				return &v.state
+			case 1:
+				return &v.sizeCache
+			case 2:
+				return &v.unknownFields
+			default:
+				return nil
+			}
+		}
+	}
+	type x struct{}
+	out := protoimpl.TypeBuilder{
+		File: protoimpl.DescBuilder{
+			GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+			RawDescriptor: file_protos_streams_v1_streams_proto_rawDesc,
+			NumEnums:      1,
+			NumMessages:   3,
+			NumExtensions: 0,
+			NumServices:   1,
+		},
+		GoTypes:           file_protos_streams_v1_streams_proto_goTypes,
+		DependencyIndexes: file_protos_streams_v1_streams_proto_depIdxs,
+		EnumInfos:         file_protos_streams_v1_streams_proto_enumTypes,
+		MessageInfos:      file_protos_streams_v1_streams_proto_msgTypes,
+	}.Build()
+	File_protos_streams_v1_streams_proto = out.File
+	file_protos_streams_v1_streams_proto_rawDesc = nil
+	file_protos_streams_v1_streams_proto_goTypes = nil
+	file_protos_streams_v1_streams_proto_depIdxs = nil
+}
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ context.Context
+var _ grpc.ClientConnInterface
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the grpc package it is being compiled against.
+const _ = grpc.SupportPackageIsVersion6
+
+// StreamingClient is the client API for Streaming service.
+//
+// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
+type StreamingClient interface {
+	NewStream(ctx context.Context, opts ...grpc.CallOption) (Streaming_NewStreamClient, error)
+}
+
+type streamingClient struct {
+	cc grpc.ClientConnInterface
+}
+
+func NewStreamingClient(cc grpc.ClientConnInterface) StreamingClient {
+	return &streamingClient{cc}
+}
+
+func (c *streamingClient) NewStream(ctx context.Context, opts ...grpc.CallOption) (Streaming_NewStreamClient, error) {
+	stream, err := c.cc.NewStream(ctx, &_Streaming_serviceDesc.Streams[0], "/com.docker.api.protos.streams.v1.Streaming/NewStream", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &streamingNewStreamClient{stream}
+	return x, nil
+}
+
+type Streaming_NewStreamClient interface {
+	Send(*any.Any) error
+	Recv() (*any.Any, error)
+	grpc.ClientStream
+}
+
+type streamingNewStreamClient struct {
+	grpc.ClientStream
+}
+
+func (x *streamingNewStreamClient) Send(m *any.Any) error {
+	return x.ClientStream.SendMsg(m)
+}
+
+func (x *streamingNewStreamClient) Recv() (*any.Any, error) {
+	m := new(any.Any)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+// StreamingServer is the server API for Streaming service.
+type StreamingServer interface {
+	NewStream(Streaming_NewStreamServer) error
+}
+
+// UnimplementedStreamingServer can be embedded to have forward compatible implementations.
+type UnimplementedStreamingServer struct {
+}
+
+func (*UnimplementedStreamingServer) NewStream(Streaming_NewStreamServer) error {
+	return status.Errorf(codes.Unimplemented, "method NewStream not implemented")
+}
+
+func RegisterStreamingServer(s *grpc.Server, srv StreamingServer) {
+	s.RegisterService(&_Streaming_serviceDesc, srv)
+}
+
+func _Streaming_NewStream_Handler(srv interface{}, stream grpc.ServerStream) error {
+	return srv.(StreamingServer).NewStream(&streamingNewStreamServer{stream})
+}
+
+type Streaming_NewStreamServer interface {
+	Send(*any.Any) error
+	Recv() (*any.Any, error)
+	grpc.ServerStream
+}
+
+type streamingNewStreamServer struct {
+	grpc.ServerStream
+}
+
+func (x *streamingNewStreamServer) Send(m *any.Any) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+func (x *streamingNewStreamServer) Recv() (*any.Any, error) {
+	m := new(any.Any)
+	if err := x.ServerStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+var _Streaming_serviceDesc = grpc.ServiceDesc{
+	ServiceName: "com.docker.api.protos.streams.v1.Streaming",
+	HandlerType: (*StreamingServer)(nil),
+	Methods:     []grpc.MethodDesc{},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "NewStream",
+			Handler:       _Streaming_NewStream_Handler,
+			ServerStreams: true,
+			ClientStreams: true,
+		},
+	},
+	Metadata: "protos/streams/v1/streams.proto",
+}

+ 31 - 0
protos/streams/v1/streams.proto

@@ -0,0 +1,31 @@
+syntax = "proto3";
+
+package com.docker.api.protos.streams.v1;
+
+import "google/protobuf/any.proto";
+
+option go_package = "github.com/docker/api/protos/streams/v1;v1";
+
+service Streaming {
+	rpc NewStream(stream google.protobuf.Any) returns (stream google.protobuf.Any);
+}
+
+enum IOStream {
+	STDIN = 0;
+	STDOUT = 1;
+	STDERR = 2;
+}
+
+message BytesMessage {
+	IOStream type = 1;
+	bytes value = 2;
+}
+
+message ResizeMessage {
+	uint32 width = 1;
+	uint32 height = 2;
+}
+
+message ExitMessage {
+	uint32 status = 1;
+}

+ 47 - 47
server/proxy/containers.go

@@ -2,23 +2,16 @@ package proxy
 
 import (
 	"context"
+	"errors"
 
 	"github.com/docker/api/containers"
-	v1 "github.com/docker/api/protos/containers/v1"
+	containersv1 "github.com/docker/api/protos/containers/v1"
 )
 
-// NewContainerAPI creates a proxy container server
-func NewContainerAPI() v1.ContainersServer {
-	return &proxyContainerAPI{}
-}
-
-type proxyContainerAPI struct {
-}
-
-func portsToGrpc(ports []containers.Port) []*v1.Port {
-	var result []*v1.Port
+func portsToGrpc(ports []containers.Port) []*containersv1.Port {
+	var result []*containersv1.Port
 	for _, port := range ports {
-		result = append(result, &v1.Port{
+		result = append(result, &containersv1.Port{
 			ContainerPort: port.ContainerPort,
 			HostPort:      port.HostPort,
 			HostIp:        port.HostIP,
@@ -29,19 +22,19 @@ func portsToGrpc(ports []containers.Port) []*v1.Port {
 	return result
 }
 
-func (p *proxyContainerAPI) List(ctx context.Context, request *v1.ListRequest) (*v1.ListResponse, error) {
+func (p *proxy) List(ctx context.Context, request *containersv1.ListRequest) (*containersv1.ListResponse, error) {
 	client := Client(ctx)
 
 	c, err := client.ContainerService().List(ctx, request.GetAll())
 	if err != nil {
-		return &v1.ListResponse{}, err
+		return &containersv1.ListResponse{}, err
 	}
 
-	response := &v1.ListResponse{
-		Containers: []*v1.Container{},
+	response := &containersv1.ListResponse{
+		Containers: []*containersv1.Container{},
 	}
 	for _, container := range c {
-		response.Containers = append(response.Containers, &v1.Container{
+		response.Containers = append(response.Containers, &containersv1.Container{
 			Id:          container.ID,
 			Image:       container.Image,
 			Command:     container.Command,
@@ -59,55 +52,62 @@ func (p *proxyContainerAPI) List(ctx context.Context, request *v1.ListRequest) (
 	return response, nil
 }
 
-func (p *proxyContainerAPI) Create(ctx context.Context, request *v1.CreateRequest) (*v1.CreateResponse, error) {
-	client := Client(ctx)
-
-	err := client.ContainerService().Run(ctx, containers.ContainerConfig{
-		ID:    request.Id,
-		Image: request.Image,
-	})
-
-	return &v1.CreateResponse{}, err
-}
-
-func (p *proxyContainerAPI) Start(_ context.Context, request *v1.StartRequest) (*v1.StartResponse, error) {
-	panic("not implemented") // TODO: Implement
-}
-
-func (p *proxyContainerAPI) Stop(ctx context.Context, request *v1.StopRequest) (*v1.StopResponse, error) {
+func (p *proxy) Stop(ctx context.Context, request *containersv1.StopRequest) (*containersv1.StopResponse, error) {
 	c := Client(ctx)
 	timeoutValue := request.GetTimeout()
-	return &v1.StopResponse{}, c.ContainerService().Stop(ctx, request.Id, &timeoutValue)
+	return &containersv1.StopResponse{}, c.ContainerService().Stop(ctx, request.Id, &timeoutValue)
 }
 
-func (p *proxyContainerAPI) Kill(ctx context.Context, request *v1.KillRequest) (*v1.KillResponse, error) {
-	c := Client(ctx)
-	return &v1.KillResponse{}, c.ContainerService().Delete(ctx, request.Id, false)
+func (p *proxy) Run(ctx context.Context, request *containersv1.RunRequest) (*containersv1.RunResponse, error) {
+	ports := []containers.Port{}
+	for _, p := range request.GetPorts() {
+		ports = append(ports, containers.Port{
+			ContainerPort: p.ContainerPort,
+			HostIP:        p.HostIp,
+			HostPort:      p.HostPort,
+			Protocol:      p.Protocol,
+		})
+	}
+
+	err := Client(ctx).ContainerService().Run(ctx, containers.ContainerConfig{
+		ID:      request.GetId(),
+		Image:   request.GetImage(),
+		Labels:  request.GetLabels(),
+		Ports:   ports,
+		Volumes: request.GetVolumes(),
+	})
+
+	return &containersv1.RunResponse{}, err
 }
 
-func (p *proxyContainerAPI) Delete(ctx context.Context, request *v1.DeleteRequest) (*v1.DeleteResponse, error) {
+func (p *proxy) Delete(ctx context.Context, request *containersv1.DeleteRequest) (*containersv1.DeleteResponse, error) {
 	err := Client(ctx).ContainerService().Delete(ctx, request.Id, request.Force)
 	if err != nil {
-		return &v1.DeleteResponse{}, err
+		return &containersv1.DeleteResponse{}, err
 	}
 
-	return &v1.DeleteResponse{}, nil
+	return &containersv1.DeleteResponse{}, nil
 }
 
-func (p *proxyContainerAPI) Update(_ context.Context, _ *v1.UpdateRequest) (*v1.UpdateResponse, error) {
-	panic("not implemented") // TODO: Implement
-}
+func (p *proxy) Exec(ctx context.Context, request *containersv1.ExecRequest) (*containersv1.ExecResponse, error) {
+	p.mu.Lock()
+	stream, ok := p.streams[request.StreamId]
+	p.mu.Unlock()
+	if !ok {
+		return &containersv1.ExecResponse{}, errors.New("unknown stream id")
+	}
+
+	err := Client(ctx).ContainerService().Exec(ctx, request.GetId(), request.GetCommand(), &reader{stream}, &writer{stream})
 
-func (p *proxyContainerAPI) Exec(_ context.Context, _ *v1.ExecRequest) (*v1.ExecResponse, error) {
-	panic("not implemented") // TODO: Implement
+	return &containersv1.ExecResponse{}, err
 }
 
-func (p *proxyContainerAPI) Logs(request *v1.LogsRequest, stream v1.Containers_LogsServer) error {
+func (p *proxy) Logs(request *containersv1.LogsRequest, stream containersv1.Containers_LogsServer) error {
 	ctx := stream.Context()
 	c := Client(ctx)
 
 	return c.ContainerService().Logs(ctx, request.GetContainerId(), containers.LogsRequest{
 		Follow: request.Follow,
-		Writer: &streamWriter{stream},
+		Writer: &logStream{stream},
 	})
 }

+ 25 - 0
server/proxy/logstream.go

@@ -0,0 +1,25 @@
+package proxy
+
+import (
+	"io"
+
+	"google.golang.org/grpc"
+
+	containersv1 "github.com/docker/api/protos/containers/v1"
+)
+
+type logStream struct {
+	stream grpc.ServerStream
+}
+
+func newStreamWriter(stream grpc.ServerStream) io.Writer {
+	return &logStream{
+		stream: stream,
+	}
+}
+
+func (w *logStream) Write(p []byte) (n int, err error) {
+	return len(p), w.stream.SendMsg(&containersv1.LogsResponse{
+		Value: p,
+	})
+}

+ 7 - 5
server/proxy/streamwriter_test.go → server/proxy/logstream_test.go

@@ -11,7 +11,7 @@ import (
 )
 
 type logServer struct {
-	logs []byte
+	logs interface{}
 }
 
 func (ls *logServer) Send(response *v1.LogsResponse) error {
@@ -34,8 +34,7 @@ func (ls *logServer) Context() context.Context {
 }
 
 func (ls *logServer) SendMsg(m interface{}) error {
-	s, _ := m.(*v1.LogsResponse)
-	ls.logs = s.Logs
+	ls.logs = m
 	return nil
 }
 
@@ -43,14 +42,17 @@ func (ls *logServer) RecvMsg(m interface{}) error {
 	return nil
 }
 
-func TestStreamWriter(t *testing.T) {
+func TestLogStreamWriter(t *testing.T) {
 	ls := &logServer{}
 	sw := newStreamWriter(ls)
 	in := []byte{104, 101, 108, 108, 111}
+	expected := &v1.LogsResponse{
+		Value: in,
+	}
 
 	l, err := sw.Write(in)
 
 	assert.Nil(t, err)
 	assert.Equal(t, len(in), l)
-	assert.Equal(t, in, ls.logs)
+	assert.Equal(t, expected, ls.logs)
 }

+ 22 - 0
server/proxy/proxy.go

@@ -2,8 +2,11 @@ package proxy
 
 import (
 	"context"
+	"sync"
 
 	"github.com/docker/api/client"
+	containersv1 "github.com/docker/api/protos/containers/v1"
+	streamsv1 "github.com/docker/api/protos/streams/v1"
 )
 
 type clientKey struct{}
@@ -18,3 +21,22 @@ func Client(ctx context.Context) *client.Client {
 	c, _ := ctx.Value(clientKey{}).(*client.Client)
 	return c
 }
+
+// Proxy implements the gRPC server and forwards the actions
+// to the right backend
+type Proxy interface {
+	containersv1.ContainersServer
+	streamsv1.StreamingServer
+}
+
+type proxy struct {
+	mu      sync.Mutex
+	streams map[string]*Stream
+}
+
+// New creates a new proxy server
+func New() Proxy {
+	return &proxy{
+		streams: map[string]*Stream{},
+	}
+}

+ 118 - 0
server/proxy/streams.go

@@ -0,0 +1,118 @@
+package proxy
+
+import (
+	"sync"
+
+	"github.com/containerd/containerd/log"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/google/uuid"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+
+	streamsv1 "github.com/docker/api/protos/streams/v1"
+)
+
+// Stream is a bidirectional stream for container IO
+type Stream struct {
+	streamsv1.Streaming_NewStreamServer
+
+	errm    sync.Mutex
+	errChan chan<- error
+}
+
+// CloseWithError sends the result of an action to the errChan or nil
+// if no erros
+func (s *Stream) CloseWithError(err error) error {
+	s.errm.Lock()
+	defer s.errm.Unlock()
+
+	if s.errChan != nil {
+		if err != nil {
+			s.errChan <- err
+		}
+		close(s.errChan)
+		s.errChan = nil
+	}
+	return nil
+}
+
+func (p *proxy) NewStream(stream streamsv1.Streaming_NewStreamServer) error {
+	var (
+		ctx = stream.Context()
+		id  = uuid.New().String()
+	)
+	md := metadata.New(map[string]string{
+		"id": id,
+	})
+
+	// return the id of the stream to the client
+	if err := stream.SendHeader(md); err != nil {
+		return err
+	}
+
+	errc := make(chan error)
+
+	p.mu.Lock()
+	p.streams[id] = &Stream{
+		Streaming_NewStreamServer: stream,
+		errChan:                   errc,
+	}
+	p.mu.Unlock()
+
+	defer func() {
+		p.mu.Lock()
+		delete(p.streams, id)
+		p.mu.Unlock()
+	}()
+
+	select {
+	case err := <-errc:
+		return err
+	case <-ctx.Done():
+		log.G(ctx).Debug("client context canceled")
+		return ctx.Err()
+	}
+}
+
+// io.Reader that forwards everything to the stream
+type reader struct {
+	stream *Stream
+}
+
+func (r reader) Read(p []byte) (int, error) {
+	a, err := r.stream.Recv()
+	if err != nil {
+		return 0, err
+	}
+
+	var m streamsv1.BytesMessage
+	err = ptypes.UnmarshalAny(a, &m)
+	if err != nil {
+		return 0, err
+	}
+
+	return copy(p, m.Value), nil
+}
+
+// io.Writer that writes
+type writer struct {
+	stream grpc.ServerStream
+}
+
+func (w *writer) Write(p []byte) (n int, err error) {
+	if len(p) == 0 {
+		return 0, nil
+	}
+
+	message := streamsv1.BytesMessage{
+		Type:  streamsv1.IOStream_STDOUT,
+		Value: p,
+	}
+
+	m, err := ptypes.MarshalAny(&message)
+	if err != nil {
+		return 0, err
+	}
+
+	return len(message.Value), w.stream.SendMsg(m)
+}

+ 118 - 0
server/proxy/streams_test.go

@@ -0,0 +1,118 @@
+package proxy
+
+import (
+	"context"
+	"errors"
+	"testing"
+
+	"google.golang.org/grpc/metadata"
+
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/any"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	streamsv1 "github.com/docker/api/protos/streams/v1"
+)
+
+type byteStream struct {
+	recvResult *any.Any
+	recvErr    error
+
+	sendResult interface{}
+}
+
+func (bs *byteStream) SetHeader(metadata.MD) error {
+	return nil
+}
+
+func (bs *byteStream) SendHeader(metadata.MD) error {
+	return nil
+}
+
+func (bs *byteStream) SetTrailer(metadata.MD) {
+}
+
+func (bs *byteStream) Context() context.Context {
+	return nil
+}
+
+func (bs *byteStream) SendMsg(m interface{}) error {
+	bs.sendResult = m
+	return nil
+}
+
+func (bs *byteStream) Send(*any.Any) error {
+	return nil
+}
+
+func (bs *byteStream) Recv() (*any.Any, error) {
+	return bs.recvResult, bs.recvErr
+}
+
+func (bs *byteStream) RecvMsg(m interface{}) error {
+	return nil
+}
+
+func getReader(t *testing.T, in []byte, errResult error) reader {
+	message := streamsv1.BytesMessage{
+		Type:  streamsv1.IOStream_STDOUT,
+		Value: in,
+	}
+	m, err := ptypes.MarshalAny(&message)
+	require.Nil(t, err)
+
+	return reader{
+		stream: &Stream{
+			Streaming_NewStreamServer: &byteStream{
+				recvResult: m,
+				recvErr:    errResult,
+			},
+		},
+	}
+}
+
+func getAny(t *testing.T, in []byte) *any.Any {
+	value, err := ptypes.MarshalAny(&streamsv1.BytesMessage{
+		Type:  streamsv1.IOStream_STDOUT,
+		Value: in,
+	})
+	require.Nil(t, err)
+	return value
+}
+
+func TestStreamReader(t *testing.T) {
+	in := []byte{104, 101, 108, 108, 111}
+	r := getReader(t, in, nil)
+	buffer := make([]byte, 5)
+
+	n, err := r.Read(buffer)
+
+	assert.Nil(t, err)
+	assert.Equal(t, 5, n)
+	assert.Equal(t, in, buffer)
+}
+
+func TestStreamReaderError(t *testing.T) {
+	errResult := errors.New("err")
+	r := getReader(t, nil, errResult)
+	var buffer []byte
+
+	n, err := r.Read(buffer)
+
+	assert.Equal(t, 0, n)
+	assert.Equal(t, err, errResult)
+}
+
+func TestStreamWriter(t *testing.T) {
+	in := []byte{104, 101, 108, 108, 111}
+	expected := getAny(t, in)
+
+	bs := byteStream{}
+	w := writer{stream: &bs}
+
+	n, err := w.Write(in)
+	assert.Nil(t, err)
+	assert.Equal(t, len(in), n)
+	assert.Equal(t, expected, bs.sendResult)
+}

+ 0 - 23
server/proxy/streamwriter.go

@@ -1,23 +0,0 @@
-package proxy
-
-import (
-	"io"
-
-	v1 "github.com/docker/api/protos/containers/v1"
-)
-
-type streamWriter struct {
-	stream v1.Containers_LogsServer
-}
-
-func newStreamWriter(stream v1.Containers_LogsServer) io.Writer {
-	return &streamWriter{
-		stream: stream,
-	}
-}
-
-func (w *streamWriter) Write(p []byte) (n int, err error) {
-	return len(p), w.stream.SendMsg(&v1.LogsResponse{
-		Logs: p,
-	})
-}

+ 6 - 2
tests/node-client/build.sh

@@ -3,8 +3,10 @@ node_modules/.bin/grpc_tools_node_protoc \
     --grpc_out=generate_package_definition:./grpc \
     -I ../../protos/contexts/v1 \
     -I ../../protos/containers/v1 \
+    -I ../../protos/streams/v1 \
     ../../protos/contexts/v1/*.proto \
-    ../../protos/containers/v1/*.proto
+    ../../protos/containers/v1/*.proto \
+    ../../protos/streams/v1/*.proto
 
 # generate d.ts codes
 protoc \
@@ -12,5 +14,7 @@ protoc \
     --ts_out=generate_package_definition:./grpc \
     -I ../../protos/contexts/v1 \
     -I ../../protos/containers/v1 \
+    -I ../../protos/streams/v1 \
     ../../protos/contexts/v1/*.proto \
-    ../../protos/containers/v1/*.proto
+    ../../protos/containers/v1/*.proto \
+    ../../protos/streams/v1/*.proto

+ 85 - 0
tests/node-client/exec.ts

@@ -0,0 +1,85 @@
+import * as grpc from "@grpc/grpc-js";
+import * as readline from "readline";
+import * as google_protobuf_any_pb from "google-protobuf/google/protobuf/any_pb.js";
+
+import * as continersPb from "./grpc/containers_grpc_pb";
+import { IContainersClient } from "./grpc/containers_grpc_pb";
+import { ExecRequest, ExecResponse, LogsRequest } from "./grpc/containers_pb";
+
+import * as streamsPb from "./grpc/streams_grpc_pb";
+import { IStreamingClient } from "./grpc/streams_grpc_pb";
+import { BytesMessage } from "./grpc/streams_pb";
+
+let address = process.argv[3] || "unix:///tmp/backend.sock";
+
+const ContainersServiceClient = grpc.makeClientConstructor(
+  continersPb["com.docker.api.protos.containers.v1.Containers"],
+  "ContainersClient"
+);
+
+const client = (new ContainersServiceClient(
+  address,
+  grpc.credentials.createInsecure()
+) as unknown) as IContainersClient;
+
+const StreamsServiceClient = grpc.makeClientConstructor(
+  streamsPb["com.docker.api.protos.streams.v1.Streaming"],
+  "StreamsClient"
+);
+
+let streamClient = (new StreamsServiceClient(
+  address,
+  grpc.credentials.createInsecure()
+) as unknown) as IStreamingClient;
+
+let backend = process.argv[2] || "moby";
+let containerId = process.argv[3];
+const meta = new grpc.Metadata();
+meta.set("CONTEXT_KEY", backend);
+
+// Get the stream
+const stream = streamClient.newStream();
+
+stream.on("metadata", (m: grpc.Metadata) => {
+  let req = new ExecRequest();
+  req.setCommand("/bin/bash");
+  req.setStreamId(m.get("id")[0] as string);
+  req.setId(containerId);
+  req.setTty(true);
+
+  client.exec(req, meta, (err: any, _: ExecResponse) => {
+    if (err != null) {
+      console.error(err);
+      return;
+    }
+    process.exit();
+  });
+});
+
+readline.emitKeypressEvents(process.stdin);
+process.stdin.setRawMode(true);
+
+process.stdin.on("keypress", (str, key) => {
+  const mess = new BytesMessage();
+  const a = new Uint8Array(key.sequence.length);
+  for (let i = 0; i <= key.sequence.length; i++) {
+    a[i] = key.sequence.charCodeAt(i);
+  }
+
+  mess.setValue(a);
+
+  const any = new google_protobuf_any_pb.Any();
+  any.pack(
+    mess.serializeBinary(),
+    "type.googleapis.com/com.docker.api.protos.streams.v1.BytesMessage"
+  );
+  stream.write(any);
+});
+
+stream.on("data", (chunk: any) => {
+  const m = chunk.unpack(
+    BytesMessage.deserializeBinary,
+    "com.docker.api.protos.streams.v1.BytesMessage"
+  ) as BytesMessage;
+  process.stdout.write(m.getValue());
+});

+ 2 - 1
tests/node-client/package.json

@@ -6,12 +6,13 @@
   "scripts": {
     "start": "ts-node index.ts",
     "prestart": "./build.sh"
-  },
+    },
   "dependencies": {
     "@grpc/grpc-js": "^1.0.3",
     "grpc": "^1.24.2",
     "grpc-tools": "^1.8.1",
     "grpc_tools_node_protoc_ts": "^3.0.0",
+    "readline": "^1.3.0",
     "ts-node": "^8.9.1",
     "typescript": "^3.8.3"
   }

+ 1 - 0
tests/node-client/types.d.ts

@@ -0,0 +1 @@
+declare module "google-protobuf/google/protobuf/any_pb.js";

+ 5 - 0
tests/node-client/yarn.lock

@@ -1653,6 +1653,11 @@ readable-stream@^2.0.6, readable-stream@~2.3.6:
     string_decoder "~1.1.1"
     util-deprecate "~1.0.1"
 
+readline@^1.3.0:
+  version "1.3.0"
+  resolved "https://registry.yarnpkg.com/readline/-/readline-1.3.0.tgz#c580d77ef2cfc8752b132498060dc9793a7ac01c"
+  integrity sha1-xYDXfvLPyHUrEySYBg3JeTp6wBw=
+
 regex-not@^1.0.0, regex-not@^1.0.2:
   version "1.0.2"
   resolved "https://registry.yarnpkg.com/regex-not/-/regex-not-1.0.2.tgz#1f4ece27e00b0b65e0247a6810e6a85d83a5752c"

部分文件因文件數量過多而無法顯示