watch.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "fmt"
  17. "io"
  18. "os"
  19. "path"
  20. "path/filepath"
  21. "strconv"
  22. "strings"
  23. "time"
  24. moby "github.com/docker/docker/api/types"
  25. "github.com/docker/compose/v2/internal/sync"
  26. "github.com/compose-spec/compose-go/types"
  27. "github.com/jonboulle/clockwork"
  28. "github.com/mitchellh/mapstructure"
  29. "github.com/pkg/errors"
  30. "github.com/sirupsen/logrus"
  31. "golang.org/x/sync/errgroup"
  32. "github.com/docker/compose/v2/pkg/api"
  33. "github.com/docker/compose/v2/pkg/utils"
  34. "github.com/docker/compose/v2/pkg/watch"
  35. )
  36. type DevelopmentConfig struct {
  37. Watch []Trigger `json:"watch,omitempty"`
  38. }
  // Actions a watch Trigger may request when a change is detected.
  const (
  	// WatchActionSync copies the changed file to the trigger's target
  	// location inside the service container.
  	WatchActionSync = "sync"
  	// WatchActionRebuild schedules a rebuild of the service.
  	WatchActionRebuild = "rebuild"
  )
  // Trigger is a single watch rule: a host path to monitor and the action to
  // take when something below it changes.
  type Trigger struct {
  	// Path is the host path to monitor. Relative paths are resolved against
  	// the project working directory when the configuration is loaded.
  	Path string `json:"path,omitempty"`
  	// Action is the reaction to a change, e.g. WatchActionSync or
  	// WatchActionRebuild.
  	Action string `json:"action,omitempty"`
  	// Target is the container-side base path used by the sync action.
  	Target string `json:"target,omitempty"`
  	// Ignore lists patterns, evaluated relative to Path, whose changes are
  	// skipped.
  	Ignore []string `json:"ignore,omitempty"`
  }
  // quietPeriod is the debounce delay applied to rebuild requests: a rebuild
  // only starts once no new rebuild event has arrived for this long.
  const quietPeriod = 2 * time.Second
  // fileEvent pairs a Compose service with the host path whose change should
  // trigger a rebuild of that service.
  type fileEvent struct {
  	// Service is the name of the service the event belongs to.
  	Service string
  	// HostPath is the file that was created, modified or deleted outside the
  	// container, expressed the way the user sees it on the host, e.g.
  	//   - C:\Users\moby\Documents\hello-world\main.go
  	//   - /Users/moby/Documents/hello-world/main.go
  	HostPath string
  }
  61. func (s *composeService) Watch(ctx context.Context, project *types.Project, services []string, _ api.WatchOptions) error { //nolint: gocyclo
  62. needRebuild := make(chan fileEvent)
  63. needSync := make(chan sync.PathMapping)
  64. _, err := s.prepareProjectForBuild(project, nil)
  65. if err != nil {
  66. return err
  67. }
  68. eg, ctx := errgroup.WithContext(ctx)
  69. eg.Go(func() error {
  70. clock := clockwork.NewRealClock()
  71. debounce(ctx, clock, quietPeriod, needRebuild, s.makeRebuildFn(ctx, project))
  72. return nil
  73. })
  74. eg.Go(s.makeSyncFn(ctx, project, needSync))
  75. ss, err := project.GetServices(services...)
  76. if err != nil {
  77. return err
  78. }
  79. watching := false
  80. for _, service := range ss {
  81. config, err := loadDevelopmentConfig(service, project)
  82. if err != nil {
  83. return err
  84. }
  85. if config == nil {
  86. continue
  87. }
  88. if len(config.Watch) > 0 && service.Build == nil {
  89. // service configured with watchers but no build section
  90. return fmt.Errorf("can't watch service %q without a build context", service.Name)
  91. }
  92. if len(services) > 0 && service.Build == nil {
  93. // service explicitly selected for watch has no build section
  94. return fmt.Errorf("can't watch service %q without a build context", service.Name)
  95. }
  96. if len(services) == 0 && service.Build == nil {
  97. continue
  98. }
  99. name := service.Name
  100. dockerIgnores, err := watch.LoadDockerIgnore(service.Build.Context)
  101. if err != nil {
  102. return err
  103. }
  104. // add a hardcoded set of ignores on top of what came from .dockerignore
  105. // some of this should likely be configurable (e.g. there could be cases
  106. // where you want `.git` to be synced) but this is suitable for now
  107. dotGitIgnore, err := watch.NewDockerPatternMatcher("/", []string{".git/"})
  108. if err != nil {
  109. return err
  110. }
  111. ignore := watch.NewCompositeMatcher(
  112. dockerIgnores,
  113. watch.EphemeralPathMatcher(),
  114. dotGitIgnore,
  115. )
  116. var paths []string
  117. for _, trigger := range config.Watch {
  118. if checkIfPathAlreadyBindMounted(trigger.Path, service.Volumes) {
  119. logrus.Warnf("path '%s' also declared by a bind mount volume, this path won't be monitored!\n", trigger.Path)
  120. continue
  121. }
  122. paths = append(paths, trigger.Path)
  123. }
  124. watcher, err := watch.NewWatcher(paths, ignore)
  125. if err != nil {
  126. return err
  127. }
  128. fmt.Fprintf(s.stdinfo(), "watching %s\n", paths)
  129. err = watcher.Start()
  130. if err != nil {
  131. return err
  132. }
  133. watching = true
  134. eg.Go(func() error {
  135. defer watcher.Close() //nolint:errcheck
  136. return s.watch(ctx, name, watcher, config.Watch, needSync, needRebuild)
  137. })
  138. }
  139. if !watching {
  140. return fmt.Errorf("none of the selected services is configured for watch, consider setting an 'x-develop' section")
  141. }
  142. return eg.Wait()
  143. }
  144. func (s *composeService) watch(ctx context.Context, name string, watcher watch.Notify, triggers []Trigger, needSync chan sync.PathMapping, needRebuild chan fileEvent) error {
  145. ignores := make([]watch.PathMatcher, len(triggers))
  146. for i, trigger := range triggers {
  147. ignore, err := watch.NewDockerPatternMatcher(trigger.Path, trigger.Ignore)
  148. if err != nil {
  149. return err
  150. }
  151. ignores[i] = ignore
  152. }
  153. WATCH:
  154. for {
  155. select {
  156. case <-ctx.Done():
  157. return nil
  158. case event := <-watcher.Events():
  159. hostPath := event.Path()
  160. for i, trigger := range triggers {
  161. logrus.Debugf("change for %s - comparing with %s", hostPath, trigger.Path)
  162. if watch.IsChild(trigger.Path, hostPath) {
  163. match, err := ignores[i].Matches(hostPath)
  164. if err != nil {
  165. logrus.Warnf("error ignore matching %q: %v", hostPath, err)
  166. return err
  167. }
  168. if match {
  169. logrus.Debugf("%s is matching ignore pattern", hostPath)
  170. continue
  171. }
  172. logrus.Infof("change for %q", hostPath)
  173. fmt.Fprintf(s.stdinfo(), "change detected on %s\n", hostPath)
  174. switch trigger.Action {
  175. case WatchActionSync:
  176. logrus.Debugf("modified file %s triggered sync", hostPath)
  177. rel, err := filepath.Rel(trigger.Path, hostPath)
  178. if err != nil {
  179. return err
  180. }
  181. needSync <- sync.PathMapping{
  182. Service: name,
  183. HostPath: hostPath,
  184. // always use Unix-style paths for inside the container
  185. ContainerPath: path.Join(trigger.Target, rel),
  186. }
  187. case WatchActionRebuild:
  188. logrus.Debugf("modified file %s requires image to be rebuilt", hostPath)
  189. needRebuild <- fileEvent{
  190. HostPath: hostPath,
  191. Service: name,
  192. }
  193. default:
  194. return fmt.Errorf("watch action %q is not supported", trigger)
  195. }
  196. continue WATCH
  197. }
  198. }
  199. case err := <-watcher.Errors():
  200. return err
  201. }
  202. }
  203. }
  204. func loadDevelopmentConfig(service types.ServiceConfig, project *types.Project) (*DevelopmentConfig, error) {
  205. var config DevelopmentConfig
  206. y, ok := service.Extensions["x-develop"]
  207. if !ok {
  208. return nil, nil
  209. }
  210. err := mapstructure.Decode(y, &config)
  211. if err != nil {
  212. return nil, err
  213. }
  214. baseDir, err := filepath.EvalSymlinks(project.WorkingDir)
  215. if err != nil {
  216. return nil, fmt.Errorf("resolving symlink for %q: %w", project.WorkingDir, err)
  217. }
  218. for i, trigger := range config.Watch {
  219. if !filepath.IsAbs(trigger.Path) {
  220. trigger.Path = filepath.Join(baseDir, trigger.Path)
  221. }
  222. if p, err := filepath.EvalSymlinks(trigger.Path); err == nil {
  223. // this might fail because the path doesn't exist, etc.
  224. trigger.Path = p
  225. }
  226. trigger.Path = filepath.Clean(trigger.Path)
  227. if trigger.Path == "" {
  228. return nil, errors.New("watch rules MUST define a path")
  229. }
  230. if trigger.Action == WatchActionRebuild && service.Build == nil {
  231. return nil, fmt.Errorf("service %s doesn't have a build section, can't apply 'rebuild' on watch", service.Name)
  232. }
  233. config.Watch[i] = trigger
  234. }
  235. return &config, nil
  236. }
  237. func (s *composeService) makeRebuildFn(ctx context.Context, project *types.Project) func(services rebuildServices) {
  238. for i, service := range project.Services {
  239. service.PullPolicy = types.PullPolicyBuild
  240. project.Services[i] = service
  241. }
  242. return func(services rebuildServices) {
  243. serviceNames := make([]string, 0, len(services))
  244. allPaths := make(utils.Set[string])
  245. for serviceName, paths := range services {
  246. serviceNames = append(serviceNames, serviceName)
  247. for p := range paths {
  248. allPaths.Add(p)
  249. }
  250. }
  251. fmt.Fprintf(
  252. s.stdinfo(),
  253. "Rebuilding %s after changes were detected:%s\n",
  254. strings.Join(serviceNames, ", "),
  255. strings.Join(append([]string{""}, allPaths.Elements()...), "\n - "),
  256. )
  257. err := s.Up(ctx, project, api.UpOptions{
  258. Create: api.CreateOptions{
  259. Services: serviceNames,
  260. Inherit: true,
  261. },
  262. Start: api.StartOptions{
  263. Services: serviceNames,
  264. Project: project,
  265. },
  266. })
  267. if err != nil {
  268. fmt.Fprintf(s.stderr(), "Application failed to start after update\n")
  269. }
  270. }
  271. }
  272. func (s *composeService) makeSyncFn(
  273. ctx context.Context,
  274. project *types.Project,
  275. needSync <-chan sync.PathMapping,
  276. ) func() error {
  277. var syncer sync.Syncer
  278. if useTar, _ := strconv.ParseBool(os.Getenv("COMPOSE_EXPERIMENTAL_WATCH_TAR")); useTar {
  279. syncer = sync.NewTar(project.Name, tarDockerClient{s: s})
  280. } else {
  281. syncer = sync.NewDockerCopy(project.Name, s, s.stdinfo())
  282. }
  283. return func() error {
  284. for {
  285. select {
  286. case <-ctx.Done():
  287. return nil
  288. case op := <-needSync:
  289. service, err := project.GetService(op.Service)
  290. if err != nil {
  291. return err
  292. }
  293. if err := syncer.Sync(ctx, service, []sync.PathMapping{op}); err != nil {
  294. return err
  295. }
  296. }
  297. }
  298. }
  299. }
  300. type rebuildServices map[string]utils.Set[string]
  301. func debounce(ctx context.Context, clock clockwork.Clock, delay time.Duration, input <-chan fileEvent, fn func(services rebuildServices)) {
  302. services := make(rebuildServices)
  303. t := clock.NewTimer(delay)
  304. defer t.Stop()
  305. for {
  306. select {
  307. case <-ctx.Done():
  308. return
  309. case <-t.Chan():
  310. if len(services) > 0 {
  311. go fn(services)
  312. services = make(rebuildServices)
  313. }
  314. case e := <-input:
  315. t.Reset(delay)
  316. svc, ok := services[e.Service]
  317. if !ok {
  318. svc = make(utils.Set[string])
  319. services[e.Service] = svc
  320. }
  321. svc.Add(e.HostPath)
  322. }
  323. }
  324. }
  325. func checkIfPathAlreadyBindMounted(watchPath string, volumes []types.ServiceVolumeConfig) bool {
  326. for _, volume := range volumes {
  327. if volume.Bind != nil && strings.HasPrefix(watchPath, volume.Source) {
  328. return true
  329. }
  330. }
  331. return false
  332. }
  333. type tarDockerClient struct {
  334. s *composeService
  335. }
  336. func (t tarDockerClient) ContainersForService(ctx context.Context, projectName string, serviceName string) ([]moby.Container, error) {
  337. containers, err := t.s.getContainers(ctx, projectName, oneOffExclude, true, serviceName)
  338. if err != nil {
  339. return nil, err
  340. }
  341. return containers, nil
  342. }
  343. func (t tarDockerClient) Exec(ctx context.Context, containerID string, cmd []string, in io.Reader) error {
  344. execCfg := moby.ExecConfig{
  345. Cmd: cmd,
  346. AttachStdout: false,
  347. AttachStderr: true,
  348. AttachStdin: in != nil,
  349. Tty: false,
  350. }
  351. execCreateResp, err := t.s.apiClient().ContainerExecCreate(ctx, containerID, execCfg)
  352. if err != nil {
  353. return err
  354. }
  355. startCheck := moby.ExecStartCheck{Tty: false, Detach: false}
  356. conn, err := t.s.apiClient().ContainerExecAttach(ctx, execCreateResp.ID, startCheck)
  357. if err != nil {
  358. return err
  359. }
  360. defer conn.Close()
  361. var eg errgroup.Group
  362. if in != nil {
  363. eg.Go(func() error {
  364. defer func() {
  365. _ = conn.CloseWrite()
  366. }()
  367. _, err := io.Copy(conn.Conn, in)
  368. return err
  369. })
  370. }
  371. eg.Go(func() error {
  372. _, err := io.Copy(t.s.stdinfo(), conn.Reader)
  373. return err
  374. })
  375. err = t.s.apiClient().ContainerExecStart(ctx, execCreateResp.ID, startCheck)
  376. if err != nil {
  377. return err
  378. }
  379. // although the errgroup is not tied directly to the context, the operations
  380. // in it are reading/writing to the connection, which is tied to the context,
  381. // so they won't block indefinitely
  382. if err := eg.Wait(); err != nil {
  383. return err
  384. }
  385. execResult, err := t.s.apiClient().ContainerExecInspect(ctx, execCreateResp.ID)
  386. if err != nil {
  387. return err
  388. }
  389. if execResult.ExitCode != 0 {
  390. return fmt.Errorf("exit code %d", execResult.ExitCode)
  391. }
  392. return nil
  393. }