watch.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "os"
  20. "path"
  21. "path/filepath"
  22. "sort"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/compose-spec/compose-go/v2/types"
  27. pathutil "github.com/docker/compose/v2/internal/paths"
  28. "github.com/docker/compose/v2/internal/sync"
  29. "github.com/docker/compose/v2/pkg/api"
  30. "github.com/docker/compose/v2/pkg/watch"
  31. moby "github.com/docker/docker/api/types"
  32. "github.com/docker/docker/api/types/container"
  33. "github.com/docker/docker/api/types/filters"
  34. "github.com/docker/docker/api/types/image"
  35. "github.com/jonboulle/clockwork"
  36. "github.com/mitchellh/mapstructure"
  37. "github.com/sirupsen/logrus"
  38. "golang.org/x/sync/errgroup"
  39. )
  40. const quietPeriod = 500 * time.Millisecond
  41. // fileEvent contains the Compose service and modified host system path.
  42. type fileEvent struct {
  43. sync.PathMapping
  44. Action types.WatchAction
  45. }
  46. // getSyncImplementation returns an appropriate sync implementation for the
  47. // project.
  48. //
  49. // Currently, an implementation that batches files and transfers them using
  50. // the Moby `Untar` API.
  51. func (s *composeService) getSyncImplementation(project *types.Project) (sync.Syncer, error) {
  52. var useTar bool
  53. if useTarEnv, ok := os.LookupEnv("COMPOSE_EXPERIMENTAL_WATCH_TAR"); ok {
  54. useTar, _ = strconv.ParseBool(useTarEnv)
  55. } else {
  56. useTar = true
  57. }
  58. if !useTar {
  59. return nil, errors.New("no available sync implementation")
  60. }
  61. return sync.NewTar(project.Name, tarDockerClient{s: s}), nil
  62. }
  63. func (s *composeService) shouldWatch(project *types.Project) bool {
  64. var shouldWatch bool
  65. for i := range project.Services {
  66. service := project.Services[i]
  67. if service.Develop != nil && service.Develop.Watch != nil {
  68. shouldWatch = true
  69. }
  70. }
  71. return shouldWatch
  72. }
  73. func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error {
  74. return s.watch(ctx, nil, project, services, options)
  75. }
  76. func (s *composeService) watch(ctx context.Context, syncChannel chan bool, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo
  77. var err error
  78. if project, err = project.WithSelectedServices(services); err != nil {
  79. return err
  80. }
  81. syncer, err := s.getSyncImplementation(project)
  82. if err != nil {
  83. return err
  84. }
  85. eg, ctx := errgroup.WithContext(ctx)
  86. watching := false
  87. options.LogTo.Register(api.WatchLogger)
  88. for i := range project.Services {
  89. service := project.Services[i]
  90. config, err := loadDevelopmentConfig(service, project)
  91. if err != nil {
  92. return err
  93. }
  94. if service.Develop != nil {
  95. config = service.Develop
  96. }
  97. if config == nil {
  98. continue
  99. }
  100. for _, trigger := range config.Watch {
  101. if trigger.Action == types.WatchActionRebuild {
  102. if service.Build == nil {
  103. return fmt.Errorf("can't watch service %q with action %s without a build context", service.Name, types.WatchActionRebuild)
  104. }
  105. if options.Build == nil {
  106. return fmt.Errorf("--no-build is incompatible with watch action %s in service %s", types.WatchActionRebuild, service.Name)
  107. }
  108. }
  109. }
  110. if len(services) > 0 && service.Build == nil {
  111. // service explicitly selected for watch has no build section
  112. return fmt.Errorf("can't watch service %q without a build context", service.Name)
  113. }
  114. if len(services) == 0 && service.Build == nil {
  115. continue
  116. }
  117. // set the service to always be built - watch triggers `Up()` when it receives a rebuild event
  118. service.PullPolicy = types.PullPolicyBuild
  119. project.Services[i] = service
  120. dockerIgnores, err := watch.LoadDockerIgnore(service.Build.Context)
  121. if err != nil {
  122. return err
  123. }
  124. // add a hardcoded set of ignores on top of what came from .dockerignore
  125. // some of this should likely be configurable (e.g. there could be cases
  126. // where you want `.git` to be synced) but this is suitable for now
  127. dotGitIgnore, err := watch.NewDockerPatternMatcher("/", []string{".git/"})
  128. if err != nil {
  129. return err
  130. }
  131. ignore := watch.NewCompositeMatcher(
  132. dockerIgnores,
  133. watch.EphemeralPathMatcher(),
  134. dotGitIgnore,
  135. )
  136. var paths, pathLogs []string
  137. for _, trigger := range config.Watch {
  138. if checkIfPathAlreadyBindMounted(trigger.Path, service.Volumes) {
  139. logrus.Warnf("path '%s' also declared by a bind mount volume, this path won't be monitored!\n", trigger.Path)
  140. continue
  141. }
  142. paths = append(paths, trigger.Path)
  143. pathLogs = append(pathLogs, fmt.Sprintf("Action %s for path %q", trigger.Action, trigger.Path))
  144. }
  145. watcher, err := watch.NewWatcher(paths, ignore)
  146. if err != nil {
  147. return err
  148. }
  149. logrus.Debugf("Watch configuration for service %q:%s\n",
  150. service.Name,
  151. strings.Join(append([]string{""}, pathLogs...), "\n - "),
  152. )
  153. err = watcher.Start()
  154. if err != nil {
  155. return err
  156. }
  157. watching = true
  158. eg.Go(func() error {
  159. defer func() {
  160. if err := watcher.Close(); err != nil {
  161. logrus.Debugf("Error closing watcher for service %s: %v", service.Name, err)
  162. }
  163. }()
  164. return s.watchEvents(ctx, project, service.Name, options, watcher, syncer, config.Watch)
  165. })
  166. }
  167. if !watching {
  168. return fmt.Errorf("none of the selected services is configured for watch, consider setting an 'develop' section")
  169. }
  170. options.LogTo.Log(api.WatchLogger, "Watch enabled")
  171. for {
  172. select {
  173. case <-ctx.Done():
  174. return eg.Wait()
  175. case <-syncChannel:
  176. options.LogTo.Log(api.WatchLogger, "Watch disabled")
  177. return nil
  178. }
  179. }
  180. }
  181. func (s *composeService) watchEvents(ctx context.Context, project *types.Project, name string, options api.WatchOptions, watcher watch.Notify, syncer sync.Syncer, triggers []types.Trigger) error {
  182. ctx, cancel := context.WithCancel(ctx)
  183. defer cancel()
  184. ignores := make([]watch.PathMatcher, len(triggers))
  185. for i, trigger := range triggers {
  186. ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
  187. if err != nil {
  188. return err
  189. }
  190. ignores[i] = ignore
  191. }
  192. events := make(chan fileEvent)
  193. batchEvents := batchDebounceEvents(ctx, s.clock, quietPeriod, events)
  194. quit := make(chan bool)
  195. go func() {
  196. for {
  197. select {
  198. case <-ctx.Done():
  199. quit <- true
  200. return
  201. case batch := <-batchEvents:
  202. start := time.Now()
  203. logrus.Debugf("batch start: service[%s] count[%d]", name, len(batch))
  204. if err := s.handleWatchBatch(ctx, project, name, options, batch, syncer); err != nil {
  205. logrus.Warnf("Error handling changed files for service %s: %v", name, err)
  206. }
  207. logrus.Debugf("batch complete: service[%s] duration[%s] count[%d]",
  208. name, time.Since(start), len(batch))
  209. }
  210. }
  211. }()
  212. for {
  213. select {
  214. case <-quit:
  215. options.LogTo.Log(api.WatchLogger, "Watch disabled")
  216. return nil
  217. case err := <-watcher.Errors():
  218. options.LogTo.Err(api.WatchLogger, "Watch disabled with errors")
  219. return err
  220. case event := <-watcher.Events():
  221. hostPath := event.Path()
  222. for i, trigger := range triggers {
  223. logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path)
  224. if fileEvent := maybeFileEvent(trigger, hostPath, ignores[i]); fileEvent != nil {
  225. events <- *fileEvent
  226. }
  227. }
  228. }
  229. }
  230. }
  231. // maybeFileEvent returns a file event object if hostPath is valid for the provided trigger and ignore
  232. // rules.
  233. //
  234. // Any errors are logged as warnings and nil (no file event) is returned.
  235. func maybeFileEvent(trigger types.Trigger, hostPath string, ignore watch.PathMatcher) *fileEvent {
  236. if !pathutil.IsChild(trigger.Path, hostPath) {
  237. return nil
  238. }
  239. isIgnored, err := ignore.Matches(hostPath)
  240. if err != nil {
  241. logrus.Warnf("error ignore matching %q: %v", hostPath, err)
  242. return nil
  243. }
  244. if isIgnored {
  245. logrus.Debugf("%s is matching ignore pattern", hostPath)
  246. return nil
  247. }
  248. var containerPath string
  249. if trigger.Target != "" {
  250. rel, err := filepath.Rel(trigger.Path, hostPath)
  251. if err != nil {
  252. logrus.Warnf("error making %s relative to %s: %v", hostPath, trigger.Path, err)
  253. return nil
  254. }
  255. // always use Unix-style paths for inside the container
  256. containerPath = path.Join(trigger.Target, rel)
  257. }
  258. return &fileEvent{
  259. Action: trigger.Action,
  260. PathMapping: sync.PathMapping{
  261. HostPath: hostPath,
  262. ContainerPath: containerPath,
  263. },
  264. }
  265. }
  266. func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) (*types.DevelopConfig, error) {
  267. var config types.DevelopConfig
  268. y, ok := service.Extensions["x-develop"]
  269. if !ok {
  270. return nil, nil
  271. }
  272. logrus.Warnf("x-develop is DEPRECATED, please use the official `develop` attribute")
  273. err := mapstructure.Decode(y, &config)
  274. if err != nil {
  275. return nil, err
  276. }
  277. baseDir, err := filepath.EvalSymlinks(project.WorkingDir)
  278. if err != nil {
  279. return nil, fmt.Errorf("resolving symlink for %q: %w", project.WorkingDir, err)
  280. }
  281. for i, trigger := range config.Watch {
  282. if !filepath.IsAbs(trigger.Path) {
  283. trigger.Path = filepath.Join(baseDir, trigger.Path)
  284. }
  285. if p, err := filepath.EvalSymlinks(trigger.Path); err == nil {
  286. // this might fail because the path doesn't exist, etc.
  287. trigger.Path = p
  288. }
  289. trigger.Path = filepath.Clean(trigger.Path)
  290. if trigger.Path == "" {
  291. return nil, errors.New("watch rules MUST define a path")
  292. }
  293. if trigger.Action == types.WatchActionRebuild && service.Build == nil {
  294. return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name)
  295. }
  296. config.Watch[i] = trigger
  297. }
  298. return &config, nil
  299. }
  300. // batchDebounceEvents groups identical file events within a sliding time window and writes the results to the returned
  301. // channel.
  302. //
  303. // The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel.
  304. func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent) <-chan []fileEvent {
  305. out := make(chan []fileEvent)
  306. go func() {
  307. defer close(out)
  308. seen := make(map[fileEvent]time.Time)
  309. flushEvents := func() {
  310. if len(seen) == 0 {
  311. return
  312. }
  313. events := make([]fileEvent, 0, len(seen))
  314. for e := range seen {
  315. events = append(events, e)
  316. }
  317. // sort batch by oldest -> newest
  318. // (if an event is seen > 1 per batch, it gets the latest timestamp)
  319. sort.SliceStable(events, func(i, j int) bool {
  320. x := events[i]
  321. y := events[j]
  322. return seen[x].Before(seen[y])
  323. })
  324. out <- events
  325. seen = make(map[fileEvent]time.Time)
  326. }
  327. t := clock.NewTicker(delay)
  328. defer t.Stop()
  329. for {
  330. select {
  331. case <-ctx.Done():
  332. return
  333. case <-t.Chan():
  334. flushEvents()
  335. case e, ok := <-input:
  336. if !ok {
  337. // input channel was closed
  338. flushEvents()
  339. return
  340. }
  341. seen[e] = time.Now()
  342. t.Reset(delay)
  343. }
  344. }
  345. }()
  346. return out
  347. }
  348. func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolumeConfig) bool {
  349. for _, volume := range volumes {
  350. if volume.Bind != nil && strings.HasPrefix(watchPath, volume.Source) {
  351. return true
  352. }
  353. }
  354. return false
  355. }
  356. type tarDockerClient struct {
  357. s *composeService
  358. }
  359. func (t tarDockerClient) ContainersForService(ctx context.Context, projectName string, serviceName string) ([]moby.Container, error) {
  360. containers, err := t.s.getContainers(ctx, projectName, oneOffExclude, true, serviceName)
  361. if err != nil {
  362. return nil, err
  363. }
  364. return containers, nil
  365. }
  366. func (t tarDockerClient) Exec(ctx context.Context, containerID string, cmd []string, in io.Reader) error {
  367. execCfg := container.ExecOptions{
  368. Cmd: cmd,
  369. AttachStdout: false,
  370. AttachStderr: true,
  371. AttachStdin: in != nil,
  372. Tty: false,
  373. }
  374. execCreateResp, err := t.s.apiClient().ContainerExecCreate(ctx, containerID, execCfg)
  375. if err != nil {
  376. return err
  377. }
  378. startCheck := container.ExecStartOptions{Tty: false, Detach: false}
  379. conn, err := t.s.apiClient().ContainerExecAttach(ctx, execCreateResp.ID, startCheck)
  380. if err != nil {
  381. return err
  382. }
  383. defer conn.Close()
  384. var eg errgroup.Group
  385. if in != nil {
  386. eg.Go(func() error {
  387. defer func() {
  388. _ = conn.CloseWrite()
  389. }()
  390. _, err := io.Copy(conn.Conn, in)
  391. return err
  392. })
  393. }
  394. eg.Go(func() error {
  395. _, err := io.Copy(t.s.stdinfo(), conn.Reader)
  396. return err
  397. })
  398. err = t.s.apiClient().ContainerExecStart(ctx, execCreateResp.ID, startCheck)
  399. if err != nil {
  400. return err
  401. }
  402. // although the errgroup is not tied directly to the context, the operations
  403. // in it are reading/writing to the connection, which is tied to the context,
  404. // so they won't block indefinitely
  405. if err := eg.Wait(); err != nil {
  406. return err
  407. }
  408. execResult, err := t.s.apiClient().ContainerExecInspect(ctx, execCreateResp.ID)
  409. if err != nil {
  410. return err
  411. }
  412. if execResult.Running {
  413. return errors.New("process still running")
  414. }
  415. if execResult.ExitCode != 0 {
  416. return fmt.Errorf("exit code %d", execResult.ExitCode)
  417. }
  418. return nil
  419. }
  420. func (t tarDockerClient) Untar(ctx context.Context, id string, archive io.ReadCloser) error {
  421. return t.s.apiClient().CopyToContainer(ctx, id, "/", archive, container.CopyToContainerOptions{
  422. CopyUIDGID: true,
  423. })
  424. }
  425. func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Project, serviceName string, options api.WatchOptions, batch []fileEvent, syncer sync.Syncer) error {
  426. pathMappings := make([]sync.PathMapping, len(batch))
  427. restartService := false
  428. for i := range batch {
  429. if batch[i].Action == types.WatchActionRebuild {
  430. options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Rebuilding service %q after changes were detected...", serviceName))
  431. // restrict the build to ONLY this service, not any of its dependencies
  432. options.Build.Services = []string{serviceName}
  433. imageNameToIdMap, err := s.build(ctx, project, *options.Build, nil)
  434. if err != nil {
  435. options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Build failed. Error: %v", err))
  436. return err
  437. }
  438. if options.Prune {
  439. s.pruneDanglingImagesOnRebuild(ctx, project.Name, imageNameToIdMap)
  440. }
  441. options.LogTo.Log(api.WatchLogger, fmt.Sprintf("service %q successfully built", serviceName))
  442. err = s.create(ctx, project, api.CreateOptions{
  443. Services: []string{serviceName},
  444. Inherit: true,
  445. Recreate: api.RecreateForce,
  446. })
  447. if err != nil {
  448. options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Failed to recreate service after update. Error: %v", err))
  449. return err
  450. }
  451. err = s.start(ctx, project.Name, api.StartOptions{
  452. Project: project,
  453. Services: []string{serviceName},
  454. }, nil)
  455. if err != nil {
  456. options.LogTo.Log(api.WatchLogger, fmt.Sprintf("Application failed to start after update. Error: %v", err))
  457. }
  458. return nil
  459. }
  460. if batch[i].Action == types.WatchActionSyncRestart {
  461. restartService = true
  462. }
  463. pathMappings[i] = batch[i].PathMapping
  464. }
  465. writeWatchSyncMessage(options.LogTo, serviceName, pathMappings)
  466. service, err := project.GetService(serviceName)
  467. if err != nil {
  468. return err
  469. }
  470. if err := syncer.Sync(ctx, service, pathMappings); err != nil {
  471. return err
  472. }
  473. if restartService {
  474. return s.Restart(ctx, project.Name, api.RestartOptions{
  475. Services: []string{serviceName},
  476. Project: project,
  477. NoDeps: false,
  478. })
  479. }
  480. return nil
  481. }
  482. // writeWatchSyncMessage prints out a message about the sync for the changed paths.
  483. func writeWatchSyncMessage(log api.LogConsumer, serviceName string, pathMappings []sync.PathMapping) {
  484. const maxPathsToShow = 10
  485. if len(pathMappings) <= maxPathsToShow || logrus.IsLevelEnabled(logrus.DebugLevel) {
  486. hostPathsToSync := make([]string, len(pathMappings))
  487. for i := range pathMappings {
  488. hostPathsToSync[i] = pathMappings[i].HostPath
  489. }
  490. log.Log(api.WatchLogger, fmt.Sprintf("Syncing %q after changes were detected", serviceName))
  491. } else {
  492. hostPathsToSync := make([]string, len(pathMappings))
  493. for i := range pathMappings {
  494. hostPathsToSync[i] = pathMappings[i].HostPath
  495. }
  496. log.Log(api.WatchLogger, fmt.Sprintf("Syncing service %q after %d changes were detected", serviceName, len(pathMappings)))
  497. }
  498. }
  499. func (s *composeService) pruneDanglingImagesOnRebuild(ctx context.Context, projectName string, imageNameToIdMap map[string]string) {
  500. images, err := s.apiClient().ImageList(ctx, image.ListOptions{
  501. Filters: filters.NewArgs(
  502. filters.Arg("dangling", "true"),
  503. filters.Arg("label", api.ProjectLabel+"="+projectName),
  504. ),
  505. })
  506. if err != nil {
  507. logrus.Debugf("Failed to list images: %v", err)
  508. return
  509. }
  510. for _, img := range images {
  511. if _, ok := imageNameToIdMap[img.ID]; !ok {
  512. _, err := s.apiClient().ImageRemove(ctx, img.ID, image.RemoveOptions{})
  513. if err != nil {
  514. logrus.Debugf("Failed to remove image %s: %v", img.ID, err)
  515. }
  516. }
  517. }
  518. }