|
|
@@ -17,13 +17,16 @@
|
|
|
package compose
|
|
|
|
|
|
import (
|
|
|
+ "bytes"
|
|
|
+ "encoding/binary"
|
|
|
+ "errors"
|
|
|
"io"
|
|
|
"strings"
|
|
|
"sync"
|
|
|
"testing"
|
|
|
|
|
|
"github.com/compose-spec/compose-go/v2/types"
|
|
|
- "github.com/docker/docker/pkg/stdcopy"
|
|
|
+ "github.com/moby/moby/api/pkg/stdcopy"
|
|
|
containerType "github.com/moby/moby/api/types/container"
|
|
|
"github.com/moby/moby/client"
|
|
|
"go.uber.org/mock/gomock"
|
|
|
@@ -33,6 +36,56 @@ import (
|
|
|
compose "github.com/docker/compose/v5/pkg/api"
|
|
|
)
|
|
|
|
|
|
+// newStdWriter is copied from github.com/moby/moby/daemon/internal/stdcopymux
|
|
|
+// because NewStdWriter was moved to a daemon-internal package in moby v2 and
|
|
|
+// is no longer publicly importable. We need it in tests to produce multiplexed
|
|
|
+// streams that stdcopy.StdCopy can demultiplex.
|
|
|
+
|
|
|
+const (
|
|
|
+ stdWriterPrefixLen = 8
|
|
|
+ stdWriterFdIndex = 0
|
|
|
+ stdWriterSizeIndex = 4
|
|
|
+)
|
|
|
+
|
|
|
+var bufPool = &sync.Pool{New: func() any { return bytes.NewBuffer(nil) }}
|
|
|
+
|
|
|
+type stdWriter struct {
|
|
|
+ io.Writer
|
|
|
+ prefix byte
|
|
|
+}
|
|
|
+
|
|
|
+func (w *stdWriter) Write(p []byte) (int, error) {
|
|
|
+ if w == nil || w.Writer == nil {
|
|
|
+ return 0, errors.New("writer not instantiated")
|
|
|
+ }
|
|
|
+ if p == nil {
|
|
|
+ return 0, nil
|
|
|
+ }
|
|
|
+
|
|
|
+ header := [stdWriterPrefixLen]byte{stdWriterFdIndex: w.prefix}
|
|
|
+ binary.BigEndian.PutUint32(header[stdWriterSizeIndex:], uint32(len(p)))
|
|
|
+ buf := bufPool.Get().(*bytes.Buffer)
|
|
|
+ buf.Write(header[:])
|
|
|
+ buf.Write(p)
|
|
|
+
|
|
|
+ n, err := w.Writer.Write(buf.Bytes())
|
|
|
+ n -= stdWriterPrefixLen
|
|
|
+ if n < 0 {
|
|
|
+ n = 0
|
|
|
+ }
|
|
|
+
|
|
|
+ buf.Reset()
|
|
|
+ bufPool.Put(buf)
|
|
|
+ return n, err
|
|
|
+}
|
|
|
+
|
|
|
+func newStdWriter(w io.Writer, streamType stdcopy.StdType) io.Writer {
|
|
|
+ return &stdWriter{
|
|
|
+ Writer: w,
|
|
|
+ prefix: byte(streamType),
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func TestComposeService_Logs_Demux(t *testing.T) {
|
|
|
mockCtrl := gomock.NewController(t)
|
|
|
defer mockCtrl.Finish()
|
|
|
@@ -68,8 +121,8 @@ func TestComposeService_Logs_Demux(t *testing.T) {
|
|
|
_ = c1Reader.Close()
|
|
|
_ = c1Writer.Close()
|
|
|
})
|
|
|
- c1Stdout := stdcopy.NewStdWriter(c1Writer, stdcopy.Stdout)
|
|
|
- c1Stderr := stdcopy.NewStdWriter(c1Writer, stdcopy.Stderr)
|
|
|
+ c1Stdout := newStdWriter(c1Writer, stdcopy.Stdout)
|
|
|
+ c1Stderr := newStdWriter(c1Writer, stdcopy.Stderr)
|
|
|
go func() {
|
|
|
_, err := c1Stdout.Write([]byte("hello stdout\n"))
|
|
|
assert.NilError(t, err, "Writing to fake stdout")
|