plugins.go 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "encoding/json"
  17. "fmt"
  18. "io"
  19. "os"
  20. "os/exec"
  21. "strings"
  22. "github.com/compose-spec/compose-go/v2/types"
  23. "github.com/docker/cli/cli-plugins/manager"
  24. "github.com/docker/cli/cli-plugins/socket"
  25. "github.com/docker/compose/v2/pkg/progress"
  26. "github.com/docker/docker/errdefs"
  27. "github.com/pkg/errors"
  28. "github.com/spf13/cobra"
  29. "go.opentelemetry.io/otel"
  30. "go.opentelemetry.io/otel/propagation"
  31. "golang.org/x/sync/errgroup"
  32. )
  33. type JsonMessage struct {
  34. Type string `json:"type"`
  35. Message string `json:"message"`
  36. }
  37. const (
  38. ErrorType = "error"
  39. InfoType = "info"
  40. SetEnvType = "setenv"
  41. )
  42. func (s *composeService) runPlugin(ctx context.Context, project *types.Project, service types.ServiceConfig, command string) error {
  43. x := *service.External
  44. // Only support Docker CLI plugins for first iteration. Could support any binary from PATH
  45. plugin, err := manager.GetPlugin(x.Type, s.dockerCli, &cobra.Command{})
  46. if err != nil {
  47. if errdefs.IsNotFound(err) {
  48. return fmt.Errorf("unsupported external service type %s", x.Type)
  49. }
  50. return err
  51. }
  52. args := []string{"compose", "--project-name", project.Name, command}
  53. for k, v := range x.Options {
  54. args = append(args, fmt.Sprintf("--%s=%s", k, v))
  55. }
  56. cmd := exec.CommandContext(ctx, plugin.Path, args...)
  57. // Remove DOCKER_CLI_PLUGIN... variable so plugin can detect it run standalone
  58. cmd.Env = filter(os.Environ(), manager.ReexecEnvvar)
  59. // Use docker/cli mechanism to propagate termination signal to child process
  60. server, err := socket.NewPluginServer(nil)
  61. if err != nil {
  62. defer server.Close() //nolint:errcheck
  63. cmd.Cancel = server.Close
  64. cmd.Env = replace(cmd.Env, socket.EnvKey, server.Addr().String())
  65. }
  66. cmd.Env = append(cmd.Env, fmt.Sprintf("DOCKER_CONTEXT=%s", s.dockerCli.CurrentContext()))
  67. // propagate opentelemetry context to child process, see https://github.com/open-telemetry/oteps/blob/main/text/0258-env-context-baggage-carriers.md
  68. carrier := propagation.MapCarrier{}
  69. otel.GetTextMapPropagator().Inject(ctx, &carrier)
  70. cmd.Env = append(cmd.Env, types.Mapping(carrier).Values()...)
  71. eg := errgroup.Group{}
  72. stdout, err := cmd.StdoutPipe()
  73. if err != nil {
  74. return err
  75. }
  76. err = cmd.Start()
  77. if err != nil {
  78. return err
  79. }
  80. eg.Go(cmd.Wait)
  81. decoder := json.NewDecoder(stdout)
  82. defer stdout.Close()
  83. variables := types.Mapping{}
  84. pw := progress.ContextWriter(ctx)
  85. pw.Event(progress.CreatingEvent(service.Name))
  86. for {
  87. var msg JsonMessage
  88. err = decoder.Decode(&msg)
  89. if err == io.EOF {
  90. break
  91. }
  92. if err != nil {
  93. return err
  94. }
  95. switch msg.Type {
  96. case ErrorType:
  97. pw.Event(progress.ErrorMessageEvent(service.Name, "error"))
  98. return errors.New(msg.Message)
  99. case InfoType:
  100. pw.Event(progress.ErrorMessageEvent(service.Name, msg.Message))
  101. case SetEnvType:
  102. key, val, found := strings.Cut(msg.Message, "=")
  103. if !found {
  104. return fmt.Errorf("invalid response from plugin: %s", msg.Message)
  105. }
  106. variables[key] = val
  107. default:
  108. return fmt.Errorf("invalid response from plugin: %s", msg.Type)
  109. }
  110. }
  111. err = eg.Wait()
  112. if err != nil {
  113. pw.Event(progress.ErrorMessageEvent(service.Name, err.Error()))
  114. return errors.Wrapf(err, "failed to create external service")
  115. }
  116. pw.Event(progress.CreatedEvent(service.Name))
  117. prefix := strings.ToUpper(service.Name) + "_"
  118. for name, s := range project.Services {
  119. if _, ok := s.DependsOn[service.Name]; ok {
  120. for key, val := range variables {
  121. s.Environment[prefix+key] = &val
  122. }
  123. project.Services[name] = s
  124. }
  125. }
  126. return nil
  127. }