Browse Source

watch: add tar sync implementation (#10853)

Brought to you by Tilt ❤️ 

Signed-off-by: Milas Bowman <[email protected]>
Milas Bowman 2 years ago
parent
commit
f65fd02383
4 changed files with 465 additions and 24 deletions
  1. 344 0
      internal/sync/tar.go
  2. 96 7
      pkg/compose/watch.go
  3. 23 15
      pkg/e2e/watch_test.go
  4. 2 2
      pkg/watch/watcher_darwin.go

+ 344 - 0
internal/sync/tar.go

@@ -0,0 +1,344 @@
+/*
+   Copyright 2018 The Tilt Dev Authors
+   Copyright 2023 Docker Compose CLI authors
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+*/
+
+package sync
+
+import (
+	"archive/tar"
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"io/fs"
+	"os"
+	"path"
+	"path/filepath"
+	"strings"
+
+	"github.com/hashicorp/go-multierror"
+	"github.com/pkg/errors"
+
+	"github.com/compose-spec/compose-go/types"
+	moby "github.com/docker/docker/api/types"
+	"github.com/docker/docker/pkg/archive"
+)
+
+type archiveEntry struct {
+	path   string
+	info   os.FileInfo
+	header *tar.Header
+}
+
+type LowLevelClient interface {
+	ContainersForService(ctx context.Context, projectName string, serviceName string) ([]moby.Container, error)
+
+	Exec(ctx context.Context, containerID string, cmd []string, in io.Reader) error
+}
+
+type Tar struct {
+	client LowLevelClient
+
+	projectName string
+}
+
+var _ Syncer = &Tar{}
+
+func NewTar(projectName string, client LowLevelClient) *Tar {
+	return &Tar{
+		projectName: projectName,
+		client:      client,
+	}
+}
+
+func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []PathMapping) error {
+	containers, err := t.client.ContainersForService(ctx, t.projectName, service.Name)
+	if err != nil {
+		return err
+	}
+
+	var pathsToCopy []PathMapping
+	var pathsToDelete []string
+	for _, p := range paths {
+		if _, err := os.Stat(p.HostPath); err != nil && errors.Is(err, fs.ErrNotExist) {
+			pathsToDelete = append(pathsToDelete, p.ContainerPath)
+		} else {
+			pathsToCopy = append(pathsToCopy, p)
+		}
+	}
+
+	// TODO: this can't be read from multiple times
+	tarReader := tarArchive(pathsToCopy)
+
+	var deleteCmd []string
+	if len(pathsToDelete) != 0 {
+		deleteCmd = append([]string{"rm", "-rf"}, pathsToDelete...)
+	}
+	copyCmd := []string{"tar", "-v", "-C", "/", "-x", "-f", "-"}
+
+	var eg multierror.Group
+	for i := range containers {
+		containerID := containers[i].ID
+		eg.Go(func() error {
+			if len(deleteCmd) != 0 {
+				if err := t.client.Exec(ctx, containerID, deleteCmd, nil); err != nil {
+					return fmt.Errorf("deleting paths in %s: %w", containerID, err)
+				}
+			}
+			if err := t.client.Exec(ctx, containerID, copyCmd, tarReader); err != nil {
+				return fmt.Errorf("copying files to %s: %w", containerID, err)
+			}
+			return nil
+		})
+	}
+	return eg.Wait().ErrorOrNil()
+}
+
+type ArchiveBuilder struct {
+	tw    *tar.Writer
+	paths []string // local paths archived
+
+	// A shared I/O buffer to help with file copying.
+	copyBuf *bytes.Buffer
+}
+
+func NewArchiveBuilder(writer io.Writer) *ArchiveBuilder {
+	tw := tar.NewWriter(writer)
+	return &ArchiveBuilder{
+		tw:      tw,
+		copyBuf: &bytes.Buffer{},
+	}
+}
+
+func (a *ArchiveBuilder) Close() error {
+	return a.tw.Close()
+}
+
+// ArchivePathsIfExist creates a tar archive of all local files in `paths`. It quietly skips any paths that don't exist.
+func (a *ArchiveBuilder) ArchivePathsIfExist(paths []PathMapping) error {
+	// In order to handle overlapping syncs, we
+	// 1) collect all the entries,
+	// 2) de-dupe them, with last-one-wins semantics
+	// 3) write all the entries
+	//
+	// It's not obvious that this is the correct behavior. A better approach
+	// (that's more in-line with how syncs work) might ignore files in earlier
+	// path mappings when we know they're going to be "synced" over.
+	// There's a bunch of subtle product decisions about how overlapping path
+	// mappings work that we're not sure about.
+	var entries []archiveEntry
+	for _, p := range paths {
+		newEntries, err := a.entriesForPath(p.HostPath, p.ContainerPath)
+		if err != nil {
+			return fmt.Errorf("inspecting %q: %w", p.HostPath, err)
+		}
+
+		entries = append(entries, newEntries...)
+	}
+
+	entries = dedupeEntries(entries)
+	for _, entry := range entries {
+		err := a.writeEntry(entry)
+		if err != nil {
+			return fmt.Errorf("archiving %q: %w", entry.path, err)
+		}
+		a.paths = append(a.paths, entry.path)
+	}
+	return nil
+}
+
+func (a *ArchiveBuilder) writeEntry(entry archiveEntry) error {
+	pathInTar := entry.path
+	header := entry.header
+
+	if header.Typeflag != tar.TypeReg {
+		// anything other than a regular file (e.g. dir, symlink) just needs the header
+		if err := a.tw.WriteHeader(header); err != nil {
+			return fmt.Errorf("writing %q header: %w", pathInTar, err)
+		}
+		return nil
+	}
+
+	file, err := os.Open(pathInTar)
+	if err != nil {
+		// In case the file has been deleted since we last looked at it.
+		if os.IsNotExist(err) {
+			return nil
+		}
+		return err
+	}
+
+	defer func() {
+		_ = file.Close()
+	}()
+
+	// The size header must match the number of contents bytes.
+	//
+	// There is room for a race condition here if something writes to the file
+	// after we've read the file size.
+	//
+	// For small files, we avoid this by first copying the file into a buffer,
+	// and using the size of the buffer to populate the header.
+	//
+	// For larger files, we don't want to copy the whole thing into a buffer,
+	// because that would blow up heap size. There is some danger that this
+	// will lead to a spurious error when the tar writer validates the sizes.
+	// That error will be disruptive but will be handled as best as we
+	// can downstream.
+	useBuf := header.Size < 5000000
+	if useBuf {
+		a.copyBuf.Reset()
+		_, err = io.Copy(a.copyBuf, file)
+		if err != nil && err != io.EOF {
+			return fmt.Errorf("copying %q: %w", pathInTar, err)
+		}
+		header.Size = int64(len(a.copyBuf.Bytes()))
+	}
+
+	// wait to write the header until _after_ the file is successfully opened
+	// to avoid generating an invalid tar entry that has a header but no contents
+	// in the case the file has been deleted
+	err = a.tw.WriteHeader(header)
+	if err != nil {
+		return fmt.Errorf("writing %q header: %w", pathInTar, err)
+	}
+
+	if useBuf {
+		_, err = io.Copy(a.tw, a.copyBuf)
+	} else {
+		_, err = io.Copy(a.tw, file)
+	}
+
+	if err != nil && err != io.EOF {
+		return fmt.Errorf("copying %q: %w", pathInTar, err)
+	}
+
+	// explicitly flush so that if the entry is invalid we will detect it now and
+	// provide a more meaningful error
+	if err := a.tw.Flush(); err != nil {
+		return fmt.Errorf("finalizing %q: %w", pathInTar, err)
+	}
+	return nil
+}
+
+// tarPath writes the given source path into tarWriter at the given dest (recursively for directories).
+// e.g. tarring my_dir --> dest d: d/file_a, d/file_b
+// If source path does not exist, quietly skips it and returns no err
+func (a *ArchiveBuilder) entriesForPath(localPath, containerPath string) ([]archiveEntry, error) {
+	localInfo, err := os.Stat(localPath)
+	if err != nil {
+		if os.IsNotExist(err) {
+			return nil, nil
+		}
+		return nil, err
+	}
+
+	localPathIsDir := localInfo.IsDir()
+	if localPathIsDir {
+		// Make sure we can trim this off filenames to get valid relative filepaths
+		if !strings.HasSuffix(localPath, string(filepath.Separator)) {
+			localPath += string(filepath.Separator)
+		}
+	}
+
+	containerPath = strings.TrimPrefix(containerPath, "/")
+
+	result := make([]archiveEntry, 0)
+	err = filepath.Walk(localPath, func(curLocalPath string, info os.FileInfo, err error) error {
+		if err != nil {
+			return fmt.Errorf("walking %q: %w", curLocalPath, err)
+		}
+
+		linkname := ""
+		if info.Mode()&os.ModeSymlink != 0 {
+			var err error
+			linkname, err = os.Readlink(curLocalPath)
+			if err != nil {
+				return err
+			}
+		}
+
+		var name string
+		//nolint:gocritic
+		if localPathIsDir {
+			// Name of file in tar should be relative to source directory...
+			tmp, err := filepath.Rel(localPath, curLocalPath)
+			if err != nil {
+				return fmt.Errorf("making %q relative to %q: %w", curLocalPath, localPath, err)
+			}
+			// ...and live inside `dest`
+			name = path.Join(containerPath, filepath.ToSlash(tmp))
+		} else if strings.HasSuffix(containerPath, "/") {
+			name = containerPath + filepath.Base(curLocalPath)
+		} else {
+			name = containerPath
+		}
+
+		header, err := archive.FileInfoHeader(name, info, linkname)
+		if err != nil {
+			// Not all types of files are allowed in a tarball. That's OK.
+			// Mimic the Docker behavior and just skip the file.
+			return nil
+		}
+
+		result = append(result, archiveEntry{
+			path:   curLocalPath,
+			info:   info,
+			header: header,
+		})
+
+		return nil
+	})
+	if err != nil {
+		return nil, err
+	}
+	return result, nil
+}
+
+func tarArchive(ops []PathMapping) io.ReadCloser {
+	pr, pw := io.Pipe()
+	go func() {
+		ab := NewArchiveBuilder(pw)
+		err := ab.ArchivePathsIfExist(ops)
+		if err != nil {
+			_ = pw.CloseWithError(fmt.Errorf("adding files to tar: %w", err))
+		} else {
+			// propagate errors from the TarWriter::Close() because it performs a final
+			// Flush() and any errors mean the tar is invalid
+			if err := ab.Close(); err != nil {
+				_ = pw.CloseWithError(fmt.Errorf("closing tar: %w", err))
+			} else {
+				_ = pw.Close()
+			}
+		}
+	}()
+	return pr
+}
+
+// Dedupe the entries with last-entry-wins semantics.
+func dedupeEntries(entries []archiveEntry) []archiveEntry {
+	seenIndex := make(map[string]int, len(entries))
+	result := make([]archiveEntry, 0, len(entries))
+	for i, entry := range entries {
+		seenIndex[entry.header.Name] = i
+	}
+	for i, entry := range entries {
+		if seenIndex[entry.header.Name] == i {
+			result = append(result, entry)
+		}
+	}
+	return result
+}

+ 96 - 7
pkg/compose/watch.go

@@ -17,11 +17,16 @@ package compose
 import (
 	"context"
 	"fmt"
+	"io"
+	"os"
 	"path"
 	"path/filepath"
+	"strconv"
 	"strings"
 	"time"
 
+	moby "github.com/docker/docker/api/types"
+
 	"github.com/docker/compose/v2/internal/sync"
 
 	"github.com/compose-spec/compose-go/types"
@@ -185,11 +190,11 @@ WATCH:
 			hostPath := event.Path()
 
 			for i, trigger := range triggers {
-				logrus.Debugf("change detected on %s - comparing with %s", hostPath, trigger.Path)
+				logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path)
 				if watch.IsChild(trigger.Path, hostPath) {
-
 					match, err := ignores[i].Matches(hostPath)
 					if err != nil {
+						logrus.Warnf("error ignore matching %q: %v", hostPath, err)
 						return err
 					}
 
@@ -198,6 +203,7 @@ WATCH:
 						continue
 					}
 
+					logrus.Infof("change for %q", hostPath)
 					fmt.Fprintf(s.stdinfo(), "change detected on %s\n", hostPath)
 
 					switch trigger.Action {
@@ -241,9 +247,18 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project)
 	if err != nil {
 		return nil, err
 	}
+	baseDir, err := filepath.EvalSymlinks(project.WorkingDir)
+	if err != nil {
+		return nil, fmt.Errorf("resolving symlink for %q: %w", project.WorkingDir, err)
+	}
+
 	for i, trigger := range config.Watch {
 		if !filepath.IsAbs(trigger.Path) {
-			trigger.Path = filepath.Join(project.WorkingDir, trigger.Path)
+			trigger.Path = filepath.Join(baseDir, trigger.Path)
+		}
+		if p, err := filepath.EvalSymlinks(trigger.Path); err == nil {
+			// this might fail because the path doesn't exist, etc.
+			trigger.Path = p
 		}
 		trigger.Path = filepath.Clean(trigger.Path)
 		if trigger.Path == "" {
@@ -301,19 +316,24 @@ func (s *composeService) makeSyncFn(
 	project *types.Project,
 	needSync <-chan sync.PathMapping,
 ) func() error {
-	syncer := sync.NewDockerCopy(project.Name, s, s.stdinfo())
+	var syncer sync.Syncer
+	if useTar, _ := strconv.ParseBool(os.Getenv("COMPOSE_EXPERIMENTAL_WATCH_TAR")); useTar {
+		syncer = sync.NewTar(project.Name, tarDockerClient{s: s})
+	} else {
+		syncer = sync.NewDockerCopy(project.Name, s, s.stdinfo())
+	}
 
 	return func() error {
 		for {
 			select {
 			case <-ctx.Done():
 				return nil
-			case pathMapping := <-needSync:
-				service, err := project.GetService(pathMapping.Service)
+			case op := <-needSync:
+				service, err := project.GetService(op.Service)
 				if err != nil {
 					return err
 				}
-				if err := syncer.Sync(ctx, service, []sync.PathMapping{pathMapping}); err != nil {
+				if err := syncer.Sync(ctx, service, []sync.PathMapping{op}); err != nil {
 					return err
 				}
 			}
@@ -356,3 +376,72 @@ func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolu
 	}
 	return false
 }
+
+type tarDockerClient struct {
+	s *composeService
+}
+
+func (t tarDockerClient) ContainersForService(ctx context.Context, projectName string, serviceName string) ([]moby.Container, error) {
+	containers, err := t.s.getContainers(ctx, projectName, oneOffExclude, true, serviceName)
+	if err != nil {
+		return nil, err
+	}
+	return containers, nil
+}
+
+func (t tarDockerClient) Exec(ctx context.Context, containerID string, cmd []string, in io.Reader) error {
+	execCfg := moby.ExecConfig{
+		Cmd:          cmd,
+		AttachStdout: false,
+		AttachStderr: true,
+		AttachStdin:  in != nil,
+		Tty:          false,
+	}
+	execCreateResp, err := t.s.apiClient().ContainerExecCreate(ctx, containerID, execCfg)
+	if err != nil {
+		return err
+	}
+
+	startCheck := moby.ExecStartCheck{Tty: false, Detach: false}
+	conn, err := t.s.apiClient().ContainerExecAttach(ctx, execCreateResp.ID, startCheck)
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+
+	var eg errgroup.Group
+	if in != nil {
+		eg.Go(func() error {
+			defer func() {
+				_ = conn.CloseWrite()
+			}()
+			_, err := io.Copy(conn.Conn, in)
+			return err
+		})
+	}
+	eg.Go(func() error {
+		_, err := io.Copy(t.s.stdinfo(), conn.Reader)
+		return err
+	})
+
+	err = t.s.apiClient().ContainerExecStart(ctx, execCreateResp.ID, startCheck)
+	if err != nil {
+		return err
+	}
+
+	// although the errgroup is not tied directly to the context, the operations
+	// in it are reading/writing to the connection, which is tied to the context,
+	// so they won't block indefinitely
+	if err := eg.Wait(); err != nil {
+		return err
+	}
+
+	execResult, err := t.s.apiClient().ContainerExecInspect(ctx, execCreateResp.ID)
+	if err != nil {
+		return err
+	}
+	if execResult.ExitCode != 0 {
+		return fmt.Errorf("exit code %d", execResult.ExitCode)
+	}
+	return nil
+}

+ 23 - 15
pkg/e2e/watch_test.go

@@ -20,7 +20,6 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
-	"runtime"
 	"strings"
 	"sync/atomic"
 	"testing"
@@ -34,21 +33,28 @@ import (
 )
 
 func TestWatch(t *testing.T) {
-	if runtime.GOOS == "darwin" {
-		t.Skip("Test currently broken on macOS due to symlink issues (see compose-go#436)")
-	}
-
 	services := []string{"alpine", "busybox", "debian"}
-	for _, svcName := range services {
-		t.Run(svcName, func(t *testing.T) {
-			t.Helper()
-			doTest(t, svcName)
-		})
-	}
+	t.Run("docker cp", func(t *testing.T) {
+		for _, svcName := range services {
+			t.Run(svcName, func(t *testing.T) {
+				t.Helper()
+				doTest(t, svcName, false)
+			})
+		}
+	})
+
+	t.Run("tar", func(t *testing.T) {
+		for _, svcName := range services {
+			t.Run(svcName, func(t *testing.T) {
+				t.Helper()
+				doTest(t, svcName, true)
+			})
+		}
+	})
 }
 
 // NOTE: these tests all share a single Compose file but are safe to run concurrently
-func doTest(t *testing.T, svcName string) {
+func doTest(t *testing.T, svcName string, tarSync bool) {
 	tmpdir := t.TempDir()
 	dataDir := filepath.Join(tmpdir, "data")
 	writeDataFile := func(name string, contents string) {
@@ -67,6 +73,9 @@ func doTest(t *testing.T, svcName string) {
 		"COMPOSE_FILE=" + composeFilePath,
 		"COMPOSE_PROJECT_NAME=" + projName,
 	}
+	if tarSync {
+		env = append(env, "COMPOSE_EXPERIMENTAL_WATCH_TAR=1")
+	}
 
 	cli := NewCLI(t, WithEnv(env...))
 
@@ -93,7 +102,7 @@ func doTest(t *testing.T, svcName string) {
 	var testComplete atomic.Bool
 	go func() {
 		// if the process exits abnormally before the test is done, fail the test
-		if err := r.Cmd.Wait(); err != nil && !testComplete.Load() {
+		if err := r.Cmd.Wait(); err != nil && !t.Failed() && !testComplete.Load() {
 			assert.Check(t, cmp.Nil(err))
 		}
 	}()
@@ -136,8 +145,7 @@ func doTest(t *testing.T, svcName string) {
 		Assert(t, icmd.Expected{
 			ExitCode: 1,
 			Err:      "No such file or directory",
-		},
-		)
+		})
 
 	t.Logf("Writing to ignored paths")
 	writeDataFile("data.foo", "ignored")

+ 2 - 2
pkg/watch/watcher_darwin.go

@@ -121,8 +121,8 @@ func newWatcher(paths []string, ignore PathMatcher) (Notify, error) {
 	dw := &fseventNotify{
 		ignore: ignore,
 		stream: &fsevents.EventStream{
-			Latency: 1 * time.Millisecond,
-			Flags:   fsevents.FileEvents,
+			Latency: 50 * time.Millisecond,
+			Flags:   fsevents.FileEvents | fsevents.IgnoreSelf,
 			// NOTE(dmiller): this corresponds to the `sinceWhen` parameter in FSEventStreamCreate
 			// https://developer.apple.com/documentation/coreservices/1443980-fseventstreamcreate
 			EventID: fsevents.LatestEventID(),