watch.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "fmt"
  17. "io"
  18. "os"
  19. "path"
  20. "path/filepath"
  21. "sort"
  22. "strconv"
  23. "strings"
  24. "time"
  25. moby "github.com/docker/docker/api/types"
  26. "github.com/docker/compose/v2/internal/sync"
  27. "github.com/compose-spec/compose-go/types"
  28. "github.com/jonboulle/clockwork"
  29. "github.com/mitchellh/mapstructure"
  30. "github.com/pkg/errors"
  31. "github.com/sirupsen/logrus"
  32. "golang.org/x/sync/errgroup"
  33. "github.com/docker/compose/v2/pkg/api"
  34. "github.com/docker/compose/v2/pkg/watch"
  35. )
  36. type DevelopmentConfig struct {
  37. Watch []Trigger `json:"watch,omitempty"`
  38. }
  // WatchAction names the operation performed when a watched path changes.
  type WatchAction string

  // Actions recognised by the watch implementation.
  const (
  	// WatchActionSync requests that changed paths be handed to the syncer.
  	WatchActionSync = WatchAction("sync")
  	// WatchActionRebuild requests that the service be rebuilt and brought up again.
  	WatchActionRebuild = WatchAction("rebuild")
  )
  // Trigger is a single watch rule: which host path to observe, what to do
  // when it changes, and where the change lands inside the container.
  type Trigger struct {
  	// Path is the host path to watch.
  	Path string `json:"path,omitempty"`
  	// Action is the operation to perform; it is compared against the
  	// WatchAction values ("sync", "rebuild").
  	Action string `json:"action,omitempty"`
  	// Target is the container-side base path used to build the destination
  	// of a changed file. When empty, no container path is computed.
  	Target string `json:"target,omitempty"`
  	// Ignore holds patterns, evaluated relative to Path, for changes that
  	// must not produce an event.
  	Ignore []string `json:"ignore,omitempty"`
  }
  // quietPeriod is the debounce delay handed to batchDebounceEvents: a batch is
  // flushed once no new file event has arrived for this long (half a second).
  const quietPeriod = time.Second / 2
  51. // fileEvent contains the Compose service and modified host system path.
  52. type fileEvent struct {
  53. sync.PathMapping
  54. Action WatchAction
  55. }
  56. // getSyncImplementation returns the the tar-based syncer unless it has been explicitly
  57. // disabled with `COMPOSE_EXPERIMENTAL_WATCH_TAR=0`. Note that the absence of the env
  58. // var means enabled.
  59. func (s *composeService) getSyncImplementation(project *types.Project) sync.Syncer {
  60. var useTar bool
  61. if useTarEnv, ok := os.LookupEnv("COMPOSE_EXPERIMENTAL_WATCH_TAR"); ok {
  62. useTar, _ = strconv.ParseBool(useTarEnv)
  63. } else {
  64. useTar = true
  65. }
  66. if useTar {
  67. return sync.NewTar(project.Name, tarDockerClient{s: s})
  68. }
  69. return sync.NewDockerCopy(project.Name, s, s.stdinfo())
  70. }
  71. func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint: gocyclo
  72. _, err := s.prepareProjectForBuild(project, nil)
  73. if err != nil {
  74. return err
  75. }
  76. if err := project.ForServices(services); err != nil {
  77. return err
  78. }
  79. syncer := s.getSyncImplementation(project)
  80. eg, ctx := errgroup.WithContext(ctx)
  81. watching := false
  82. for i := range project.Services {
  83. service := project.Services[i]
  84. config, err := loadDevelopmentConfig(service, project)
  85. if err != nil {
  86. return err
  87. }
  88. if config == nil {
  89. continue
  90. }
  91. if len(config.Watch) > 0 && service.Build == nil {
  92. // service configured with watchers but no build section
  93. return fmt.Errorf("can't watch service %q without a build context", service.Name)
  94. }
  95. if len(services) > 0 && service.Build == nil {
  96. // service explicitly selected for watch has no build section
  97. return fmt.Errorf("can't watch service %q without a build context", service.Name)
  98. }
  99. if len(services) == 0 && service.Build == nil {
  100. continue
  101. }
  102. // set the service to always be built - watch triggers `Up()` when it receives a rebuild event
  103. service.PullPolicy = types.PullPolicyBuild
  104. project.Services[i] = service
  105. dockerIgnores, err := watch.LoadDockerIgnore(service.Build.Context)
  106. if err != nil {
  107. return err
  108. }
  109. // add a hardcoded set of ignores on top of what came from .dockerignore
  110. // some of this should likely be configurable (e.g. there could be cases
  111. // where you want `.git` to be synced) but this is suitable for now
  112. dotGitIgnore, err := watch.NewDockerPatternMatcher("/", []string{".git/"})
  113. if err != nil {
  114. return err
  115. }
  116. ignore := watch.NewCompositeMatcher(
  117. dockerIgnores,
  118. watch.EphemeralPathMatcher(),
  119. dotGitIgnore,
  120. )
  121. var paths []string
  122. for _, trigger := range config.Watch {
  123. if checkIfPathAlreadyBindMounted(trigger.Path, service.Volumes) {
  124. logrus.Warnf("path '%s' also declared by a bind mount volume, this path won't be monitored!\n", trigger.Path)
  125. continue
  126. }
  127. paths = append(paths, trigger.Path)
  128. }
  129. watcher, err := watch.NewWatcher(paths, ignore)
  130. if err != nil {
  131. return err
  132. }
  133. fmt.Fprintf(s.stdinfo(), "watching %s\n", paths)
  134. err = watcher.Start()
  135. if err != nil {
  136. return err
  137. }
  138. watching = true
  139. eg.Go(func() error {
  140. defer watcher.Close() //nolint:errcheck
  141. return s.watch(ctx, project, service.Name, watcher, syncer, config.Watch)
  142. })
  143. }
  144. if !watching {
  145. return fmt.Errorf("none of the selected services is configured for watch, consider setting an 'x-develop' section")
  146. }
  147. return eg.Wait()
  148. }
  149. func (s *composeService) watch(
  150. ctx context.Context,
  151. project *types.Project,
  152. name string,
  153. watcher watch.Notify,
  154. syncer sync.Syncer,
  155. triggers []Trigger,
  156. ) error {
  157. ctx, cancel := context.WithCancel(ctx)
  158. defer cancel()
  159. ignores := make([]watch.PathMatcher, len(triggers))
  160. for i, trigger := range triggers {
  161. ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
  162. if err != nil {
  163. return err
  164. }
  165. ignores[i] = ignore
  166. }
  167. events := make(chan fileEvent)
  168. batchEvents := batchDebounceEvents(ctx, s.clock, quietPeriod, events)
  169. go func() {
  170. for {
  171. select {
  172. case <-ctx.Done():
  173. return
  174. case batch := <-batchEvents:
  175. start := time.Now()
  176. logrus.Debugf("batch start: service[%s] count[%d]", name, len(batch))
  177. if err := s.handleWatchBatch(ctx, project, name, batch, syncer); err != nil {
  178. logrus.Warnf("Error handling changed files for service %s: %v", name, err)
  179. }
  180. logrus.Debugf("batch complete: service[%s] duration[%s] count[%d]",
  181. name, time.Since(start), len(batch))
  182. }
  183. }
  184. }()
  185. for {
  186. select {
  187. case <-ctx.Done():
  188. return nil
  189. case err := <-watcher.Errors():
  190. return err
  191. case event := <-watcher.Events():
  192. hostPath := event.Path()
  193. for i, trigger := range triggers {
  194. logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path)
  195. if fileEvent := maybeFileEvent(trigger, hostPath, ignores[i]); fileEvent != nil {
  196. events <- *fileEvent
  197. }
  198. }
  199. }
  200. }
  201. }
  202. // maybeFileEvent returns a file event object if hostPath is valid for the provided trigger and ignore
  203. // rules.
  204. //
  205. // Any errors are logged as warnings and nil (no file event) is returned.
  206. func maybeFileEvent(trigger Trigger, hostPath string, ignore watch.PathMatcher) *fileEvent {
  207. if !watch.IsChild(trigger.Path, hostPath) {
  208. return nil
  209. }
  210. isIgnored, err := ignore.Matches(hostPath)
  211. if err != nil {
  212. logrus.Warnf("error ignore matching %q: %v", hostPath, err)
  213. return nil
  214. }
  215. if isIgnored {
  216. logrus.Debugf("%s is matching ignore pattern", hostPath)
  217. return nil
  218. }
  219. var containerPath string
  220. if trigger.Target != "" {
  221. rel, err := filepath.Rel(trigger.Path, hostPath)
  222. if err != nil {
  223. logrus.Warnf("error making %s relative to %s: %v", hostPath, trigger.Path, err)
  224. return nil
  225. }
  226. // always use Unix-style paths for inside the container
  227. containerPath = path.Join(trigger.Target, rel)
  228. }
  229. return &fileEvent{
  230. Action: WatchAction(trigger.Action),
  231. PathMapping: sync.PathMapping{
  232. HostPath: hostPath,
  233. ContainerPath: containerPath,
  234. },
  235. }
  236. }
  237. func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) (*DevelopmentConfig, error) {
  238. var config DevelopmentConfig
  239. y, ok := service.Extensions["x-develop"]
  240. if !ok {
  241. return nil, nil
  242. }
  243. err := mapstructure.Decode(y, &config)
  244. if err != nil {
  245. return nil, err
  246. }
  247. baseDir, err := filepath.EvalSymlinks(project.WorkingDir)
  248. if err != nil {
  249. return nil, fmt.Errorf("resolving symlink for %q: %w", project.WorkingDir, err)
  250. }
  251. for i, trigger := range config.Watch {
  252. if !filepath.IsAbs(trigger.Path) {
  253. trigger.Path = filepath.Join(baseDir, trigger.Path)
  254. }
  255. if p, err := filepath.EvalSymlinks(trigger.Path); err == nil {
  256. // this might fail because the path doesn't exist, etc.
  257. trigger.Path = p
  258. }
  259. trigger.Path = filepath.Clean(trigger.Path)
  260. if trigger.Path == "" {
  261. return nil, errors.New("watch rules MUST define a path")
  262. }
  263. if trigger.Action == string(WatchActionRebuild) && service.Build == nil {
  264. return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name)
  265. }
  266. config.Watch[i] = trigger
  267. }
  268. return &config, nil
  269. }
  270. // batchDebounceEvents groups identical file events within a sliding time window and writes the results to the returned
  271. // channel.
  272. //
  273. // The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel.
  274. func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent) <-chan []fileEvent {
  275. out := make(chan []fileEvent)
  276. go func() {
  277. defer close(out)
  278. seen := make(map[fileEvent]time.Time)
  279. flushEvents := func() {
  280. if len(seen) == 0 {
  281. return
  282. }
  283. events := make([]fileEvent, 0, len(seen))
  284. for e := range seen {
  285. events = append(events, e)
  286. }
  287. // sort batch by oldest -> newest
  288. // (if an event is seen > 1 per batch, it gets the latest timestamp)
  289. sort.SliceStable(events, func(i, j int) bool {
  290. x := events[i]
  291. y := events[j]
  292. return seen[x].Before(seen[y])
  293. })
  294. out <- events
  295. seen = make(map[fileEvent]time.Time)
  296. }
  297. t := clock.NewTicker(delay)
  298. defer t.Stop()
  299. for {
  300. select {
  301. case <-ctx.Done():
  302. return
  303. case <-t.Chan():
  304. flushEvents()
  305. case e, ok := <-input:
  306. if !ok {
  307. // input channel was closed
  308. flushEvents()
  309. return
  310. }
  311. seen[e] = time.Now()
  312. t.Reset(delay)
  313. }
  314. }
  315. }()
  316. return out
  317. }
  318. func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolumeConfig) bool {
  319. for _, volume := range volumes {
  320. if volume.Bind != nil && strings.HasPrefix(watchPath, volume.Source) {
  321. return true
  322. }
  323. }
  324. return false
  325. }
  326. type tarDockerClient struct {
  327. s *composeService
  328. }
  329. func (t tarDockerClient) ContainersForService(ctx context.Context, projectName string, serviceName string) ([]moby.Container, error) {
  330. containers, err := t.s.getContainers(ctx, projectName, oneOffExclude, true, serviceName)
  331. if err != nil {
  332. return nil, err
  333. }
  334. return containers, nil
  335. }
  336. func (t tarDockerClient) Exec(ctx context.Context, containerID string, cmd []string, in io.Reader) error {
  337. execCfg := moby.ExecConfig{
  338. Cmd: cmd,
  339. AttachStdout: false,
  340. AttachStderr: true,
  341. AttachStdin: in != nil,
  342. Tty: false,
  343. }
  344. execCreateResp, err := t.s.apiClient().ContainerExecCreate(ctx, containerID, execCfg)
  345. if err != nil {
  346. return err
  347. }
  348. startCheck := moby.ExecStartCheck{Tty: false, Detach: false}
  349. conn, err := t.s.apiClient().ContainerExecAttach(ctx, execCreateResp.ID, startCheck)
  350. if err != nil {
  351. return err
  352. }
  353. defer conn.Close()
  354. var eg errgroup.Group
  355. if in != nil {
  356. eg.Go(func() error {
  357. defer func() {
  358. _ = conn.CloseWrite()
  359. }()
  360. _, err := io.Copy(conn.Conn, in)
  361. return err
  362. })
  363. }
  364. eg.Go(func() error {
  365. _, err := io.Copy(t.s.stdinfo(), conn.Reader)
  366. return err
  367. })
  368. err = t.s.apiClient().ContainerExecStart(ctx, execCreateResp.ID, startCheck)
  369. if err != nil {
  370. return err
  371. }
  372. // although the errgroup is not tied directly to the context, the operations
  373. // in it are reading/writing to the connection, which is tied to the context,
  374. // so they won't block indefinitely
  375. if err := eg.Wait(); err != nil {
  376. return err
  377. }
  378. execResult, err := t.s.apiClient().ContainerExecInspect(ctx, execCreateResp.ID)
  379. if err != nil {
  380. return err
  381. }
  382. if execResult.Running {
  383. return errors.New("process still running")
  384. }
  385. if execResult.ExitCode != 0 {
  386. return fmt.Errorf("exit code %d", execResult.ExitCode)
  387. }
  388. return nil
  389. }
  390. func (s *composeService) handleWatchBatch(
  391. ctx context.Context,
  392. project *types.Project,
  393. serviceName string,
  394. batch []fileEvent,
  395. syncer sync.Syncer,
  396. ) error {
  397. pathMappings := make([]sync.PathMapping, len(batch))
  398. for i := range batch {
  399. if batch[i].Action == WatchActionRebuild {
  400. fmt.Fprintf(
  401. s.stdinfo(),
  402. "Rebuilding %s after changes were detected:%s\n",
  403. serviceName,
  404. strings.Join(append([]string{""}, batch[i].HostPath), "\n - "),
  405. )
  406. err := s.Up(ctx, project, api.UpOptions{
  407. Create: api.CreateOptions{
  408. Services: []string{serviceName},
  409. Inherit: true,
  410. },
  411. Start: api.StartOptions{
  412. Services: []string{serviceName},
  413. Project: project,
  414. },
  415. })
  416. if err != nil {
  417. fmt.Fprintf(s.stderr(), "Application failed to start after update\n")
  418. }
  419. return nil
  420. }
  421. pathMappings[i] = batch[i].PathMapping
  422. }
  423. writeWatchSyncMessage(s.stdinfo(), serviceName, pathMappings)
  424. service, err := project.GetService(serviceName)
  425. if err != nil {
  426. return err
  427. }
  428. if err := syncer.Sync(ctx, service, pathMappings); err != nil {
  429. return err
  430. }
  431. return nil
  432. }
  433. // writeWatchSyncMessage prints out a message about the sync for the changed paths.
  434. func writeWatchSyncMessage(w io.Writer, serviceName string, pathMappings []sync.PathMapping) {
  435. const maxPathsToShow = 10
  436. if len(pathMappings) <= maxPathsToShow || logrus.IsLevelEnabled(logrus.DebugLevel) {
  437. hostPathsToSync := make([]string, len(pathMappings))
  438. for i := range pathMappings {
  439. hostPathsToSync[i] = pathMappings[i].HostPath
  440. }
  441. fmt.Fprintf(
  442. w,
  443. "Syncing %s after changes were detected:%s\n",
  444. serviceName,
  445. strings.Join(append([]string{""}, hostPathsToSync...), "\n - "),
  446. )
  447. } else {
  448. hostPathsToSync := make([]string, len(pathMappings))
  449. for i := range pathMappings {
  450. hostPathsToSync[i] = pathMappings[i].HostPath
  451. }
  452. fmt.Fprintf(
  453. w,
  454. "Syncing %s after %d changes were detected\n",
  455. serviceName,
  456. len(pathMappings),
  457. )
  458. }
  459. }