Browse Source

introduce sync+exec watch action

Signed-off-by: Nicolas De Loof <[email protected]>
Nicolas De Loof 1 year ago
parent
commit
32a22c1f4f
6 changed files with 143 additions and 62 deletions
  1. 2 0
      go.mod
  2. 2 2
      go.sum
  3. 82 57
      pkg/compose/watch.go
  4. 3 3
      pkg/compose/watch_test.go
  5. 14 0
      pkg/e2e/fixtures/watch/exec.yaml
  6. 40 0
      pkg/e2e/watch_test.go

+ 2 - 0
go.mod

@@ -195,3 +195,5 @@ require (
 	sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
 	sigs.k8s.io/yaml v1.3.0 // indirect
 )
+
+replace github.com/compose-spec/compose-go/v2 => github.com/ndeloof/compose-go/v2 v2.0.1-0.20241127110655-b1321070b3ab

+ 2 - 2
go.sum

@@ -85,8 +85,6 @@ github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78 h1:QVw89YDxXxEe+l8gU8E
 github.com/cncf/xds/go v0.0.0-20240905190251-b4127c9b8d78/go.mod h1:W+zGtBO5Y1IgJhy4+A9GOqVhqLpfZi+vwmdNXUehLA8=
 github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb h1:EDmT6Q9Zs+SbUoc7Ik9EfrFqcylYqgPZ9ANSbTAntnE=
 github.com/codahale/rfc6979 v0.0.0-20141003034818-6a90f24967eb/go.mod h1:ZjrT6AXHbDs86ZSdt/osfBi5qfexBrKUdONk989Wnk4=
-github.com/compose-spec/compose-go/v2 v2.4.5 h1:p4ih4Jb6VgGPLPxh3fSFVKAjFHtZd+7HVLCSFzcFx9Y=
-github.com/compose-spec/compose-go/v2 v2.4.5/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc=
 github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM=
 github.com/containerd/cgroups/v3 v3.0.3 h1:S5ByHZ/h9PMe5IOQoN7E+nMc2UcLEM/V48DGDJ9kip0=
 github.com/containerd/cgroups/v3 v3.0.3/go.mod h1:8HBe7V3aWGLFPd/k03swSIsGjZhHI2WzJmticMgVuz0=
@@ -357,6 +355,8 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8m
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
 github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f h1:y5//uYreIhSUg3J1GEMiLbxo1LJaP8RfCpH6pymGZus=
 github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw=
+github.com/ndeloof/compose-go/v2 v2.0.1-0.20241127110655-b1321070b3ab h1:3Q4/1sAnPv4nMpak/lIzWsQJjX8X5zKZRkDd6mlf2mc=
+github.com/ndeloof/compose-go/v2 v2.0.1-0.20241127110655-b1321070b3ab/go.mod h1:lFN0DrMxIncJGYAXTfWuajfwj5haBJqrBkarHcnjJKc=
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/ginkgo v1.12.0 h1:Iw5WCbBcaAAd0fpRb1c9r5YCylv4XDoCSigm1zLevwU=

+ 82 - 57
pkg/compose/watch.go

@@ -23,12 +23,12 @@ import (
 	"os"
 	"path"
 	"path/filepath"
-	"sort"
 	"strconv"
 	"strings"
 	"time"
 
 	"github.com/compose-spec/compose-go/v2/types"
+	ccli "github.com/docker/cli/cli/command/container"
 	pathutil "github.com/docker/compose/v2/internal/paths"
 	"github.com/docker/compose/v2/internal/sync"
 	"github.com/docker/compose/v2/pkg/api"
@@ -48,7 +48,7 @@ const quietPeriod = 500 * time.Millisecond
 // fileEvent contains the Compose service and modified host system path.
 type fileEvent struct {
 	sync.PathMapping
-	Action types.WatchAction
+	Trigger types.Trigger
 }
 
 // getSyncImplementation returns an appropriate sync implementation for the
@@ -287,7 +287,7 @@ func maybeFileEvent(trigger types.Trigger, hostPath string, ignore watch.PathMat
 	}
 
 	return &fileEvent{
-		Action: trigger.Action,
+		Trigger: trigger,
 		PathMapping: sync.PathMapping{
 			HostPath:      hostPath,
 			ContainerPath: containerPath,
@@ -325,7 +325,10 @@ func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project)
 		}
 
 		if trigger.Action == types.WatchActionRebuild && service.Build == nil {
-			return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name)
+			return nil, fmt.Errorf("service %s doesn't have a build section, can't apply %s on watch", types.WatchActionRebuild, service.Name)
+		}
+		if trigger.Action == types.WatchActionSyncExec && len(trigger.Exec.Command) == 0 {
+			return nil, fmt.Errorf("can't watch with action %q on service %s wihtout a command", types.WatchActionSyncExec, service.Name)
 		}
 
 		config.Watch[i] = trigger
@@ -341,24 +344,17 @@ func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.
 	out := make(chan []fileEvent)
 	go func() {
 		defer close(out)
-		seen := make(map[fileEvent]time.Time)
+		seen := make(map[sync.PathMapping]fileEvent)
 		flushEvents := func() {
 			if len(seen) == 0 {
 				return
 			}
 			events := make([]fileEvent, 0, len(seen))
-			for e := range seen {
+			for _, e := range seen {
 				events = append(events, e)
 			}
-			// sort batch by oldest -> newest
-			// (if an event is seen > 1 per batch, it gets the latest timestamp)
-			sort.SliceStable(events, func(i, j int) bool {
-				x := events[i]
-				y := events[j]
-				return seen[x].Before(seen[y])
-			})
 			out <- events
-			seen = make(map[fileEvent]time.Time)
+			seen = make(map[sync.PathMapping]fileEvent)
 		}
 
 		t := clock.NewTicker(delay)
@@ -375,7 +371,7 @@ func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.
 					flushEvents()
 					return
 				}
-				seen[e] = time.Now()
+				seen[e.PathMapping] = e
 				t.Reset(delay)
 			}
 		}
@@ -474,49 +470,10 @@ func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Pr
 	pathMappings := make([]sync.PathMapping, len(batch))
 	restartService := false
 	for i := range batch {
-		if batch[i].Action == types.WatchActionRebuild {
-			options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName))
-			// restrict the build to ONLY this service, not any of its dependencies
-			options.Build.Services = []string{serviceName}
-			imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil)
-
-			if err != nil {
-				options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err))
-				return err
-			}
-
-			if options.Prune {
-				s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap)
-			}
-
-			options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName))
-
-			err = s.create(ctx, project, api.CreateOptions{
-				Services: []string{serviceName},
-				Inherit:  true,
-				Recreate: api.RecreateForce,
-			})
-			if err != nil {
-				options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err))
-				return err
-			}
-
-			services := []string{serviceName}
-			p, err := project.WithSelectedServices(services)
-			if err != nil {
-				return err
-			}
-			err = s.start(ctx, project.Name, api.StartOptions{
-				Project:  p,
-				Services: services,
-				AttachTo: services,
-			}, nil)
-			if err != nil {
-				options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Application failed to start after update. Error: %v", err))
-			}
-			return nil
+		if batch[i].Trigger.Action == types.WatchActionRebuild {
+			return s.rebuild(ctx, project, serviceName, options)
 		}
-		if batch[i].Action == types.WatchActionSyncRestart {
+		if batch[i].Trigger.Action == types.WatchActionSyncRestart {
 			restartService = true
 		}
 		pathMappings[i] = batch[i].PathMapping
@@ -543,7 +500,75 @@ func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Pr
 		options.LogTo.Log(
 			api.WatchLogger,
 			fmt.Sprintf("service %q restarted", serviceName))
+	}
+	eg, ctx := errgroup.WithContext(ctx)
+	for _, b := range batch {
+		if b.Trigger.Action == types.WatchActionSyncExec {
+			containers, err := s.getContainers(ctx, project.Name, oneOffExclude, false, serviceName)
+			if err != nil {
+				return err
+			}
+			x := b.Trigger.Exec
+			for _, c := range containers {
+				eg.Go(func() error {
+					exec := ccli.NewExecOptions()
+					exec.User = x.User
+					exec.Privileged = x.Privileged
+					exec.Command = x.Command
+					exec.Workdir = x.WorkingDir
+					for _, v := range x.Environment.ToMapping().Values() {
+						err := exec.Env.Set(v)
+						if err != nil {
+							return err
+						}
+					}
+					return ccli.RunExec(ctx, s.dockerCli, c.ID, exec)
+				})
+			}
+		}
+	}
+	return eg.Wait()
+}
+
+func (s *composeService) rebuild(ctx context.Context, project *types.Project, serviceName string, options api.WatchOptions) error {
+	options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName))
+	// restrict the build to ONLY this service, not any of its dependencies
+	options.Build.Services = []string{serviceName}
+	imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil)
+
+	if err != nil {
+		options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err))
+		return err
+	}
 
+	if options.Prune {
+		s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap)
+	}
+
+	options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName))
+
+	err = s.create(ctx, project, api.CreateOptions{
+		Services: []string{serviceName},
+		Inherit:  true,
+		Recreate: api.RecreateForce,
+	})
+	if err != nil {
+		options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err))
+		return err
+	}
+
+	services := []string{serviceName}
+	p, err := project.WithSelectedServices(services)
+	if err != nil {
+		return err
+	}
+	err = s.start(ctx, project.Name, api.StartOptions{
+		Project:  p,
+		Services: services,
+		AttachTo: services,
+	}, nil)
+	if err != nil {
+		options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Application failed to start after update. Error: %v", err))
 	}
 	return nil
 }

+ 3 - 3
pkg/compose/watch_test.go

@@ -48,7 +48,7 @@ func TestDebounceBatching(t *testing.T) {
 		if i%2 == 0 {
 			action = "b"
 		}
-		ch <- fileEvent{Action: action}
+		ch <- fileEvent{Trigger: types.Trigger{Action: action}}
 	}
 	// we sent 100 events + the debouncer
 	clock.BlockUntil(101)
@@ -56,8 +56,8 @@ func TestDebounceBatching(t *testing.T) {
 	select {
 	case batch := <-eventBatchCh:
 		require.ElementsMatch(t, batch, []fileEvent{
-			{Action: "a"},
-			{Action: "b"},
+			{Trigger: types.Trigger{Action: "a"}},
+			{Trigger: types.Trigger{Action: "b"}},
 		})
 	case <-time.After(50 * time.Millisecond):
 		t.Fatal("timed out waiting for events")

+ 14 - 0
pkg/e2e/fixtures/watch/exec.yaml

@@ -0,0 +1,14 @@
+services:
+  test:
+    build:
+      dockerfile_inline: FROM alpine
+    command: ping localhost
+    volumes:
+      - /data
+    develop:
+      watch:
+        - path: .
+          target: /data
+          action: sync+exec
+          exec:
+            command: echo "SUCCESS"

+ 40 - 0
pkg/e2e/watch_test.go

@@ -17,6 +17,7 @@
 package e2e
 
 import (
+	"bytes"
 	"crypto/rand"
 	"fmt"
 	"os"
@@ -289,3 +290,42 @@ func doTest(t *testing.T, svcName string) {
 
 	testComplete.Store(true)
 }
+
+func TestWatchExec(t *testing.T) {
+	cli := NewCLI(t)
+	const projectName = "test_watch_exec"
+
+	t.Cleanup(func() {
+		cli.RunDockerComposeCmd(t, "-p", projectName, "down")
+	})
+
+	tmpdir := t.TempDir()
+	composeFilePath := filepath.Join(tmpdir, "compose.yaml")
+	CopyFile(t, filepath.Join("fixtures", "watch", "exec.yaml"), composeFilePath)
+	cmd := cli.NewDockerComposeCmd(t, "-p", projectName, "-f", composeFilePath, "up", "--watch")
+	buffer := bytes.NewBuffer(nil)
+	cmd.Stdout = buffer
+	watch := icmd.StartCmd(cmd)
+
+	poll.WaitOn(t, func(l poll.LogT) poll.Result {
+		out := buffer.String()
+		if strings.Contains(out, "64 bytes from") {
+			return poll.Success()
+		}
+		return poll.Continue("%v", watch.Stdout())
+	})
+
+	t.Logf("Create new file")
+
+	testFile := filepath.Join(tmpdir, "test")
+	require.NoError(t, os.WriteFile(testFile, []byte("test\n"), 0o600))
+
+	poll.WaitOn(t, func(l poll.LogT) poll.Result {
+		out := buffer.String()
+		if strings.Contains(out, "SUCCESS") {
+			return poll.Success()
+		}
+		return poll.Continue("%v", out)
+	})
+	cli.RunDockerComposeCmdNoCheck(t, "-p", projectName, "kill", "-s", "9")
+}