| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514 |
- /*
- Copyright 2020 Docker Compose CLI authors
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package compose
- import (
- "context"
- "errors"
- "fmt"
- "io"
- "os"
- "path"
- "path/filepath"
- "sort"
- "strconv"
- "strings"
- "time"
- "github.com/compose-spec/compose-go/v2/types"
- "github.com/docker/compose/v2/internal/sync"
- "github.com/docker/compose/v2/pkg/api"
- "github.com/docker/compose/v2/pkg/watch"
- moby "github.com/docker/docker/api/types"
- "github.com/jonboulle/clockwork"
- "github.com/mitchellh/mapstructure"
- "github.com/sirupsen/logrus"
- "golang.org/x/sync/errgroup"
- )
// quietPeriod is the debounce delay that watch passes to batchDebounceEvents
// when grouping file events into batches.
const quietPeriod = 500 * time.Millisecond
// fileEvent describes one change matched by a watch trigger: the path mapping
// of the changed file (its host path and, when the trigger declares a target,
// the corresponding container path) together with the action configured on
// that trigger.
//
// Values are used as map keys by batchDebounceEvents, so the type has to stay
// comparable.
type fileEvent struct {
	sync.PathMapping
	Action types.WatchAction
}
- // getSyncImplementation returns an appropriate sync implementation for the
- // project.
- //
- // Currently, an implementation that batches files and transfers them using
- // the Moby `Untar` API.
- func (s *composeService) getSyncImplementation(project *types.Project) (sync.Syncer, error) {
- var useTar bool
- if useTarEnv, ok := os.LookupEnv("COMPOSE_EXPERIMENTAL_WATCH_TAR"); ok {
- useTar, _ = strconv.ParseBool(useTarEnv)
- } else {
- useTar = true
- }
- if !useTar {
- return nil, errors.New("no available sync implementation")
- }
- return sync.NewTar(project.Name, tarDockerClient{s: s}), nil
- }
- func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo
- var err error
- if project, err = project.WithSelectedServices(services); err != nil {
- return err
- }
- syncer, err := s.getSyncImplementation(project)
- if err != nil {
- return err
- }
- eg, ctx := errgroup.WithContext(ctx)
- watching := false
- for i := range project.Services {
- service := project.Services[i]
- config, err := loadDevelopmentConfig(service, project)
- if err != nil {
- return err
- }
- if service.Develop != nil {
- config = service.Develop
- }
- if config == nil {
- continue
- }
- if len(config.Watch) > 0 && service.Build == nil {
- // service configured with watchers but no build section
- return fmt.Errorf("can't watch service %q without a build context", service.Name)
- }
- if len(services) > 0 && service.Build == nil {
- // service explicitly selected for watch has no build section
- return fmt.Errorf("can't watch service %q without a build context", service.Name)
- }
- if len(services) == 0 && service.Build == nil {
- continue
- }
- // set the service to always be built - watch triggers `Up()` when it receives a rebuild event
- service.PullPolicy = types.PullPolicyBuild
- project.Services[i] = service
- dockerIgnores, err := watch.LoadDockerIgnore(service.Build.Context)
- if err != nil {
- return err
- }
- // add a hardcoded set of ignores on top of what came from .dockerignore
- // some of this should likely be configurable (e.g. there could be cases
- // where you want `.git` to be synced) but this is suitable for now
- dotGitIgnore, err := watch.NewDockerPatternMatcher("/", []string{".git/"})
- if err != nil {
- return err
- }
- ignore := watch.NewCompositeMatcher(
- dockerIgnores,
- watch.EphemeralPathMatcher(),
- dotGitIgnore,
- )
- var paths, pathLogs []string
- for _, trigger := range config.Watch {
- if checkIfPathAlreadyBindMounted(trigger.Path, service.Volumes) {
- logrus.Warnf("path '%s' also declared by a bind mount volume, this path won't be monitored!\n", trigger.Path)
- continue
- }
- paths = append(paths, trigger.Path)
- pathLogs = append(pathLogs, fmt.Sprintf("Action %s for path %q", trigger.Action, trigger.Path))
- }
- watcher, err := watch.NewWatcher(paths, ignore)
- if err != nil {
- return err
- }
- fmt.Fprintf(
- s.stdinfo(),
- "Watch configuration for service %q:%s\n",
- service.Name,
- strings.Join(append([]string{""}, pathLogs...), "\n - "),
- )
- err = watcher.Start()
- if err != nil {
- return err
- }
- watching = true
- eg.Go(func() error {
- defer watcher.Close() //nolint:errcheck
- return s.watch(ctx, project, service.Name, options, watcher, syncer, config.Watch)
- })
- }
- if !watching {
- return fmt.Errorf("none of the selected services is configured for watch, consider setting an 'develop' section")
- }
- return eg.Wait()
- }
- func (s *composeService) watch(ctx context.Context, project *types.Project, name string, options api.WatchOptions, watcher watch.Notify, syncer sync.Syncer, triggers []types.Trigger) error {
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
- ignores := make([]watch.PathMatcher, len(triggers))
- for i, trigger := range triggers {
- ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
- if err != nil {
- return err
- }
- ignores[i] = ignore
- }
- events := make(chan fileEvent)
- batchEvents := batchDebounceEvents(ctx, s.clock, quietPeriod, events)
- go func() {
- for {
- select {
- case <-ctx.Done():
- return
- case batch := <-batchEvents:
- start := time.Now()
- logrus.Debugf("batch start: service[%s] count[%d]", name, len(batch))
- if err := s.handleWatchBatch(ctx, project, name, options.Build, batch, syncer); err != nil {
- logrus.Warnf("Error handling changed files for service %s: %v", name, err)
- }
- logrus.Debugf("batch complete: service[%s] duration[%s] count[%d]",
- name, time.Since(start), len(batch))
- }
- }
- }()
- for {
- select {
- case <-ctx.Done():
- return nil
- case err := <-watcher.Errors():
- return err
- case event := <-watcher.Events():
- hostPath := event.Path()
- for i, trigger := range triggers {
- logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path)
- if fileEvent := maybeFileEvent(trigger, hostPath, ignores[i]); fileEvent != nil {
- events <- *fileEvent
- }
- }
- }
- }
- }
- // maybeFileEvent returns a file event object if hostPath is valid for the provided trigger and ignore
- // rules.
- //
- // Any errors are logged as warnings and nil (no file event) is returned.
- func maybeFileEvent(trigger types.Trigger, hostPath string, ignore watch.PathMatcher) *fileEvent {
- if !watch.IsChild(trigger.Path, hostPath) {
- return nil
- }
- isIgnored, err := ignore.Matches(hostPath)
- if err != nil {
- logrus.Warnf("error ignore matching %q: %v", hostPath, err)
- return nil
- }
- if isIgnored {
- logrus.Debugf("%s is matching ignore pattern", hostPath)
- return nil
- }
- var containerPath string
- if trigger.Target != "" {
- rel, err := filepath.Rel(trigger.Path, hostPath)
- if err != nil {
- logrus.Warnf("error making %s relative to %s: %v", hostPath, trigger.Path, err)
- return nil
- }
- // always use Unix-style paths for inside the container
- containerPath = path.Join(trigger.Target, rel)
- }
- return &fileEvent{
- Action: trigger.Action,
- PathMapping: sync.PathMapping{
- HostPath: hostPath,
- ContainerPath: containerPath,
- },
- }
- }
- func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) (*types.DevelopConfig, error) {
- var config types.DevelopConfig
- y, ok := service.Extensions["x-develop"]
- if !ok {
- return nil, nil
- }
- logrus.Warnf("x-develop is DEPRECATED, please use the official `develop` attribute")
- err := mapstructure.Decode(y, &config)
- if err != nil {
- return nil, err
- }
- baseDir, err := filepath.EvalSymlinks(project.WorkingDir)
- if err != nil {
- return nil, fmt.Errorf("resolving symlink for %q: %w", project.WorkingDir, err)
- }
- for i, trigger := range config.Watch {
- if !filepath.IsAbs(trigger.Path) {
- trigger.Path = filepath.Join(baseDir, trigger.Path)
- }
- if p, err := filepath.EvalSymlinks(trigger.Path); err == nil {
- // this might fail because the path doesn't exist, etc.
- trigger.Path = p
- }
- trigger.Path = filepath.Clean(trigger.Path)
- if trigger.Path == "" {
- return nil, errors.New("watch rules MUST define a path")
- }
- if trigger.Action == types.WatchActionRebuild && service.Build == nil {
- return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name)
- }
- config.Watch[i] = trigger
- }
- return &config, nil
- }
- // batchDebounceEvents groups identical file events within a sliding time window and writes the results to the returned
- // channel.
- //
- // The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel.
- func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent) <-chan []fileEvent {
- out := make(chan []fileEvent)
- go func() {
- defer close(out)
- seen := make(map[fileEvent]time.Time)
- flushEvents := func() {
- if len(seen) == 0 {
- return
- }
- events := make([]fileEvent, 0, len(seen))
- for e := range seen {
- events = append(events, e)
- }
- // sort batch by oldest -> newest
- // (if an event is seen > 1 per batch, it gets the latest timestamp)
- sort.SliceStable(events, func(i, j int) bool {
- x := events[i]
- y := events[j]
- return seen[x].Before(seen[y])
- })
- out <- events
- seen = make(map[fileEvent]time.Time)
- }
- t := clock.NewTicker(delay)
- defer t.Stop()
- for {
- select {
- case <-ctx.Done():
- return
- case <-t.Chan():
- flushEvents()
- case e, ok := <-input:
- if !ok {
- // input channel was closed
- flushEvents()
- return
- }
- seen[e] = time.Now()
- t.Reset(delay)
- }
- }
- }()
- return out
- }
- func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolumeConfig) bool {
- for _, volume := range volumes {
- if volume.Bind != nil && strings.HasPrefix(watchPath, volume.Source) {
- return true
- }
- }
- return false
- }
// tarDockerClient wraps a composeService and exposes the ContainersForService,
// Exec and Untar operations on top of it. getSyncImplementation passes a value
// of this type to sync.NewTar as its client.
type tarDockerClient struct {
	// s is the compose service whose helpers and API client are used.
	s *composeService
}
- func (t tarDockerClient) ContainersForService(ctx context.Context, projectName string, serviceName string) ([]moby.Container, error) {
- containers, err := t.s.getContainers(ctx, projectName, oneOffExclude, true, serviceName)
- if err != nil {
- return nil, err
- }
- return containers, nil
- }
- func (t tarDockerClient) Exec(ctx context.Context, containerID string, cmd []string, in io.Reader) error {
- execCfg := moby.ExecConfig{
- Cmd: cmd,
- AttachStdout: false,
- AttachStderr: true,
- AttachStdin: in != nil,
- Tty: false,
- }
- execCreateResp, err := t.s.apiClient().ContainerExecCreate(ctx, containerID, execCfg)
- if err != nil {
- return err
- }
- startCheck := moby.ExecStartCheck{Tty: false, Detach: false}
- conn, err := t.s.apiClient().ContainerExecAttach(ctx, execCreateResp.ID, startCheck)
- if err != nil {
- return err
- }
- defer conn.Close()
- var eg errgroup.Group
- if in != nil {
- eg.Go(func() error {
- defer func() {
- _ = conn.CloseWrite()
- }()
- _, err := io.Copy(conn.Conn, in)
- return err
- })
- }
- eg.Go(func() error {
- _, err := io.Copy(t.s.stdinfo(), conn.Reader)
- return err
- })
- err = t.s.apiClient().ContainerExecStart(ctx, execCreateResp.ID, startCheck)
- if err != nil {
- return err
- }
- // although the errgroup is not tied directly to the context, the operations
- // in it are reading/writing to the connection, which is tied to the context,
- // so they won't block indefinitely
- if err := eg.Wait(); err != nil {
- return err
- }
- execResult, err := t.s.apiClient().ContainerExecInspect(ctx, execCreateResp.ID)
- if err != nil {
- return err
- }
- if execResult.Running {
- return errors.New("process still running")
- }
- if execResult.ExitCode != 0 {
- return fmt.Errorf("exit code %d", execResult.ExitCode)
- }
- return nil
- }
- func (t tarDockerClient) Untar(ctx context.Context, id string, archive io.ReadCloser) error {
- return t.s.apiClient().CopyToContainer(ctx, id, "/", archive, moby.CopyToContainerOptions{
- CopyUIDGID: true,
- })
- }
- func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Project, serviceName string, build api.BuildOptions, batch []fileEvent, syncer sync.Syncer) error {
- pathMappings := make([]sync.PathMapping, len(batch))
- restartService := false
- for i := range batch {
- if batch[i].Action == types.WatchActionRebuild {
- fmt.Fprintf(
- s.stdinfo(),
- "Rebuilding service %q after changes were detected:%s\n",
- serviceName,
- strings.Join(append([]string{""}, batch[i].HostPath), "\n - "),
- )
- // restrict the build to ONLY this service, not any of its dependencies
- build.Services = []string{serviceName}
- err := s.Up(ctx, project, api.UpOptions{
- Create: api.CreateOptions{
- Build: &build,
- Services: []string{serviceName},
- Inherit: true,
- },
- Start: api.StartOptions{
- Services: []string{serviceName},
- Project: project,
- },
- })
- if err != nil {
- fmt.Fprintf(s.stderr(), "Application failed to start after update. Error: %v\n", err)
- }
- return nil
- }
- if batch[i].Action == types.WatchActionSyncRestart {
- restartService = true
- }
- pathMappings[i] = batch[i].PathMapping
- }
- writeWatchSyncMessage(s.stdinfo(), serviceName, pathMappings)
- service, err := project.GetService(serviceName)
- if err != nil {
- return err
- }
- if err := syncer.Sync(ctx, service, pathMappings); err != nil {
- return err
- }
- if restartService {
- return s.Restart(ctx, project.Name, api.RestartOptions{
- Services: []string{serviceName},
- Project: project,
- NoDeps: false,
- })
- }
- return nil
- }
- // writeWatchSyncMessage prints out a message about the sync for the changed paths.
- func writeWatchSyncMessage(w io.Writer, serviceName string, pathMappings []sync.PathMapping) {
- const maxPathsToShow = 10
- if len(pathMappings) <= maxPathsToShow || logrus.IsLevelEnabled(logrus.DebugLevel) {
- hostPathsToSync := make([]string, len(pathMappings))
- for i := range pathMappings {
- hostPathsToSync[i] = pathMappings[i].HostPath
- }
- fmt.Fprintf(
- w,
- "Syncing %q after changes were detected:%s\n",
- serviceName,
- strings.Join(append([]string{""}, hostPathsToSync...), "\n - "),
- )
- } else {
- hostPathsToSync := make([]string, len(pathMappings))
- for i := range pathMappings {
- hostPathsToSync[i] = pathMappings[i].HostPath
- }
- fmt.Fprintf(
- w,
- "Syncing service %q after %d changes were detected\n",
- serviceName,
- len(pathMappings),
- )
- }
- }
|