watch.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "io"
  19. "os"
  20. "path"
  21. "path/filepath"
  22. "sort"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/compose-spec/compose-go/v2/types"
  27. "github.com/docker/compose/v2/internal/sync"
  28. "github.com/docker/compose/v2/pkg/api"
  29. "github.com/docker/compose/v2/pkg/watch"
  30. moby "github.com/docker/docker/api/types"
  31. "github.com/jonboulle/clockwork"
  32. "github.com/mitchellh/mapstructure"
  33. "github.com/sirupsen/logrus"
  34. "golang.org/x/sync/errgroup"
  35. )
  36. const quietPeriod = 500 * time.Millisecond
  37. // fileEvent contains the Compose service and modified host system path.
  38. type fileEvent struct {
  39. sync.PathMapping
  40. Action types.WatchAction
  41. }
  42. // getSyncImplementation returns an appropriate sync implementation for the
  43. // project.
  44. //
  45. // Currently, an implementation that batches files and transfers them using
  46. // the Moby `Untar` API.
  47. func (s *composeService) getSyncImplementation(project *types.Project) (sync.Syncer, error) {
  48. var useTar bool
  49. if useTarEnv, ok := os.LookupEnv("COMPOSE_EXPERIMENTAL_WATCH_TAR"); ok {
  50. useTar, _ = strconv.ParseBool(useTarEnv)
  51. } else {
  52. useTar = true
  53. }
  54. if !useTar {
  55. return nil, errors.New("no available sync implementation")
  56. }
  57. return sync.NewTar(project.Name, tarDockerClient{s: s}), nil
  58. }
  59. func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, options api.WatchOptions) error { //nolint: gocyclo
  60. var err error
  61. if project, err = project.WithSelectedServices(services); err != nil {
  62. return err
  63. }
  64. syncer, err := s.getSyncImplementation(project)
  65. if err != nil {
  66. return err
  67. }
  68. eg, ctx := errgroup.WithContext(ctx)
  69. watching := false
  70. for i := range project.Services {
  71. service := project.Services[i]
  72. config, err := loadDevelopmentConfig(service, project)
  73. if err != nil {
  74. return err
  75. }
  76. if service.Develop != nil {
  77. config = service.Develop
  78. }
  79. if config == nil {
  80. continue
  81. }
  82. if len(config.Watch) > 0 && service.Build == nil {
  83. // service configured with watchers but no build section
  84. return fmt.Errorf("can't watch service %q without a build context", service.Name)
  85. }
  86. if len(services) > 0 && service.Build == nil {
  87. // service explicitly selected for watch has no build section
  88. return fmt.Errorf("can't watch service %q without a build context", service.Name)
  89. }
  90. if len(services) == 0 && service.Build == nil {
  91. continue
  92. }
  93. // set the service to always be built - watch triggers `Up()` when it receives a rebuild event
  94. service.PullPolicy = types.PullPolicyBuild
  95. project.Services[i] = service
  96. dockerIgnores, err := watch.LoadDockerIgnore(service.Build.Context)
  97. if err != nil {
  98. return err
  99. }
  100. // add a hardcoded set of ignores on top of what came from .dockerignore
  101. // some of this should likely be configurable (e.g. there could be cases
  102. // where you want `.git` to be synced) but this is suitable for now
  103. dotGitIgnore, err := watch.NewDockerPatternMatcher("/", []string{".git/"})
  104. if err != nil {
  105. return err
  106. }
  107. ignore := watch.NewCompositeMatcher(
  108. dockerIgnores,
  109. watch.EphemeralPathMatcher(),
  110. dotGitIgnore,
  111. )
  112. var paths, pathLogs []string
  113. for _, trigger := range config.Watch {
  114. if checkIfPathAlreadyBindMounted(trigger.Path, service.Volumes) {
  115. logrus.Warnf("path '%s' also declared by a bind mount volume, this path won't be monitored!\n", trigger.Path)
  116. continue
  117. }
  118. paths = append(paths, trigger.Path)
  119. pathLogs = append(pathLogs, fmt.Sprintf("Action %s for path %q", trigger.Action, trigger.Path))
  120. }
  121. watcher, err := watch.NewWatcher(paths, ignore)
  122. if err != nil {
  123. return err
  124. }
  125. fmt.Fprintf(
  126. s.stdinfo(),
  127. "Watch configuration for service %q:%s\n",
  128. service.Name,
  129. strings.Join(append([]string{""}, pathLogs...), "\n - "),
  130. )
  131. err = watcher.Start()
  132. if err != nil {
  133. return err
  134. }
  135. watching = true
  136. eg.Go(func() error {
  137. defer watcher.Close() //nolint:errcheck
  138. return s.watch(ctx, project, service.Name, options, watcher, syncer, config.Watch)
  139. })
  140. }
  141. if !watching {
  142. return fmt.Errorf("none of the selected services is configured for watch, consider setting an 'develop' section")
  143. }
  144. return eg.Wait()
  145. }
  146. func (s *composeService) watch(ctx context.Context, project *types.Project, name string, options api.WatchOptions, watcher watch.Notify, syncer sync.Syncer, triggers []types.Trigger) error {
  147. ctx, cancel := context.WithCancel(ctx)
  148. defer cancel()
  149. ignores := make([]watch.PathMatcher, len(triggers))
  150. for i, trigger := range triggers {
  151. ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
  152. if err != nil {
  153. return err
  154. }
  155. ignores[i] = ignore
  156. }
  157. events := make(chan fileEvent)
  158. batchEvents := batchDebounceEvents(ctx, s.clock, quietPeriod, events)
  159. go func() {
  160. for {
  161. select {
  162. case <-ctx.Done():
  163. return
  164. case batch := <-batchEvents:
  165. start := time.Now()
  166. logrus.Debugf("batch start: service[%s] count[%d]", name, len(batch))
  167. if err := s.handleWatchBatch(ctx, project, name, options.Build, batch, syncer); err != nil {
  168. logrus.Warnf("Error handling changed files for service %s: %v", name, err)
  169. }
  170. logrus.Debugf("batch complete: service[%s] duration[%s] count[%d]",
  171. name, time.Since(start), len(batch))
  172. }
  173. }
  174. }()
  175. for {
  176. select {
  177. case <-ctx.Done():
  178. return nil
  179. case err := <-watcher.Errors():
  180. return err
  181. case event := <-watcher.Events():
  182. hostPath := event.Path()
  183. for i, trigger := range triggers {
  184. logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path)
  185. if fileEvent := maybeFileEvent(trigger, hostPath, ignores[i]); fileEvent != nil {
  186. events <- *fileEvent
  187. }
  188. }
  189. }
  190. }
  191. }
  192. // maybeFileEvent returns a file event object if hostPath is valid for the provided trigger and ignore
  193. // rules.
  194. //
  195. // Any errors are logged as warnings and nil (no file event) is returned.
  196. func maybeFileEvent(trigger types.Trigger, hostPath string, ignore watch.PathMatcher) *fileEvent {
  197. if !watch.IsChild(trigger.Path, hostPath) {
  198. return nil
  199. }
  200. isIgnored, err := ignore.Matches(hostPath)
  201. if err != nil {
  202. logrus.Warnf("error ignore matching %q: %v", hostPath, err)
  203. return nil
  204. }
  205. if isIgnored {
  206. logrus.Debugf("%s is matching ignore pattern", hostPath)
  207. return nil
  208. }
  209. var containerPath string
  210. if trigger.Target != "" {
  211. rel, err := filepath.Rel(trigger.Path, hostPath)
  212. if err != nil {
  213. logrus.Warnf("error making %s relative to %s: %v", hostPath, trigger.Path, err)
  214. return nil
  215. }
  216. // always use Unix-style paths for inside the container
  217. containerPath = path.Join(trigger.Target, rel)
  218. }
  219. return &fileEvent{
  220. Action: trigger.Action,
  221. PathMapping: sync.PathMapping{
  222. HostPath: hostPath,
  223. ContainerPath: containerPath,
  224. },
  225. }
  226. }
  227. func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) (*types.DevelopConfig, error) {
  228. var config types.DevelopConfig
  229. y, ok := service.Extensions["x-develop"]
  230. if !ok {
  231. return nil, nil
  232. }
  233. logrus.Warnf("x-develop is DEPRECATED, please use the official `develop` attribute")
  234. err := mapstructure.Decode(y, &config)
  235. if err != nil {
  236. return nil, err
  237. }
  238. baseDir, err := filepath.EvalSymlinks(project.WorkingDir)
  239. if err != nil {
  240. return nil, fmt.Errorf("resolving symlink for %q: %w", project.WorkingDir, err)
  241. }
  242. for i, trigger := range config.Watch {
  243. if !filepath.IsAbs(trigger.Path) {
  244. trigger.Path = filepath.Join(baseDir, trigger.Path)
  245. }
  246. if p, err := filepath.EvalSymlinks(trigger.Path); err == nil {
  247. // this might fail because the path doesn't exist, etc.
  248. trigger.Path = p
  249. }
  250. trigger.Path = filepath.Clean(trigger.Path)
  251. if trigger.Path == "" {
  252. return nil, errors.New("watch rules MUST define a path")
  253. }
  254. if trigger.Action == types.WatchActionRebuild && service.Build == nil {
  255. return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name)
  256. }
  257. config.Watch[i] = trigger
  258. }
  259. return &config, nil
  260. }
  261. // batchDebounceEvents groups identical file events within a sliding time window and writes the results to the returned
  262. // channel.
  263. //
  264. // The returned channel is closed when the debouncer is stopped via context cancellation or by closing the input channel.
  265. func batchDebounceEvents(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent) <-chan []fileEvent {
  266. out := make(chan []fileEvent)
  267. go func() {
  268. defer close(out)
  269. seen := make(map[fileEvent]time.Time)
  270. flushEvents := func() {
  271. if len(seen) == 0 {
  272. return
  273. }
  274. events := make([]fileEvent, 0, len(seen))
  275. for e := range seen {
  276. events = append(events, e)
  277. }
  278. // sort batch by oldest -> newest
  279. // (if an event is seen > 1 per batch, it gets the latest timestamp)
  280. sort.SliceStable(events, func(i, j int) bool {
  281. x := events[i]
  282. y := events[j]
  283. return seen[x].Before(seen[y])
  284. })
  285. out <- events
  286. seen = make(map[fileEvent]time.Time)
  287. }
  288. t := clock.NewTicker(delay)
  289. defer t.Stop()
  290. for {
  291. select {
  292. case <-ctx.Done():
  293. return
  294. case <-t.Chan():
  295. flushEvents()
  296. case e, ok := <-input:
  297. if !ok {
  298. // input channel was closed
  299. flushEvents()
  300. return
  301. }
  302. seen[e] = time.Now()
  303. t.Reset(delay)
  304. }
  305. }
  306. }()
  307. return out
  308. }
  309. func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolumeConfig) bool {
  310. for _, volume := range volumes {
  311. if volume.Bind != nil && strings.HasPrefix(watchPath, volume.Source) {
  312. return true
  313. }
  314. }
  315. return false
  316. }
  317. type tarDockerClient struct {
  318. s *composeService
  319. }
  320. func (t tarDockerClient) ContainersForService(ctx context.Context, projectName string, serviceName string) ([]moby.Container, error) {
  321. containers, err := t.s.getContainers(ctx, projectName, oneOffExclude, true, serviceName)
  322. if err != nil {
  323. return nil, err
  324. }
  325. return containers, nil
  326. }
  327. func (t tarDockerClient) Exec(ctx context.Context, containerID string, cmd []string, in io.Reader) error {
  328. execCfg := moby.ExecConfig{
  329. Cmd: cmd,
  330. AttachStdout: false,
  331. AttachStderr: true,
  332. AttachStdin: in != nil,
  333. Tty: false,
  334. }
  335. execCreateResp, err := t.s.apiClient().ContainerExecCreate(ctx, containerID, execCfg)
  336. if err != nil {
  337. return err
  338. }
  339. startCheck := moby.ExecStartCheck{Tty: false, Detach: false}
  340. conn, err := t.s.apiClient().ContainerExecAttach(ctx, execCreateResp.ID, startCheck)
  341. if err != nil {
  342. return err
  343. }
  344. defer conn.Close()
  345. var eg errgroup.Group
  346. if in != nil {
  347. eg.Go(func() error {
  348. defer func() {
  349. _ = conn.CloseWrite()
  350. }()
  351. _, err := io.Copy(conn.Conn, in)
  352. return err
  353. })
  354. }
  355. eg.Go(func() error {
  356. _, err := io.Copy(t.s.stdinfo(), conn.Reader)
  357. return err
  358. })
  359. err = t.s.apiClient().ContainerExecStart(ctx, execCreateResp.ID, startCheck)
  360. if err != nil {
  361. return err
  362. }
  363. // although the errgroup is not tied directly to the context, the operations
  364. // in it are reading/writing to the connection, which is tied to the context,
  365. // so they won't block indefinitely
  366. if err := eg.Wait(); err != nil {
  367. return err
  368. }
  369. execResult, err := t.s.apiClient().ContainerExecInspect(ctx, execCreateResp.ID)
  370. if err != nil {
  371. return err
  372. }
  373. if execResult.Running {
  374. return errors.New("process still running")
  375. }
  376. if execResult.ExitCode != 0 {
  377. return fmt.Errorf("exit code %d", execResult.ExitCode)
  378. }
  379. return nil
  380. }
  381. func (t tarDockerClient) Untar(ctx context.Context, id string, archive io.ReadCloser) error {
  382. return t.s.apiClient().CopyToContainer(ctx, id, "/", archive, moby.CopyToContainerOptions{
  383. CopyUIDGID: true,
  384. })
  385. }
  386. func (s *composeService) handleWatchBatch(ctx context.Context, project *types.Project, serviceName string, build api.BuildOptions, batch []fileEvent, syncer sync.Syncer) error {
  387. pathMappings := make([]sync.PathMapping, len(batch))
  388. restartService := false
  389. for i := range batch {
  390. if batch[i].Action == types.WatchActionRebuild {
  391. fmt.Fprintf(
  392. s.stdinfo(),
  393. "Rebuilding service %q after changes were detected:%s\n",
  394. serviceName,
  395. strings.Join(append([]string{""}, batch[i].HostPath), "\n - "),
  396. )
  397. // restrict the build to ONLY this service, not any of its dependencies
  398. build.Services = []string{serviceName}
  399. err := s.Up(ctx, project, api.UpOptions{
  400. Create: api.CreateOptions{
  401. Build: &build,
  402. Services: []string{serviceName},
  403. Inherit: true,
  404. },
  405. Start: api.StartOptions{
  406. Services: []string{serviceName},
  407. Project: project,
  408. },
  409. })
  410. if err != nil {
  411. fmt.Fprintf(s.stderr(), "Application failed to start after update. Error: %v\n", err)
  412. }
  413. return nil
  414. }
  415. if batch[i].Action == types.WatchActionSyncRestart {
  416. restartService = true
  417. }
  418. pathMappings[i] = batch[i].PathMapping
  419. }
  420. writeWatchSyncMessage(s.stdinfo(), serviceName, pathMappings)
  421. service, err := project.GetService(serviceName)
  422. if err != nil {
  423. return err
  424. }
  425. if err := syncer.Sync(ctx, service, pathMappings); err != nil {
  426. return err
  427. }
  428. if restartService {
  429. return s.Restart(ctx, project.Name, api.RestartOptions{
  430. Services: []string{serviceName},
  431. Project: project,
  432. NoDeps: false,
  433. })
  434. }
  435. return nil
  436. }
  437. // writeWatchSyncMessage prints out a message about the sync for the changed paths.
  438. func writeWatchSyncMessage(w io.Writer, serviceName string, pathMappings []sync.PathMapping) {
  439. const maxPathsToShow = 10
  440. if len(pathMappings) <= maxPathsToShow || logrus.IsLevelEnabled(logrus.DebugLevel) {
  441. hostPathsToSync := make([]string, len(pathMappings))
  442. for i := range pathMappings {
  443. hostPathsToSync[i] = pathMappings[i].HostPath
  444. }
  445. fmt.Fprintf(
  446. w,
  447. "Syncing %q after changes were detected:%s\n",
  448. serviceName,
  449. strings.Join(append([]string{""}, hostPathsToSync...), "\n - "),
  450. )
  451. } else {
  452. hostPathsToSync := make([]string, len(pathMappings))
  453. for i := range pathMappings {
  454. hostPathsToSync[i] = pathMappings[i].HostPath
  455. }
  456. fmt.Fprintf(
  457. w,
  458. "Syncing service %q after %d changes were detected\n",
  459. serviceName,
  460. len(pathMappings),
  461. )
  462. }
  463. }