watch.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "os"
  20. "path"
  21. "path/filepath"
  22. "sort"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/compose-spec/compose-go/v2/types"
  27. pathutil "github.com/docker/compose/v2/internal/paths"
  28. "github.com/docker/compose/v2/internal/sync"
  29. "github.com/docker/compose/v2/pkg/api"
  30. "github.com/docker/compose/v2/pkg/watch"
  31. moby "github.com/docker/docker/api/types"
  32. "github.com/jonboulle/clockwork"
  33. "github.com/mitchellh/mapstructure"
  34. "github.com/sirupsen/logrus"
  35. "golang.org/x/sync/errgroup"
  36. )
  37. const quietPeriod = 500 * time.Millisecond
  38. // fileEvent contains the Compose service and modified host system path.
  39. type fileEvent struct {
  40. sync.PathMapping
  41. Action types.WatchAction
  42. }
  43. // getSyncImplementation returns an appropriate sync implementation for the
  44. // project.
  45. //
  46. // Currently, an implementation that batches files and transfers them using
  47. // the Moby `Untar` API.
  48. func (s *composeService) getSyncImplementation(project *types.Project) (sync.Syncer, error) {
  49. var useTar bool
  50. if useTarEnv, ok := os.LookupEnv("COMPOSE_EXPERIMENTAL_WATCH_TAR"); ok {
  51. useTar, _ = strconv.ParseBool(useTarEnv)
  52. } else {
  53. useTar = true
  54. }
  55. if !useTar {
  56. return nil, errors.New("no available sync implementation")
  57. }
  58. return sync.NewTar(project.Name, tarDockerClient{s: s}), nil
  59. }
  60. func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo
  61. var err error
  62. if project, err = project.WithSelectedServices(services); err != nil {
  63. return err
  64. }
  65. syncer, err := s.getSyncImplementation(project)
  66. if err != nil {
  67. return err
  68. }
  69. eg, ctx := errgroup.WithContext(ctx)
  70. watching := false
  71. options.LogTo.Register(api.WatchLogger)
  72. for i := range project.Services {
  73. service := project.Services[i]
  74. config, err := loadDevelopmentConfig(service, project)
  75. if err != nil {
  76. return err
  77. }
  78. if service.Develop != nil {
  79. config = service.Develop
  80. }
  81. if config == nil {
  82. continue
  83. }
  84. for _, trigger := range config.Watch {
  85. if trigger.Action == types.WatchActionRebuild {
  86. if service.Build == nil {
  87. return fmt.Errorf("can't watch service %q with action %s without a build context", service.Name, types.WatchActionRebuild)
  88. }
  89. if options.Build == nil {
  90. return fmt.Errorf("--no-build is incompatible with watch action %s in service %s", types.WatchActionRebuild, service.Name)
  91. }
  92. }
  93. }
  94. if len(services) > 0 && service.Build == nil {
  95. // service explicitly selected for watch has no build section
  96. return fmt.Errorf("can't watch service %q without a build context", service.Name)
  97. }
  98. if len(services) == 0 && service.Build == nil {
  99. continue
  100. }
  101. // set the service to always be built - watch triggers `Up()` when it receives a rebuild event
  102. service.PullPolicy = types.PullPolicyBuild
  103. project.Services[i] = service
  104. dockerIgnores, err := watch.LoadDockerIgnore(service.Build.Context)
  105. if err != nil {
  106. return err
  107. }
  108. // add a hardcoded set of ignores on top of what came from .dockerignore
  109. // some of this should likely be configurable (e.g. there could be cases
  110. // where you want `.git` to be synced) but this is suitable for now
  111. dotGitIgnore, err := watch.NewDockerPatternMatcher("/", []string{".git/"})
  112. if err != nil {
  113. return err
  114. }
  115. ignore := watch.NewCompositeMatcher(
  116. dockerIgnores,
  117. watch.EphemeralPathMatcher(),
  118. dotGitIgnore,
  119. )
  120. var paths, pathLogs []string
  121. for _, trigger := range config.Watch {
  122. if checkIfPathAlreadyBindMounted(trigger.Path, service.Volumes) {
  123. logrus.Warnf("path '%s' also declared by a bind mount volume, this path won't be monitored!\n", trigger.Path)
  124. continue
  125. }
  126. paths = append(paths, trigger.Path)
  127. pathLogs = append(pathLogs, fmt.Sprintf("Action %s for path %q", trigger.Action, trigger.Path))
  128. }
  129. watcher, err := watch.NewWatcher(paths, ignore)
  130. if err != nil {
  131. return err
  132. }
  133. logrus.Debugf("Watch configuration for service %q:%s\n",
  134. service.Name,
  135. strings.Join(append([]string{""}, pathLogs...), "\n - "),
  136. )
  137. err = watcher.Start()
  138. if err != nil {
  139. return err
  140. }
  141. watching = true
  142. eg.Go(func() error {
  143. defer watcher.Close() //nolint:errcheck
  144. return s.watch(ctx, project, service.Name, options, watcher, syncer, config.Watch)
  145. })
  146. }
  147. if !watching {
  148. return fmt.Errorf("none of the selected services is configured for watch, consider setting an 'develop' section")
  149. }
  150. options.LogTo.Log(api.WatchLogger, "watch enabled")
  151. return eg.Wait()
  152. }
  153. func (s *composeService) watch(ctx context.Context, project *types.Project, name string, options api.WatchOptions, watcher watch.Notify, syncer sync.Syncer, triggers []types.Trigger) error {
  154. ctx, cancel := context.WithCancel(ctx)
  155. defer cancel()
  156. ignores := make([]watch.PathMatcher, len(triggers))
  157. for i, trigger := range triggers {
  158. ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
  159. if err != nil {
  160. return err
  161. }
  162. ignores[i] = ignore
  163. }
  164. events := make(chan fileEvent)
  165. batchEvents := batchDebounceEvents(ctx, s.clock, quietPeriod, events)
  166. go func() {
  167. for {
  168. select {
  169. case <-ctx.Done():
  170. return
  171. case batch := <-batchEvents:
  172. start := time.Now()
  173. logrus.Debugf("batch start: service[%s] count[%d]", name, len(batch))
  174. if err := s.handleWatchBatch(ctx, project, name, options, batch, syncer); err != nil {
  175. logrus.Warnf("Error handling changed files for service %s: %v", name, err)
  176. }
  177. logrus.Debugf("batch complete: service[%s] duration[%s] count[%d]",
  178. name, time.Since(start), len(batch))
  179. }
  180. }
  181. }()
  182. for {
  183. select {
  184. case <-ctx.Done():
  185. return nil
  186. case err := <-watcher.Errors():
  187. return err
  188. case event := <-watcher.Events():
  189. hostPath := event.Path()
  190. for i, trigger := range triggers {
  191. logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path)
  192. if fileEvent := maybeFileEvent(trigger, hostPath, ignores[i]); fileEvent != nil {
  193. events <- *fileEvent
  194. }
  195. }
  196. }
  197. }
  198. }
  199. // maybeFileEvent returns a file event object if hostPath is valid for the provided trigger and ignore
  200. // rules.
  201. //
  202. // Any errors are logged as warnings and nil (no file event) is returned.
  203. func maybeFileEvent(trigger types.Trigger, hostPath string, ignore watch.PathMatcher) *fileEvent {
  204. if !pathutil.IsChild(trigger.Path, hostPath) {
  205. return nil
  206. }
  207. isIgnored, err := ignore.Matches(hostPath)
  208. if err != nil {
  209. logrus.Warnf("error ignore matching %q: %v", hostPath, err)
  210. return nil
  211. }
  212. if isIgnored {
  213. logrus.Debugf("%s is matching ignore pattern", hostPath)
  214. return nil
  215. }
  216. var containerPath string
  217. if trigger.Target != "" {
  218. rel, err := filepath.Rel(trigger.Path, hostPath)
  219. if err != nil {
  220. logrus.Warnf("error making %s relative to %s: %v", hostPath, trigger.Path, err)
  221. return nil
  222. }
  223. // always use Unix-style paths for inside the container
  224. containerPath = path.Join(trigger.Target, rel)
  225. }
  226. return &fileEvent{
  227. Action: trigger.Action,
  228. PathMapping: sync.PathMapping{
  229. HostPath: hostPath,
  230. ContainerPath: containerPath,
  231. },
  232. }
  233. }
  234. func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) (*types.DevelopConfig, error) {
  235. var config types.DevelopConfig
  236. y, ok := service.Extensions["x-develop"]
  237. if !ok {
  238. return nil, nil
  239. }
  240. logrus.Warnf("x-develop is DEPRECATED, please use the official `develop` attribute")
  241. err := mapstructure.Decode(y, &config)
  242. if err != nil {
  243. return nil, err
  244. }
  245. baseDir, err := filepath.EvalSymlinks(project.WorkingDir)
  246. if err != nil {
  247. return nil, fmt.Errorf("resolving symlink for %q: %w", project.WorkingDir, err)
  248. }
  249. for i, trigger := range config.Watch {
  250. if !filepath.IsAbs(trigger.Path) {
  251. trigger.Path = filepath.Join(baseDir, trigger.Path)
  252. }
  253. if p, err := filepath.EvalSymlinks(trigger.Path); err == nil {
  254. // this might fail because the path doesn't exist, etc.
  255. trigger.Path = p
  256. }
  257. trigger.Path = filepath.Clean(trigger.Path)
  258. if trigger.Path == "" {
  259. return nil, errors.New("watch rules MUST define a path")
  260. }
  261. if trigger.Action == types.WatchActionRebuild && service.Build == nil {
  262. return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name)
  263. }
  264. config.Watch[i] = trigger
  265. }
  266. return &config, nil
  267. }
  268. // batchDebounceEvents groups identical file events within a sliding time window and writes the results to the returned
  269. // channel.
  270. //
  271. // The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel.
  272. func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent) <-chan []fileEvent {
  273. out := make(chan []fileEvent)
  274. go func() {
  275. defer close(out)
  276. seen := make(map[fileEvent]time.Time)
  277. flushEvents := func() {
  278. if len(seen) == 0 {
  279. return
  280. }
  281. events := make([]fileEvent, 0, len(seen))
  282. for e := range seen {
  283. events = append(events, e)
  284. }
  285. // sort batch by oldest -> newest
  286. // (if an event is seen > 1 per batch, it gets the latest timestamp)
  287. sort.SliceStable(events, func(i, j int) bool {
  288. x := events[i]
  289. y := events[j]
  290. return seen[x].Before(seen[y])
  291. })
  292. out <- events
  293. seen = make(map[fileEvent]time.Time)
  294. }
  295. t := clock.NewTicker(delay)
  296. defer t.Stop()
  297. for {
  298. select {
  299. case <-ctx.Done():
  300. return
  301. case <-t.Chan():
  302. flushEvents()
  303. case e, ok := <-input:
  304. if !ok {
  305. // input channel was closed
  306. flushEvents()
  307. return
  308. }
  309. seen[e] = time.Now()
  310. t.Reset(delay)
  311. }
  312. }
  313. }()
  314. return out
  315. }
  316. func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolumeConfig) bool {
  317. for _, volume := range volumes {
  318. if volume.Bind != nil && strings.HasPrefix(watchPath, volume.Source) {
  319. return true
  320. }
  321. }
  322. return false
  323. }
  324. type tarDockerClient struct {
  325. s *composeService
  326. }
  327. func (t tarDockerClient) ContainersForService(ctx context.Context, projectName string, serviceName string) ([]moby.Container, error) {
  328. containers, err := t.s.getContainers(ctx, projectName, oneOffExclude, true, serviceName)
  329. if err != nil {
  330. return nil, err
  331. }
  332. return containers, nil
  333. }
  334. func (t tarDockerClient) Exec(ctx context.Context, containerID string, cmd []string, in io.Reader) error {
  335. execCfg := moby.ExecConfig{
  336. Cmd: cmd,
  337. AttachStdout: false,
  338. AttachStderr: true,
  339. AttachStdin: in != nil,
  340. Tty: false,
  341. }
  342. execCreateResp, err := t.s.apiClient().ContainerExecCreate(ctx, containerID, execCfg)
  343. if err != nil {
  344. return err
  345. }
  346. startCheck := moby.ExecStartCheck{Tty: false, Detach: false}
  347. conn, err := t.s.apiClient().ContainerExecAttach(ctx, execCreateResp.ID, startCheck)
  348. if err != nil {
  349. return err
  350. }
  351. defer conn.Close()
  352. var eg errgroup.Group
  353. if in != nil {
  354. eg.Go(func() error {
  355. defer func() {
  356. _ = conn.CloseWrite()
  357. }()
  358. _, err := io.Copy(conn.Conn, in)
  359. return err
  360. })
  361. }
  362. eg.Go(func() error {
  363. _, err := io.Copy(t.s.stdinfo(), conn.Reader)
  364. return err
  365. })
  366. err = t.s.apiClient().ContainerExecStart(ctx, execCreateResp.ID, startCheck)
  367. if err != nil {
  368. return err
  369. }
  370. // although the errgroup is not tied directly to the context, the operations
  371. // in it are reading/writing to the connection, which is tied to the context,
  372. // so they won't block indefinitely
  373. if err := eg.Wait(); err != nil {
  374. return err
  375. }
  376. execResult, err := t.s.apiClient().ContainerExecInspect(ctx, execCreateResp.ID)
  377. if err != nil {
  378. return err
  379. }
  380. if execResult.Running {
  381. return errors.New("process still running")
  382. }
  383. if execResult.ExitCode != 0 {
  384. return fmt.Errorf("exit code %d", execResult.ExitCode)
  385. }
  386. return nil
  387. }
  388. func (t tarDockerClient) Untar(ctx context.Context, id string, archive io.ReadCloser) error {
  389. return t.s.apiClient().CopyToContainer(ctx, id, "/", archive, moby.CopyToContainerOptions{
  390. CopyUIDGID: true,
  391. })
  392. }
  393. func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Project, serviceName string, options api.WatchOptions, batch []fileEvent, syncer sync.Syncer) error {
  394. pathMappings := make([]sync.PathMapping, len(batch))
  395. restartService := false
  396. for i := range batch {
  397. if batch[i].Action == types.WatchActionRebuild {
  398. options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName))
  399. // restrict the build to ONLY this service, not any of its dependencies
  400. options.Build.Services = []string{serviceName}
  401. _, err := s.build(ctx, project, *options.Build, nil)
  402. if err != nil {
  403. options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err))
  404. return err
  405. }
  406. options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName))
  407. err = s.create(ctx, project, api.CreateOptions{
  408. Services: []string{serviceName},
  409. Inherit: true,
  410. Recreate: api.RecreateForce,
  411. }, true)
  412. if err != nil {
  413. options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err))
  414. return err
  415. }
  416. err = s.start(ctx, project.Name, api.StartOptions{
  417. Project: project,
  418. Services: []string{serviceName},
  419. }, nil)
  420. if err != nil {
  421. options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Application failed to start after update. Error: %v", err))
  422. }
  423. return nil
  424. }
  425. if batch[i].Action == types.WatchActionSyncRestart {
  426. restartService = true
  427. }
  428. pathMappings[i] = batch[i].PathMapping
  429. }
  430. writeWatchSyncMessage(options.LogTo, serviceName, pathMappings)
  431. service, err := project.GetService(serviceName)
  432. if err != nil {
  433. return err
  434. }
  435. if err := syncer.Sync(ctx, service, pathMappings); err != nil {
  436. return err
  437. }
  438. if restartService {
  439. return s.Restart(ctx, project.Name, api.RestartOptions{
  440. Services: []string{serviceName},
  441. Project: project,
  442. NoDeps: false,
  443. })
  444. }
  445. return nil
  446. }
  447. // writeWatchSyncMessage prints out a message about the sync for the changed paths.
  448. func writeWatchSyncMessage(log api.LogConsumer, serviceName string, pathMappings []sync.PathMapping) {
  449. const maxPathsToShow = 10
  450. if len(pathMappings) <= maxPathsToShow || logrus.IsLevelEnabled(logrus.DebugLevel) {
  451. hostPathsToSync := make([]string, len(pathMappings))
  452. for i := range pathMappings {
  453. hostPathsToSync[i] = pathMappings[i].HostPath
  454. }
  455. log.Log(api.WatchLogger, fmt.Sprintf("Syncing %q after changes were detected", serviceName))
  456. } else {
  457. hostPathsToSync := make([]string, len(pathMappings))
  458. for i := range pathMappings {
  459. hostPathsToSync[i] = pathMappings[i].HostPath
  460. }
  461. log.Log(api.WatchLogger, fmt.Sprintf("Syncing service %q after %d changes were detected", serviceName, len(pathMappings)))
  462. }
  463. }