Просмотр исходного кода

watch: batch & de-duplicate file events (#10865)

Adjust the debouncing logic so that it applies to all inbound file
events, regardless of whether they match a sync or rebuild rule.

When the batch is flushed out, if any event for the service is a
rebuild event, then the service is rebuilt and all sync events for
the batch are ignored. If _all_ events in the batch are sync events,
then a sync is triggered, passing the entire batch at once. This
provides a substantial performance win for the new `tar`-based
implementation, as it can efficiently transfer the changes in bulk.

Additionally, this helps with jitter, e.g. it's not uncommon for
there to be double-writes in quick succession to a file, so even if
there's not many files being modified at once, it can still prevent
some unnecessary transfers.

Signed-off-by: Milas Bowman <[email protected]>
Milas Bowman 2 лет назад
Родитель
Сommit
3b0742fd57

+ 5 - 5
internal/sync/docker_cp.go

@@ -71,19 +71,19 @@ func (d *DockerCopy) sync(ctx context.Context, service types.ServiceConfig, path
 		if fi.IsDir() {
 			for i := 1; i <= scale; i++ {
 				_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
-					Service: pathMapping.Service,
+					Service: service.Name,
 					Command: []string{"mkdir", "-p", pathMapping.ContainerPath},
 					Index:   i,
 				})
 				if err != nil {
-					logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err)
+					logrus.Warnf("failed to create %q from %s: %v", pathMapping.ContainerPath, service.Name, err)
 				}
 			}
 			fmt.Fprintf(d.infoWriter, "%s created\n", pathMapping.ContainerPath)
 		} else {
 			err := d.client.Copy(ctx, d.projectName, api.CopyOptions{
 				Source:      pathMapping.HostPath,
-				Destination: fmt.Sprintf("%s:%s", pathMapping.Service, pathMapping.ContainerPath),
+				Destination: fmt.Sprintf("%s:%s", service.Name, pathMapping.ContainerPath),
 			})
 			if err != nil {
 				return err
@@ -93,12 +93,12 @@ func (d *DockerCopy) sync(ctx context.Context, service types.ServiceConfig, path
 	} else if errors.Is(statErr, fs.ErrNotExist) {
 		for i := 1; i <= scale; i++ {
 			_, err := d.client.Exec(ctx, d.projectName, api.RunOptions{
-				Service: pathMapping.Service,
+				Service: service.Name,
 				Command: []string{"rm", "-rf", pathMapping.ContainerPath},
 				Index:   i,
 			})
 			if err != nil {
-				logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, pathMapping.Service, err)
+				logrus.Warnf("failed to delete %q from %s: %v", pathMapping.ContainerPath, service.Name, err)
 			}
 		}
 		fmt.Fprintf(d.infoWriter, "%s deleted from service\n", pathMapping.ContainerPath)

+ 0 - 2
internal/sync/shared.go

@@ -22,8 +22,6 @@ import (
 
 // PathMapping contains the Compose service and modified host system path.
 type PathMapping struct {
-	// Service that the file event is for.
-	Service string
 	// HostPath that was created/modified/deleted outside the container.
 	//
 	// This is the path as seen from the user's perspective, e.g.

+ 1 - 4
internal/sync/tar.go

@@ -121,9 +121,7 @@ func (t *Tar) Sync(ctx context.Context, service types.ServiceConfig, paths []Pat
 }
 
 type ArchiveBuilder struct {
-	tw    *tar.Writer
-	paths []string // local paths archived
-
+	tw *tar.Writer
 	// A shared I/O buffer to help with file copying.
 	copyBuf *bytes.Buffer
 }
@@ -168,7 +166,6 @@ func (a *ArchiveBuilder) ArchivePathsIfExist(paths []PathMapping) error {
 		if err != nil {
 			return fmt.Errorf("archiving %q: %w", entry.path, err)
 		}
-		a.paths = append(a.paths, entry.path)
 	}
 	return nil
 }

+ 4 - 0
pkg/compose/compose.go

@@ -26,6 +26,8 @@ import (
 	"strings"
 	"sync"
 
+	"github.com/jonboulle/clockwork"
+
 	"github.com/docker/docker/api/types/volume"
 
 	"github.com/compose-spec/compose-go/types"
@@ -58,6 +60,7 @@ func init() {
 func NewComposeService(dockerCli command.Cli) api.Service {
 	return &composeService{
 		dockerCli:      dockerCli,
+		clock:          clockwork.NewRealClock(),
 		maxConcurrency: -1,
 		dryRun:         false,
 	}
@@ -65,6 +68,7 @@ func NewComposeService(dockerCli command.Cli) api.Service {
 
 type composeService struct {
 	dockerCli      command.Cli
+	clock          clockwork.Clock
 	maxConcurrency int
 	dryRun         bool
 }

+ 214 - 156
pkg/compose/watch.go

@@ -21,6 +21,7 @@ import (
 	"os"
 	"path"
 	"path/filepath"
+	"sort"
 	"strconv"
 	"strings"
 	"time"
@@ -37,7 +38,6 @@ import (
 	"golang.org/x/sync/errgroup"
 
 	"github.com/docker/compose/v2/pkg/api"
-	"github.com/docker/compose/v2/pkg/utils"
 	"github.com/docker/compose/v2/pkg/watch"
 )
 
@@ -45,9 +45,11 @@ type DevelopmentConfig struct {
 	Watch []Trigger `json:"watch,omitempty"`
 }
 
+type WatchAction string
+
 const (
-	WatchActionSync    = "sync"
-	WatchActionRebuild = "rebuild"
+	WatchActionSync    WatchAction = "sync"
+	WatchActionRebuild WatchAction = "rebuild"
 )
 
 type Trigger struct {
@@ -57,44 +59,34 @@ type Trigger struct {
 	Ignore []string `json:"ignore,omitempty"`
 }
 
-const quietPeriod = 2 * time.Second
+const quietPeriod = 500 * time.Millisecond
 
 // fileEvent contains the Compose service and modified host system path.
 type fileEvent struct {
-	// Service that the file event is for.
-	Service string
-	// HostPath that was created/modified/deleted outside the container.
-	//
-	// This is the path as seen from the user's perspective, e.g.
-	// 	- C:\Users\moby\Documents\hello-world\main.go
-	//  - /Users/moby/Documents/hello-world/main.go
-	HostPath string
+	sync.PathMapping
+	Action WatchAction
 }
 
 func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint: gocyclo
-	needRebuild := make(chan fileEvent)
-	needSync := make(chan sync.PathMapping)
-
 	_, err := s.prepareProjectForBuild(project, nil)
 	if err != nil {
 		return err
 	}
+	var syncer sync.Syncer
+	if useTar, _ := strconv.ParseBool(os.Getenv("COMPOSE_EXPERIMENTAL_WATCH_TAR")); useTar {
+		syncer = sync.NewTar(project.Name, tarDockerClient{s: s})
+	} else {
+		syncer = sync.NewDockerCopy(project.Name, s, s.stdinfo())
+	}
 
-	eg, ctx := errgroup.WithContext(ctx)
-	eg.Go(func() error {
-		clock := clockwork.NewRealClock()
-		debounce(ctx, clock, quietPeriod, needRebuild, s.makeRebuildFn(ctx, project))
-		return nil
-	})
-
-	eg.Go(s.makeSyncFn(ctx, project, needSync))
-
-	ss, err := project.GetServices(services...)
-	if err != nil {
+	if err := project.ForServices(services); err != nil {
 		return err
 	}
+
+	eg, ctx := errgroup.WithContext(ctx)
 	watching := false
-	for _, service := range ss {
+	for i := range project.Services {
+		service := project.Services[i]
 		config, err := loadDevelopmentConfig(service, project)
 		if err != nil {
 			return err
@@ -118,7 +110,10 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
 			continue
 		}
 
-		name := service.Name
+		// set the service to always be built - watch triggers `Up()` when it receives a rebuild event
+		service.PullPolicy = types.PullPolicyBuild
+		project.Services[i] = service
+
 		dockerIgnores, err := watch.LoadDockerIgnore(service.Build.Context)
 		if err != nil {
 			return err
@@ -160,7 +155,7 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
 
 		eg.Go(func() error {
 			defer watcher.Close() //nolint:errcheck
-			return s.watch(ctx, name, watcher, config.Watch, needSync, needRebuild)
+			return s.watch(ctx, project, service.Name, watcher, syncer, config.Watch)
 		})
 	}
 
@@ -171,7 +166,17 @@ func (s *composeService) Watch(ctx context.Context, project *types.Project, serv
 	return eg.Wait()
 }
 
-func (s *composeService) watch(ctx context.Context, name string, watcher watch.Notify, triggers []Trigger, needSync chan sync.PathMapping, needRebuild chan fileEvent) error {
+func (s *composeService) watch(
+	ctx context.Context,
+	project *types.Project,
+	name string,
+	watcher watch.Notify,
+	syncer sync.Syncer,
+	triggers []Trigger,
+) error {
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	ignores := make([]watch.PathMatcher, len(triggers))
 	for i, trigger := range triggers {
 		ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
@@ -181,62 +186,82 @@ func (s *composeService) watch(ctx context.Context, name string, watcher watch.N
 		ignores[i] = ignore
 	}
 
-WATCH:
+	events := make(chan fileEvent)
+	batchEvents := batchDebounceEvents(ctx, s.clock, quietPeriod, events)
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case batch := <-batchEvents:
+				start := time.Now()
+				logrus.Debugf("batch start: service[%s] count[%d]", name, len(batch))
+				if err := s.handleWatchBatch(ctx, project, name, batch, syncer); err != nil {
+					logrus.Warnf("Error handling changed files for service %s: %v", name, err)
+				}
+				logrus.Debugf("batch complete: service[%s] duration[%s] count[%d]",
+					name, time.Since(start), len(batch))
+			}
+		}
+	}()
+
 	for {
 		select {
 		case <-ctx.Done():
 			return nil
+		case err := <-watcher.Errors():
+			return err
 		case event := <-watcher.Events():
 			hostPath := event.Path()
-
 			for i, trigger := range triggers {
 				logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path)
-				if watch.IsChild(trigger.Path, hostPath) {
-					match, err := ignores[i].Matches(hostPath)
-					if err != nil {
-						logrus.Warnf("error ignore matching %q: %v", hostPath, err)
-						return err
-					}
-
-					if match {
-						logrus.Debugf("%s is matching ignore pattern", hostPath)
-						continue
-					}
-
-					logrus.Infof("change for %q", hostPath)
-					fmt.Fprintf(s.stdinfo(), "change detected on %s\n", hostPath)
-
-					switch trigger.Action {
-					case WatchActionSync:
-						logrus.Debugf("modified file %s triggered sync", hostPath)
-						rel, err := filepath.Rel(trigger.Path, hostPath)
-						if err != nil {
-							return err
-						}
-						needSync <- sync.PathMapping{
-							Service:  name,
-							HostPath: hostPath,
-							// always use Unix-style paths for inside the container
-							ContainerPath: path.Join(trigger.Target, rel),
-						}
-					case WatchActionRebuild:
-						logrus.Debugf("modified file %s requires image to be rebuilt", hostPath)
-						needRebuild <- fileEvent{
-							HostPath: hostPath,
-							Service:  name,
-						}
-					default:
-						return fmt.Errorf("watch action %q is not supported", trigger)
-					}
-					continue WATCH
+				if fileEvent := maybeFileEvent(trigger, hostPath, ignores[i]); fileEvent != nil {
+					events <- *fileEvent
 				}
 			}
-		case err := <-watcher.Errors():
-			return err
 		}
 	}
 }
 
+// maybeFileEvent returns a file event object if hostPath is valid for the provided trigger and ignore
+// rules.
+//
+// Any errors are logged as warnings and nil (no file event) is returned.
+func maybeFileEvent(trigger Trigger, hostPath string, ignore watch.PathMatcher) *fileEvent {
+	if !watch.IsChild(trigger.Path, hostPath) {
+		return nil
+	}
+	isIgnored, err := ignore.Matches(hostPath)
+	if err != nil {
+		logrus.Warnf("error ignore matching %q: %v", hostPath, err)
+		return nil
+	}
+
+	if isIgnored {
+		logrus.Debugf("%s is matching ignore pattern", hostPath)
+		return nil
+	}
+
+	var containerPath string
+	if trigger.Target != "" {
+		rel, err := filepath.Rel(trigger.Path, hostPath)
+		if err != nil {
+			logrus.Warnf("error making %s relative to %s: %v", hostPath, trigger.Path, err)
+			return nil
+		}
+		// always use Unix-style paths for inside the container
+		containerPath = path.Join(trigger.Target, rel)
+	}
+
+	return &fileEvent{
+		Action: WatchAction(trigger.Action),
+		PathMapping: sync.PathMapping{
+			HostPath:      hostPath,
+			ContainerPath: containerPath,
+		},
+	}
+}
+
 func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) (*DevelopmentConfig, error) {
 	var config DevelopmentConfig
 	y, ok := service.Extensions["x-develop"]
@@ -265,7 +290,7 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project)
 			return nil, errors.New("watch rules MUST define a path")
 		}
 
-		if trigger.Action == WatchActionRebuild && service.Build == nil {
+		if trigger.Action == string(WatchActionRebuild) && service.Build == nil {
 			return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name)
 		}
 
@@ -274,98 +299,54 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project)
 	return &config, nil
 }
 
-func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services rebuildServices) {
-	for i, service := range project.Services {
-		service.PullPolicy = types.PullPolicyBuild
-		project.Services[i] = service
-	}
-	return func(services rebuildServices) {
-		serviceNames := make([]string, 0, len(services))
-		allPaths := make(utils.Set[string])
-		for serviceName, paths := range services {
-			serviceNames = append(serviceNames, serviceName)
-			for p := range paths {
-				allPaths.Add(p)
+// batchDebounceEvents groups identical file events within a sliding time window and writes the results to the returned
+// channel.
+//
+// The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel.
+func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent) <-chan []fileEvent {
+	out := make(chan []fileEvent)
+	go func() {
+		defer close(out)
+		seen := make(map[fileEvent]time.Time)
+		flushEvents := func() {
+			if len(seen) == 0 {
+				return
 			}
+			events := make([]fileEvent, 0, len(seen))
+			for e := range seen {
+				events = append(events, e)
+			}
+			// sort batch by oldest -> newest
+			// (if an event is seen > 1 per batch, it gets the latest timestamp)
+			sort.SliceStable(events, func(i, j int) bool {
+				x := events[i]
+				y := events[j]
+				return seen[x].Before(seen[y])
+			})
+			out <- events
+			seen = make(map[fileEvent]time.Time)
 		}
 
-		fmt.Fprintf(
-			s.stdinfo(),
-			"Rebuilding %s after changes were detected:%s\n",
-			strings.Join(serviceNames, ", "),
-			strings.Join(append([]string{""}, allPaths.Elements()...), "\n  - "),
-		)
-		err := s.Up(ctx, project, api.UpOptions{
-			Create: api.CreateOptions{
-				Services: serviceNames,
-				Inherit:  true,
-			},
-			Start: api.StartOptions{
-				Services: serviceNames,
-				Project:  project,
-			},
-		})
-		if err != nil {
-			fmt.Fprintf(s.stderr(), "Application failed to start after update\n")
-		}
-	}
-}
-
-func (s *composeService) makeSyncFn(
-	ctx context.Context,
-	project *types.Project,
-	needSync <-chan sync.PathMapping,
-) func() error {
-	var syncer sync.Syncer
-	if useTar, _ := strconv.ParseBool(os.Getenv("COMPOSE_EXPERIMENTAL_WATCH_TAR")); useTar {
-		syncer = sync.NewTar(project.Name, tarDockerClient{s: s})
-	} else {
-		syncer = sync.NewDockerCopy(project.Name, s, s.stdinfo())
-	}
-
-	return func() error {
+		t := clock.NewTicker(delay)
+		defer t.Stop()
 		for {
 			select {
 			case <-ctx.Done():
-				return nil
-			case op := <-needSync:
-				service, err := project.GetService(op.Service)
-				if err != nil {
-					return err
-				}
-				if err := syncer.Sync(ctx, service, []sync.PathMapping{op}); err != nil {
-					return err
+				return
+			case <-t.Chan():
+				flushEvents()
+			case e, ok := <-input:
+				if !ok {
+					// input channel was closed
+					flushEvents()
+					return
 				}
+				seen[e] = time.Now()
+				t.Reset(delay)
 			}
 		}
-	}
-}
-
-type rebuildServices map[string]utils.Set[string]
-
-func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent, fn func(services rebuildServices)) {
-	services := make(rebuildServices)
-	t := clock.NewTimer(delay)
-	defer t.Stop()
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-t.Chan():
-			if len(services) > 0 {
-				go fn(services)
-				services = make(rebuildServices)
-			}
-		case e := <-input:
-			t.Reset(delay)
-			svc, ok := services[e.Service]
-			if !ok {
-				svc = make(utils.Set[string])
-				services[e.Service] = svc
-			}
-			svc.Add(e.HostPath)
-		}
-	}
+	}()
+	return out
 }
 
 func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolumeConfig) bool {
@@ -440,8 +421,85 @@ func (t tarDockerClient) Exec(ctx context.Context, containerID string, cmd []str
 	if err != nil {
 		return err
 	}
+	if execResult.Running {
+		return errors.New("process still running")
+	}
 	if execResult.ExitCode != 0 {
 		return fmt.Errorf("exit code %d", execResult.ExitCode)
 	}
 	return nil
 }
+
+func (s *composeService) handleWatchBatch(
+	ctx context.Context,
+	project *types.Project,
+	serviceName string,
+	batch []fileEvent,
+	syncer sync.Syncer,
+) error {
+	pathMappings := make([]sync.PathMapping, len(batch))
+	for i := range batch {
+		if batch[i].Action == WatchActionRebuild {
+			fmt.Fprintf(
+				s.stdinfo(),
+				"Rebuilding %s after changes were detected:%s\n",
+				serviceName,
+				strings.Join(append([]string{""}, batch[i].HostPath), "\n  - "),
+			)
+			err := s.Up(ctx, project, api.UpOptions{
+				Create: api.CreateOptions{
+					Services: []string{serviceName},
+					Inherit:  true,
+				},
+				Start: api.StartOptions{
+					Services: []string{serviceName},
+					Project:  project,
+				},
+			})
+			if err != nil {
+				fmt.Fprintf(s.stderr(), "Application failed to start after update\n")
+			}
+			return nil
+		}
+		pathMappings[i] = batch[i].PathMapping
+	}
+
+	writeWatchSyncMessage(s.stdinfo(), serviceName, pathMappings)
+
+	service, err := project.GetService(serviceName)
+	if err != nil {
+		return err
+	}
+	if err := syncer.Sync(ctx, service, pathMappings); err != nil {
+		return err
+	}
+	return nil
+}
+
+// writeWatchSyncMessage prints out a message about the sync for the changed paths.
+func writeWatchSyncMessage(w io.Writer, serviceName string, pathMappings []sync.PathMapping) {
+	const maxPathsToShow = 10
+	if len(pathMappings) <= maxPathsToShow || logrus.IsLevelEnabled(logrus.DebugLevel) {
+		hostPathsToSync := make([]string, len(pathMappings))
+		for i := range pathMappings {
+			hostPathsToSync[i] = pathMappings[i].HostPath
+		}
+		fmt.Fprintf(
+			w,
+			"Syncing %s after changes were detected:%s\n",
+			serviceName,
+			strings.Join(append([]string{""}, hostPathsToSync...), "\n  - "),
+		)
+	} else {
+		hostPathsToSync := make([]string, len(pathMappings))
+		for i := range pathMappings {
+			hostPathsToSync[i] = pathMappings[i].HostPath
+		}
+		fmt.Fprintf(
+			w,
+			"Syncing %s after %d changes were detected\n",
+			serviceName,
+			len(pathMappings),
+		)
+	}
+}

+ 134 - 88
pkg/compose/watch_test.go

@@ -16,47 +16,60 @@ package compose
 
 import (
 	"context"
+	"os"
 	"testing"
 	"time"
 
+	"github.com/compose-spec/compose-go/types"
+	"github.com/docker/compose/v2/pkg/mocks"
+	moby "github.com/docker/docker/api/types"
+	"github.com/golang/mock/gomock"
+
+	"github.com/jonboulle/clockwork"
+	"github.com/stretchr/testify/require"
+
 	"github.com/docker/compose/v2/internal/sync"
 
-	"github.com/docker/cli/cli/command"
 	"github.com/docker/compose/v2/pkg/watch"
-	"github.com/jonboulle/clockwork"
-	"golang.org/x/sync/errgroup"
 	"gotest.tools/v3/assert"
 )
 
-func Test_debounce(t *testing.T) {
+func TestDebounceBatching(t *testing.T) {
 	ch := make(chan fileEvent)
-	var (
-		ran int
-		got []string
-	)
 	clock := clockwork.NewFakeClock()
 	ctx, stop := context.WithCancel(context.Background())
 	t.Cleanup(stop)
-	eg, ctx := errgroup.WithContext(ctx)
-	eg.Go(func() error {
-		debounce(ctx, clock, quietPeriod, ch, func(services rebuildServices) {
-			for svc := range services {
-				got = append(got, svc)
-			}
-			ran++
-			stop()
-		})
-		return nil
-	})
+
+	eventBatchCh := batchDebounceEvents(ctx, clock, quietPeriod, ch)
 	for i := 0; i < 100; i++ {
-		ch <- fileEvent{Service: "test"}
+		var action WatchAction = "a"
+		if i%2 == 0 {
+			action = "b"
+		}
+		ch <- fileEvent{Action: action}
 	}
-	assert.Equal(t, ran, 0)
+	// we sent 100 events + the debouncer
+	clock.BlockUntil(101)
 	clock.Advance(quietPeriod)
-	err := eg.Wait()
-	assert.NilError(t, err)
-	assert.Equal(t, ran, 1)
-	assert.DeepEqual(t, got, []string{"test"})
+	select {
+	case batch := <-eventBatchCh:
+		require.ElementsMatch(t, batch, []fileEvent{
+			{Action: "a"},
+			{Action: "b"},
+		})
+	case <-time.After(50 * time.Millisecond):
+		t.Fatal("timed out waiting for events")
+	}
+	clock.BlockUntil(1)
+	clock.Advance(quietPeriod)
+
+	// there should only be a single batch
+	select {
+	case batch := <-eventBatchCh:
+		t.Fatalf("unexpected events: %v", batch)
+	case <-time.After(50 * time.Millisecond):
+		// channel is empty
+	}
 }
 
 type testWatcher struct {
@@ -80,73 +93,106 @@ func (t testWatcher) Errors() chan error {
 	return t.errors
 }
 
-func Test_sync(t *testing.T) {
-	needSync := make(chan sync.PathMapping)
-	needRebuild := make(chan fileEvent)
-	ctx, cancelFunc := context.WithCancel(context.TODO())
-	defer cancelFunc()
+func TestWatch_Sync(t *testing.T) {
+	mockCtrl := gomock.NewController(t)
+	cli := mocks.NewMockCli(mockCtrl)
+	cli.EXPECT().Err().Return(os.Stderr).AnyTimes()
+	apiClient := mocks.NewMockAPIClient(mockCtrl)
+	apiClient.EXPECT().ContainerList(gomock.Any(), gomock.Any()).Return([]moby.Container{
+		testContainer("test", "123", false),
+	}, nil).AnyTimes()
+	cli.EXPECT().Client().Return(apiClient).AnyTimes()
+
+	ctx, cancelFunc := context.WithCancel(context.Background())
+	t.Cleanup(cancelFunc)
+
+	proj := types.Project{
+		Services: []types.ServiceConfig{
+			{
+				Name: "test",
+			},
+		},
+	}
+
+	watcher := testWatcher{
+		events: make(chan watch.FileEvent),
+		errors: make(chan error),
+	}
 
-	run := func() watch.Notify {
-		watcher := testWatcher{
-			events: make(chan watch.FileEvent, 1),
-			errors: make(chan error),
+	syncer := newFakeSyncer()
+	clock := clockwork.NewFakeClock()
+	go func() {
+		service := composeService{
+			dockerCli: cli,
+			clock:     clock,
 		}
+		err := service.watch(ctx, &proj, "test", watcher, syncer, []Trigger{
+			{
+				Path:   "/sync",
+				Action: "sync",
+				Target: "/work",
+				Ignore: []string{"ignore"},
+			},
+			{
+				Path:   "/rebuild",
+				Action: "rebuild",
+			},
+		})
+		assert.NilError(t, err)
+	}()
 
-		go func() {
-			cli, err := command.NewDockerCli()
-			assert.NilError(t, err)
-
-			service := composeService{
-				dockerCli: cli,
-			}
-			err = service.watch(ctx, "test", watcher, []Trigger{
-				{
-					Path:   "/src",
-					Action: "sync",
-					Target: "/work",
-					Ignore: []string{"ignore"},
-				},
-				{
-					Path:   "/",
-					Action: "rebuild",
-				},
-			}, needSync, needRebuild)
-			assert.NilError(t, err)
-		}()
-		return watcher
+	watcher.Events() <- watch.NewFileEvent("/sync/changed")
+	watcher.Events() <- watch.NewFileEvent("/sync/changed/sub")
+	clock.BlockUntil(3)
+	clock.Advance(quietPeriod)
+	select {
+	case actual := <-syncer.synced:
+		require.ElementsMatch(t, []sync.PathMapping{
+			{HostPath: "/sync/changed", ContainerPath: "/work/changed"},
+			{HostPath: "/sync/changed/sub", ContainerPath: "/work/changed/sub"},
+		}, actual)
+	case <-time.After(100 * time.Millisecond):
+		t.Error("timeout")
 	}
 
-	t.Run("synchronize file", func(t *testing.T) {
-		watcher := run()
-		watcher.Events() <- watch.NewFileEvent("/src/changed")
-		select {
-		case actual := <-needSync:
-			assert.DeepEqual(t, sync.PathMapping{Service: "test", HostPath: "/src/changed", ContainerPath: "/work/changed"}, actual)
-		case <-time.After(100 * time.Millisecond):
-			t.Error("timeout")
-		}
-	})
-
-	t.Run("ignore", func(t *testing.T) {
-		watcher := run()
-		watcher.Events() <- watch.NewFileEvent("/src/ignore")
-		select {
-		case <-needSync:
-			t.Error("file event should have been ignored")
-		case <-time.After(100 * time.Millisecond):
-			// expected
-		}
-	})
-
-	t.Run("rebuild", func(t *testing.T) {
-		watcher := run()
-		watcher.Events() <- watch.NewFileEvent("/dependencies.yaml")
-		select {
-		case event := <-needRebuild:
-			assert.Equal(t, "test", event.Service)
-		case <-time.After(100 * time.Millisecond):
-			t.Error("timeout")
-		}
-	})
+	watcher.Events() <- watch.NewFileEvent("/sync/ignore")
+	watcher.Events() <- watch.NewFileEvent("/sync/ignore/sub")
+	watcher.Events() <- watch.NewFileEvent("/sync/changed")
+	clock.BlockUntil(4)
+	clock.Advance(quietPeriod)
+	select {
+	case actual := <-syncer.synced:
+		require.ElementsMatch(t, []sync.PathMapping{
+			{HostPath: "/sync/changed", ContainerPath: "/work/changed"},
+		}, actual)
+	case <-time.After(100 * time.Millisecond):
+		t.Error("timed out waiting for events")
+	}
+
+	watcher.Events() <- watch.NewFileEvent("/rebuild")
+	watcher.Events() <- watch.NewFileEvent("/sync/changed")
+	clock.BlockUntil(4)
+	clock.Advance(quietPeriod)
+	select {
+	case batch := <-syncer.synced:
+		t.Fatalf("received unexpected events: %v", batch)
+	case <-time.After(100 * time.Millisecond):
+		// expected
+	}
+	// TODO: there's not a great way to assert that the rebuild attempt happened
+}
+
+type fakeSyncer struct {
+	synced chan []sync.PathMapping
+}
 
+func newFakeSyncer() *fakeSyncer {
+	return &fakeSyncer{
+		synced: make(chan []sync.PathMapping),
+	}
+}
+
+func (f *fakeSyncer) Sync(_ context.Context, _ types.ServiceConfig, paths []sync.PathMapping) error {
+	f.synced <- paths
+	return nil
 }

+ 2 - 1
pkg/e2e/watch_test.go

@@ -23,6 +23,7 @@ import (
 	"strings"
 	"sync/atomic"
 	"testing"
+	"time"
 
 	"github.com/distribution/distribution/v3/uuid"
 	"github.com/stretchr/testify/require"
@@ -132,7 +133,7 @@ func doTest(t *testing.T, svcName string, tarSync bool) {
 	poll.WaitOn(t, func(t poll.LogT) poll.Result {
 		writeDataFile("hello.txt", "hello world")
 		return checkFileContents("/app/data/hello.txt", "hello world")(t)
-	})
+	}, poll.WithDelay(time.Second))
 
 	t.Logf("Modifying file contents")
 	writeDataFile("hello.txt", "hello watch")