watch.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "os"
  20. "path"
  21. "path/filepath"
  22. "sort"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/compose-spec/compose-go/v2/types"
  27. "github.com/docker/compose/v2/internal/sync"
  28. "github.com/docker/compose/v2/pkg/api"
  29. "github.com/docker/compose/v2/pkg/watch"
  30. moby "github.com/docker/docker/api/types"
  31. "github.com/jonboulle/clockwork"
  32. "github.com/mitchellh/mapstructure"
  33. "github.com/sirupsen/logrus"
  34. "golang.org/x/sync/errgroup"
  35. )
  36. const quietPeriod = 500 * time.Millisecond
  37. // fileEvent contains the Compose service and modified host system path.
  38. type fileEvent struct {
  39. sync.PathMapping
  40. Action types.WatchAction
  41. }
  42. // getSyncImplementation returns the the tar-based syncer unless it has been explicitly
  43. // disabled with `COMPOSE_EXPERIMENTAL_WATCH_TAR=0`. Note that the absence of the env
  44. // var means enabled.
  45. func (s *composeService) getSyncImplementation(project *types.Project) sync.Syncer {
  46. var useTar bool
  47. if useTarEnv, ok := os.LookupEnv("COMPOSE_EXPERIMENTAL_WATCH_TAR"); ok {
  48. useTar, _ = strconv.ParseBool(useTarEnv)
  49. } else {
  50. useTar = true
  51. }
  52. if useTar {
  53. return sync.NewTar(project.Name, tarDockerClient{s: s})
  54. }
  55. return sync.NewDockerCopy(project.Name, s, s.stdinfo())
  56. }
  57. func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo
  58. if err := project.ForServices(services); err != nil {
  59. return err
  60. }
  61. syncer := s.getSyncImplementation(project)
  62. eg, ctx := errgroup.WithContext(ctx)
  63. watching := false
  64. for i := range project.Services {
  65. service := project.Services[i]
  66. config, err := loadDevelopmentConfig(service, project)
  67. if err != nil {
  68. return err
  69. }
  70. if service.Develop != nil {
  71. config = service.Develop
  72. }
  73. if config == nil {
  74. continue
  75. }
  76. if len(config.Watch) > 0 && service.Build == nil {
  77. // service configured with watchers but no build section
  78. return fmt.Errorf("can't watch service %q without a build context", service.Name)
  79. }
  80. if len(services) > 0 && service.Build == nil {
  81. // service explicitly selected for watch has no build section
  82. return fmt.Errorf("can't watch service %q without a build context", service.Name)
  83. }
  84. if len(services) == 0 && service.Build == nil {
  85. continue
  86. }
  87. // set the service to always be built - watch triggers `Up()` when it receives a rebuild event
  88. service.PullPolicy = types.PullPolicyBuild
  89. project.Services[i] = service
  90. dockerIgnores, err := watch.LoadDockerIgnore(service.Build.Context)
  91. if err != nil {
  92. return err
  93. }
  94. // add a hardcoded set of ignores on top of what came from .dockerignore
  95. // some of this should likely be configurable (e.g. there could be cases
  96. // where you want `.git` to be synced) but this is suitable for now
  97. dotGitIgnore, err := watch.NewDockerPatternMatcher("/", []string{".git/"})
  98. if err != nil {
  99. return err
  100. }
  101. ignore := watch.NewCompositeMatcher(
  102. dockerIgnores,
  103. watch.EphemeralPathMatcher(),
  104. dotGitIgnore,
  105. )
  106. var paths, pathLogs []string
  107. for _, trigger := range config.Watch {
  108. if checkIfPathAlreadyBindMounted(trigger.Path, service.Volumes) {
  109. logrus.Warnf("path '%s' also declared by a bind mount volume, this path won't be monitored!\n", trigger.Path)
  110. continue
  111. }
  112. paths = append(paths, trigger.Path)
  113. pathLogs = append(pathLogs, fmt.Sprintf("Action %s for path %q", trigger.Action, trigger.Path))
  114. }
  115. watcher, err := watch.NewWatcher(paths, ignore)
  116. if err != nil {
  117. return err
  118. }
  119. fmt.Fprintf(
  120. s.stdinfo(),
  121. "Watch configuration for service %q:%s\n",
  122. service.Name,
  123. strings.Join(append([]string{""}, pathLogs...), "\n - "),
  124. )
  125. err = watcher.Start()
  126. if err != nil {
  127. return err
  128. }
  129. watching = true
  130. eg.Go(func() error {
  131. defer watcher.Close() //nolint:errcheck
  132. return s.watch(ctx, project, service.Name, options, watcher, syncer, config.Watch)
  133. })
  134. }
  135. if !watching {
  136. return fmt.Errorf("none of the selected services is configured for watch, consider setting an 'develop' section")
  137. }
  138. return eg.Wait()
  139. }
  140. func (s *composeService) watch(ctx context.Context, project *types.Project, name string, options api.WatchOptions, watcher watch.Notify, syncer sync.Syncer, triggers []types.Trigger) error {
  141. ctx, cancel := context.WithCancel(ctx)
  142. defer cancel()
  143. ignores := make([]watch.PathMatcher, len(triggers))
  144. for i, trigger := range triggers {
  145. ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
  146. if err != nil {
  147. return err
  148. }
  149. ignores[i] = ignore
  150. }
  151. events := make(chan fileEvent)
  152. batchEvents := batchDebounceEvents(ctx, s.clock, quietPeriod, events)
  153. go func() {
  154. for {
  155. select {
  156. case <-ctx.Done():
  157. return
  158. case batch := <-batchEvents:
  159. start := time.Now()
  160. logrus.Debugf("batch start: service[%s] count[%d]", name, len(batch))
  161. if err := s.handleWatchBatch(ctx, project, name, options.Build, batch, syncer); err != nil {
  162. logrus.Warnf("Error handling changed files for service %s: %v", name, err)
  163. }
  164. logrus.Debugf("batch complete: service[%s] duration[%s] count[%d]",
  165. name, time.Since(start), len(batch))
  166. }
  167. }
  168. }()
  169. for {
  170. select {
  171. case <-ctx.Done():
  172. return nil
  173. case err := <-watcher.Errors():
  174. return err
  175. case event := <-watcher.Events():
  176. hostPath := event.Path()
  177. for i, trigger := range triggers {
  178. logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path)
  179. if fileEvent := maybeFileEvent(trigger, hostPath, ignores[i]); fileEvent != nil {
  180. events <- *fileEvent
  181. }
  182. }
  183. }
  184. }
  185. }
  186. // maybeFileEvent returns a file event object if hostPath is valid for the provided trigger and ignore
  187. // rules.
  188. //
  189. // Any errors are logged as warnings and nil (no file event) is returned.
  190. func maybeFileEvent(trigger types.Trigger, hostPath string, ignore watch.PathMatcher) *fileEvent {
  191. if !watch.IsChild(trigger.Path, hostPath) {
  192. return nil
  193. }
  194. isIgnored, err := ignore.Matches(hostPath)
  195. if err != nil {
  196. logrus.Warnf("error ignore matching %q: %v", hostPath, err)
  197. return nil
  198. }
  199. if isIgnored {
  200. logrus.Debugf("%s is matching ignore pattern", hostPath)
  201. return nil
  202. }
  203. var containerPath string
  204. if trigger.Target != "" {
  205. rel, err := filepath.Rel(trigger.Path, hostPath)
  206. if err != nil {
  207. logrus.Warnf("error making %s relative to %s: %v", hostPath, trigger.Path, err)
  208. return nil
  209. }
  210. // always use Unix-style paths for inside the container
  211. containerPath = path.Join(trigger.Target, rel)
  212. }
  213. return &fileEvent{
  214. Action: trigger.Action,
  215. PathMapping: sync.PathMapping{
  216. HostPath: hostPath,
  217. ContainerPath: containerPath,
  218. },
  219. }
  220. }
  221. func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) (*types.DevelopConfig, error) {
  222. var config types.DevelopConfig
  223. y, ok := service.Extensions["x-develop"]
  224. if !ok {
  225. return nil, nil
  226. }
  227. logrus.Warnf("x-develop is DEPRECATED, please use the official `develop` attribute")
  228. err := mapstructure.Decode(y, &config)
  229. if err != nil {
  230. return nil, err
  231. }
  232. baseDir, err := filepath.EvalSymlinks(project.WorkingDir)
  233. if err != nil {
  234. return nil, fmt.Errorf("resolving symlink for %q: %w", project.WorkingDir, err)
  235. }
  236. for i, trigger := range config.Watch {
  237. if !filepath.IsAbs(trigger.Path) {
  238. trigger.Path = filepath.Join(baseDir, trigger.Path)
  239. }
  240. if p, err := filepath.EvalSymlinks(trigger.Path); err == nil {
  241. // this might fail because the path doesn't exist, etc.
  242. trigger.Path = p
  243. }
  244. trigger.Path = filepath.Clean(trigger.Path)
  245. if trigger.Path == "" {
  246. return nil, errors.New("watch rules MUST define a path")
  247. }
  248. if trigger.Action == types.WatchActionRebuild && service.Build == nil {
  249. return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name)
  250. }
  251. config.Watch[i] = trigger
  252. }
  253. return &config, nil
  254. }
  255. // batchDebounceEvents groups identical file events within a sliding time window and writes the results to the returned
  256. // channel.
  257. //
  258. // The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel.
  259. func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent) <-chan []fileEvent {
  260. out := make(chan []fileEvent)
  261. go func() {
  262. defer close(out)
  263. seen := make(map[fileEvent]time.Time)
  264. flushEvents := func() {
  265. if len(seen) == 0 {
  266. return
  267. }
  268. events := make([]fileEvent, 0, len(seen))
  269. for e := range seen {
  270. events = append(events, e)
  271. }
  272. // sort batch by oldest -> newest
  273. // (if an event is seen > 1 per batch, it gets the latest timestamp)
  274. sort.SliceStable(events, func(i, j int) bool {
  275. x := events[i]
  276. y := events[j]
  277. return seen[x].Before(seen[y])
  278. })
  279. out <- events
  280. seen = make(map[fileEvent]time.Time)
  281. }
  282. t := clock.NewTicker(delay)
  283. defer t.Stop()
  284. for {
  285. select {
  286. case <-ctx.Done():
  287. return
  288. case <-t.Chan():
  289. flushEvents()
  290. case e, ok := <-input:
  291. if !ok {
  292. // input channel was closed
  293. flushEvents()
  294. return
  295. }
  296. seen[e] = time.Now()
  297. t.Reset(delay)
  298. }
  299. }
  300. }()
  301. return out
  302. }
  303. func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolumeConfig) bool {
  304. for _, volume := range volumes {
  305. if volume.Bind != nil && strings.HasPrefix(watchPath, volume.Source) {
  306. return true
  307. }
  308. }
  309. return false
  310. }
  311. type tarDockerClient struct {
  312. s *composeService
  313. }
  314. func (t tarDockerClient) ContainersForService(ctx context.Context, projectName string, serviceName string) ([]moby.Container, error) {
  315. containers, err := t.s.getContainers(ctx, projectName, oneOffExclude, true, serviceName)
  316. if err != nil {
  317. return nil, err
  318. }
  319. return containers, nil
  320. }
  321. func (t tarDockerClient) Exec(ctx context.Context, containerID string, cmd []string, in io.Reader) error {
  322. execCfg := moby.ExecConfig{
  323. Cmd: cmd,
  324. AttachStdout: false,
  325. AttachStderr: true,
  326. AttachStdin: in != nil,
  327. Tty: false,
  328. }
  329. execCreateResp, err := t.s.apiClient().ContainerExecCreate(ctx, containerID, execCfg)
  330. if err != nil {
  331. return err
  332. }
  333. startCheck := moby.ExecStartCheck{Tty: false, Detach: false}
  334. conn, err := t.s.apiClient().ContainerExecAttach(ctx, execCreateResp.ID, startCheck)
  335. if err != nil {
  336. return err
  337. }
  338. defer conn.Close()
  339. var eg errgroup.Group
  340. if in != nil {
  341. eg.Go(func() error {
  342. defer func() {
  343. _ = conn.CloseWrite()
  344. }()
  345. _, err := io.Copy(conn.Conn, in)
  346. return err
  347. })
  348. }
  349. eg.Go(func() error {
  350. _, err := io.Copy(t.s.stdinfo(), conn.Reader)
  351. return err
  352. })
  353. err = t.s.apiClient().ContainerExecStart(ctx, execCreateResp.ID, startCheck)
  354. if err != nil {
  355. return err
  356. }
  357. // although the errgroup is not tied directly to the context, the operations
  358. // in it are reading/writing to the connection, which is tied to the context,
  359. // so they won't block indefinitely
  360. if err := eg.Wait(); err != nil {
  361. return err
  362. }
  363. execResult, err := t.s.apiClient().ContainerExecInspect(ctx, execCreateResp.ID)
  364. if err != nil {
  365. return err
  366. }
  367. if execResult.Running {
  368. return errors.New("process still running")
  369. }
  370. if execResult.ExitCode != 0 {
  371. return fmt.Errorf("exit code %d", execResult.ExitCode)
  372. }
  373. return nil
  374. }
  375. func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Project, serviceName string, build api.BuildOptions, batch []fileEvent, syncer sync.Syncer) error {
  376. pathMappings := make([]sync.PathMapping, len(batch))
  377. restartService := false
  378. for i := range batch {
  379. if batch[i].Action == types.WatchActionRebuild {
  380. fmt.Fprintf(
  381. s.stdinfo(),
  382. "Rebuilding service %q after changes were detected:%s\n",
  383. serviceName,
  384. strings.Join(append([]string{""}, batch[i].HostPath), "\n - "),
  385. )
  386. // restrict the build to ONLY this service, not any of its dependencies
  387. build.Services = []string{serviceName}
  388. err := s.Up(ctx, project, api.UpOptions{
  389. Create: api.CreateOptions{
  390. Build: &build,
  391. Services: []string{serviceName},
  392. Inherit: true,
  393. },
  394. Start: api.StartOptions{
  395. Services: []string{serviceName},
  396. Project: project,
  397. },
  398. })
  399. if err != nil {
  400. fmt.Fprintf(s.stderr(), "Application failed to start after update. Error: %v\n", err)
  401. }
  402. return nil
  403. }
  404. if batch[i].Action == types.WatchActionSyncRestart {
  405. restartService = true
  406. }
  407. pathMappings[i] = batch[i].PathMapping
  408. }
  409. writeWatchSyncMessage(s.stdinfo(), serviceName, pathMappings)
  410. service, err := project.GetService(serviceName)
  411. if err != nil {
  412. return err
  413. }
  414. if err := syncer.Sync(ctx, service, pathMappings); err != nil {
  415. return err
  416. }
  417. if restartService {
  418. return s.Restart(ctx, project.Name, api.RestartOptions{
  419. Services: []string{serviceName},
  420. Project: project,
  421. NoDeps: false,
  422. })
  423. }
  424. return nil
  425. }
  426. // writeWatchSyncMessage prints out a message about the sync for the changed paths.
  427. func writeWatchSyncMessage(w io.Writer, serviceName string, pathMappings []sync.PathMapping) {
  428. const maxPathsToShow = 10
  429. if len(pathMappings) <= maxPathsToShow || logrus.IsLevelEnabled(logrus.DebugLevel) {
  430. hostPathsToSync := make([]string, len(pathMappings))
  431. for i := range pathMappings {
  432. hostPathsToSync[i] = pathMappings[i].HostPath
  433. }
  434. fmt.Fprintf(
  435. w,
  436. "Syncing %q after changes were detected:%s\n",
  437. serviceName,
  438. strings.Join(append([]string{""}, hostPathsToSync...), "\n - "),
  439. )
  440. } else {
  441. hostPathsToSync := make([]string, len(pathMappings))
  442. for i := range pathMappings {
  443. hostPathsToSync[i] = pathMappings[i].HostPath
  444. }
  445. fmt.Fprintf(
  446. w,
  447. "Syncing service %q after %d changes were detected\n",
  448. serviceName,
  449. len(pathMappings),
  450. )
  451. }
  452. }