Browse Source

watch: support multiple containers for tar implementation (#10860)

Support services with scale > 1 for the tar watch sync.

Add a "lossy" multi-writer specific to pipes that writes the
tar data to each `io.PipeWriter`, which is connected to `stdin`
for the `tar` process being exec'd in the container.

The data is written serially to each writer. This could be
adjusted to do concurrent writes but that will rapidly increase
the I/O load, so is not done here - in general, 99% of the
time you'll be developing (and thus using watch/sync) with a
single replica of a service.

If a write fails, the corresponding `io.PipeWriter` is removed
from the active set and closed with an error.

This means that a single container copy failing won't stop
writes to the others that are succeeding. Of course, they will
be in an inconsistent state afterwards still, but that's a
different problem.

Signed-off-by: Milas Bowman <[email protected]>
Milas Bowman 2 years ago
parent
commit
efd44de1b7
3 changed files with 260 additions and 4 deletions
  1. 17 4
      internal/sync/tar.go
  2. 91 0
      internal/sync/writer.go
  3. 152 0
      internal/sync/writer_test.go

+ 17 - 4
internal/sync/tar.go

@@ -80,9 +80,6 @@ func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []Pat
 		}
 	}
 
-	// TODO: this can't be read from multiple times
-	tarReader := tarArchive(pathsToCopy)
-
 	var deleteCmd []string
 	if len(pathsToDelete) != 0 {
 		deleteCmd = append([]string{"rm", "-rf"}, pathsToDelete...)
@@ -90,20 +87,36 @@ func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []Pat
 	copyCmd := []string{"tar", "-v", "-C", "/", "-x", "-f", "-"}
 
 	var eg multierror.Group
+	writers := make([]*io.PipeWriter, len(containers))
 	for i := range containers {
 		containerID := containers[i].ID
+		r, w := io.Pipe()
+		writers[i] = w
 		eg.Go(func() error {
 			if len(deleteCmd) != 0 {
 				if err := t.client.Exec(ctx, containerID, deleteCmd, nil); err != nil {
 					return fmt.Errorf("deleting paths in %s: %w", containerID, err)
 				}
 			}
-			if err := t.client.Exec(ctx, containerID, copyCmd, tarReader); err != nil {
+			if err := t.client.Exec(ctx, containerID, copyCmd, r); err != nil {
 				return fmt.Errorf("copying files to %s: %w", containerID, err)
 			}
 			return nil
 		})
 	}
+
+	multiWriter := newLossyMultiWriter(writers...)
+	tarReader := tarArchive(pathsToCopy)
+	defer func() {
+		_ = tarReader.Close()
+		multiWriter.Close()
+	}()
+	_, err = io.Copy(multiWriter, tarReader)
+	if err != nil {
+		return err
+	}
+	multiWriter.Close()
+
 	return eg.Wait().ErrorOrNil()
 }
 

+ 91 - 0
internal/sync/writer.go

@@ -0,0 +1,91 @@
+/*
+   Copyright 2023 Docker Compose CLI authors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package sync
+
+import (
+	"errors"
+	"io"
+)
+
+// lossyMultiWriter attempts to tee all writes to the provided io.PipeWriter
+// instances.
+//
+// If a writer fails during a Write call, the write-side of the pipe is then
+// closed with the error and no subsequent attempts are made to write to the
+// pipe.
+//
+// If all writers fail during a write, an error is returned.
+//
+// On Close, any remaining writers are closed.
+type lossyMultiWriter struct {
+	writers []*io.PipeWriter
+}
+
+// newLossyMultiWriter creates a new writer that *attempts* to tee all data written to it to the provided io.PipeWriter
+// instances. Rather than failing a write operation if any writer fails, writes only fail if there are no more valid
+// writers. Otherwise, errors for specific writers are propagated via CloseWithError.
+func newLossyMultiWriter(writers ...*io.PipeWriter) *lossyMultiWriter {
+	// reverse the writers because during the write we iterate
+	// backwards, so this way we'll end up writing in the same
+	// order as the writers were passed to us
+	writers = append([]*io.PipeWriter(nil), writers...)
+	for i, j := 0, len(writers)-1; i < j; i, j = i+1, j-1 {
+		writers[i], writers[j] = writers[j], writers[i]
+	}
+
+	return &lossyMultiWriter{
+		writers: writers,
+	}
+}
+
+// Write writes to each writer that is still active (i.e. has not failed/encountered an error on write).
+//
+// If a writer encounters an error during the write, the write side of the pipe is closed with the error
+// and no subsequent attempts will be made to write to that writer.
+//
+// An error is only returned from this function if ALL writers have failed.
+func (l *lossyMultiWriter) Write(p []byte) (int, error) {
+	// NOTE: this function iterates backwards so that it can
+	// 	safely remove elements during the loop
+	for i := len(l.writers) - 1; i >= 0; i-- {
+		written, err := l.writers[i].Write(p)
+		if err == nil && written != len(p) {
+			err = io.ErrShortWrite
+		}
+		if err != nil {
+			// pipe writer close cannot fail
+			_ = l.writers[i].CloseWithError(err)
+			l.writers = append(l.writers[:i], l.writers[i+1:]...)
+		}
+	}
+
+	if len(l.writers) == 0 {
+		return 0, errors.New("no writers remaining")
+	}
+
+	return len(p), nil
+}
+
+// Close closes any still open (non-failed) writers.
+//
+// Failed writers have already been closed with an error.
+func (l *lossyMultiWriter) Close() {
+	for i := range l.writers {
+		// pipe writer close cannot fail
+		_ = l.writers[i].Close()
+	}
+}

+ 152 - 0
internal/sync/writer_test.go

@@ -0,0 +1,152 @@
+/*
+   Copyright 2023 Docker Compose CLI authors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package sync
+
+import (
+	"context"
+	"io"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/require"
+)
+
+func TestLossyMultiWriter(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	const count = 5
+	readers := make([]*bufReader, count)
+	writers := make([]*io.PipeWriter, count)
+	for i := 0; i < count; i++ {
+		r, w := io.Pipe()
+		readers[i] = newBufReader(ctx, r)
+		writers[i] = w
+	}
+
+	w := newLossyMultiWriter(writers...)
+	t.Cleanup(w.Close)
+	n, err := w.Write([]byte("hello world"))
+	require.Equal(t, 11, n)
+	require.NoError(t, err)
+	for i := range readers {
+		readers[i].waitForWrite(t)
+		require.Equal(t, "hello world", string(readers[i].contents()))
+		readers[i].reset()
+	}
+
+	// even if a writer fails (in this case simulated by closing the receiving end of the pipe),
+	// write operations should continue to return nil error but the writer should be closed
+	// with an error
+	const failIndex = 3
+	require.NoError(t, readers[failIndex].r.CloseWithError(errors.New("oh no")))
+	n, err = w.Write([]byte("hello"))
+	require.Equal(t, 5, n)
+	require.NoError(t, err)
+	for i := range readers {
+		readers[i].waitForWrite(t)
+		if i == failIndex {
+			err := readers[i].error()
+			require.EqualError(t, err, "io: read/write on closed pipe")
+			require.Empty(t, readers[i].contents())
+		} else {
+			require.Equal(t, "hello", string(readers[i].contents()))
+		}
+	}
+
+	// perform another write, verify there's still no errors
+	n, err = w.Write([]byte(" world"))
+	require.Equal(t, 6, n)
+	require.NoError(t, err)
+}
+
+type bufReader struct {
+	ctx       context.Context
+	r         *io.PipeReader
+	mu        sync.Mutex
+	err       error
+	data      []byte
+	writeSync chan struct{}
+}
+
+func newBufReader(ctx context.Context, r *io.PipeReader) *bufReader {
+	b := &bufReader{
+		ctx:       ctx,
+		r:         r,
+		writeSync: make(chan struct{}),
+	}
+	go b.consume()
+	return b
+}
+
+func (b *bufReader) waitForWrite(t testing.TB) {
+	t.Helper()
+	select {
+	case <-b.writeSync:
+		return
+	case <-time.After(50 * time.Millisecond):
+		t.Fatal("timed out waiting for write")
+	}
+}
+
+func (b *bufReader) consume() {
+	defer close(b.writeSync)
+	for {
+		buf := make([]byte, 512)
+		n, err := b.r.Read(buf)
+		if n != 0 {
+			b.mu.Lock()
+			b.data = append(b.data, buf[:n]...)
+			b.mu.Unlock()
+		}
+		if err == io.EOF {
+			return
+		}
+		if err != nil {
+			b.mu.Lock()
+			b.err = err
+			b.mu.Unlock()
+			return
+		}
+		// prevent goroutine leak, tie lifetime to the test
+		select {
+		case b.writeSync <- struct{}{}:
+		case <-b.ctx.Done():
+			return
+		}
+	}
+}
+
+func (b *bufReader) contents() []byte {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.data
+}
+
+func (b *bufReader) reset() {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	b.data = nil
+}
+
+func (b *bufReader) error() error {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+	return b.err
+}