client.go 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343
  1. /*
  2. Copyright 2024 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package desktop
  14. import (
  15. "bytes"
  16. "context"
  17. "encoding/json"
  18. "errors"
  19. "fmt"
  20. "io"
  21. "net"
  22. "net/http"
  23. "strings"
  24. "github.com/docker/compose/v2/internal"
  25. "github.com/docker/compose/v2/internal/memnet"
  26. "github.com/r3labs/sse"
  27. "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
  28. )
  29. // identify this client in the logs
  30. var userAgent = "compose/" + internal.Version
  31. // Client for integration with Docker Desktop features.
  32. type Client struct {
  33. apiEndpoint string
  34. client *http.Client
  35. }
  36. // NewClient creates a Desktop integration client for the provided in-memory
  37. // socket address (AF_UNIX or named pipe).
  38. func NewClient(apiEndpoint string) *Client {
  39. var transport http.RoundTripper = &http.Transport{
  40. DisableCompression: true,
  41. DialContext: func(ctx context.Context, _, _ string) (net.Conn, error) {
  42. return memnet.DialEndpoint(ctx, apiEndpoint)
  43. },
  44. }
  45. transport = otelhttp.NewTransport(transport)
  46. c := &Client{
  47. apiEndpoint: apiEndpoint,
  48. client: &http.Client{Transport: transport},
  49. }
  50. return c
  51. }
  52. func (c *Client) Endpoint() string {
  53. return c.apiEndpoint
  54. }
  55. // Close releases any open connections.
  56. func (c *Client) Close() error {
  57. c.client.CloseIdleConnections()
  58. return nil
  59. }
  60. type PingResponse struct {
  61. ServerTime int64 `json:"serverTime"`
  62. }
  63. // Ping is a minimal API used to ensure that the server is available.
  64. func (c *Client) Ping(ctx context.Context) (*PingResponse, error) {
  65. req, err := http.NewRequestWithContext(ctx, http.MethodGet, backendURL("/ping"), http.NoBody)
  66. if err != nil {
  67. return nil, err
  68. }
  69. req.Header.Set("User-Agent", userAgent)
  70. resp, err := c.client.Do(req)
  71. if err != nil {
  72. return nil, err
  73. }
  74. defer func() {
  75. _ = resp.Body.Close()
  76. }()
  77. if resp.StatusCode != http.StatusOK {
  78. return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  79. }
  80. var ret PingResponse
  81. if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
  82. return nil, err
  83. }
  84. return &ret, nil
  85. }
  86. type FeatureFlagResponse map[string]FeatureFlagValue
  87. type FeatureFlagValue struct {
  88. Enabled bool `json:"enabled"`
  89. }
  90. func (c *Client) FeatureFlags(ctx context.Context) (FeatureFlagResponse, error) {
  91. req, err := http.NewRequestWithContext(ctx, http.MethodGet, backendURL("/features"), http.NoBody)
  92. if err != nil {
  93. return nil, err
  94. }
  95. req.Header.Set("User-Agent", userAgent)
  96. resp, err := c.client.Do(req)
  97. if err != nil {
  98. return nil, err
  99. }
  100. defer func() {
  101. _ = resp.Body.Close()
  102. }()
  103. if resp.StatusCode != http.StatusOK {
  104. return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode)
  105. }
  106. var ret FeatureFlagResponse
  107. if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
  108. return nil, err
  109. }
  110. return ret, nil
  111. }
  112. type GetFileSharesConfigResponse struct {
  113. Active bool `json:"active"`
  114. Compose struct {
  115. ManageBindMounts bool `json:"manageBindMounts"`
  116. }
  117. }
  118. func (c *Client) GetFileSharesConfig(ctx context.Context) (*GetFileSharesConfigResponse, error) {
  119. req, err := http.NewRequestWithContext(ctx, http.MethodGet, backendURL("/mutagen/file-shares/config"), http.NoBody)
  120. if err != nil {
  121. return nil, err
  122. }
  123. req.Header.Set("User-Agent", userAgent)
  124. resp, err := c.client.Do(req)
  125. if err != nil {
  126. return nil, err
  127. }
  128. defer func() {
  129. _ = resp.Body.Close()
  130. }()
  131. if resp.StatusCode != http.StatusOK {
  132. return nil, newHTTPStatusCodeError(resp)
  133. }
  134. var ret GetFileSharesConfigResponse
  135. if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
  136. return nil, err
  137. }
  138. return &ret, nil
  139. }
  140. type CreateFileShareRequest struct {
  141. HostPath string `json:"hostPath"`
  142. Labels map[string]string `json:"labels,omitempty"`
  143. }
  144. type CreateFileShareResponse struct {
  145. FileShareID string `json:"fileShareID"`
  146. }
  147. func (c *Client) CreateFileShare(ctx context.Context, r CreateFileShareRequest) (*CreateFileShareResponse, error) {
  148. rawBody, _ := json.Marshal(r)
  149. req, err := http.NewRequestWithContext(ctx, http.MethodPost, backendURL("/mutagen/file-shares"), bytes.NewReader(rawBody))
  150. if err != nil {
  151. return nil, err
  152. }
  153. req.Header.Set("Content-Type", "application/json")
  154. req.Header.Set("User-Agent", userAgent)
  155. resp, err := c.client.Do(req)
  156. if err != nil {
  157. return nil, err
  158. }
  159. defer func() {
  160. _ = resp.Body.Close()
  161. }()
  162. if resp.StatusCode != http.StatusOK {
  163. errBody, _ := io.ReadAll(resp.Body)
  164. return nil, fmt.Errorf("unexpected status code %d: %s", resp.StatusCode, string(errBody))
  165. }
  166. var ret CreateFileShareResponse
  167. if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
  168. return nil, err
  169. }
  170. return &ret, nil
  171. }
  172. type FileShareReceiverState struct {
  173. TotalReceivedSize uint64 `json:"totalReceivedSize"`
  174. }
  175. type FileShareEndpoint struct {
  176. Path string `json:"path"`
  177. TotalFileSize uint64 `json:"totalFileSize,omitempty"`
  178. StagingProgress *FileShareReceiverState `json:"stagingProgress"`
  179. }
  180. type FileShareSession struct {
  181. SessionID string `json:"identifier"`
  182. Alpha FileShareEndpoint `json:"alpha"`
  183. Beta FileShareEndpoint `json:"beta"`
  184. Labels map[string]string `json:"labels"`
  185. Status string `json:"status"`
  186. }
  187. func (c *Client) ListFileShares(ctx context.Context) ([]FileShareSession, error) {
  188. req, err := http.NewRequestWithContext(ctx, http.MethodGet, backendURL("/mutagen/file-shares"), http.NoBody)
  189. if err != nil {
  190. return nil, err
  191. }
  192. req.Header.Set("User-Agent", userAgent)
  193. resp, err := c.client.Do(req)
  194. if err != nil {
  195. return nil, err
  196. }
  197. defer func() {
  198. _ = resp.Body.Close()
  199. }()
  200. if resp.StatusCode != http.StatusOK {
  201. return nil, newHTTPStatusCodeError(resp)
  202. }
  203. var ret []FileShareSession
  204. if err := json.NewDecoder(resp.Body).Decode(&ret); err != nil {
  205. return nil, err
  206. }
  207. return ret, nil
  208. }
  209. func (c *Client) DeleteFileShare(ctx context.Context, id string) error {
  210. req, err := http.NewRequestWithContext(ctx, http.MethodDelete, backendURL("/mutagen/file-shares/"+id), http.NoBody)
  211. if err != nil {
  212. return err
  213. }
  214. req.Header.Set("User-Agent", userAgent)
  215. resp, err := c.client.Do(req)
  216. if err != nil {
  217. return err
  218. }
  219. defer func() {
  220. _ = resp.Body.Close()
  221. }()
  222. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  223. return newHTTPStatusCodeError(resp)
  224. }
  225. return nil
  226. }
  227. type EventMessage[T any] struct {
  228. Value T
  229. Error error
  230. }
  231. func newHTTPStatusCodeError(resp *http.Response) error {
  232. r := io.LimitReader(resp.Body, 2048)
  233. body, err := io.ReadAll(r)
  234. if err != nil {
  235. return fmt.Errorf("http status code %d", resp.StatusCode)
  236. }
  237. return fmt.Errorf("http status code %d: %s", resp.StatusCode, string(body))
  238. }
  239. func (c *Client) StreamFileShares(ctx context.Context) (<-chan EventMessage[[]FileShareSession], error) {
  240. req, err := http.NewRequestWithContext(ctx, http.MethodGet, backendURL("/mutagen/file-shares/stream"), http.NoBody)
  241. if err != nil {
  242. return nil, err
  243. }
  244. req.Header.Set("User-Agent", userAgent)
  245. resp, err := c.client.Do(req)
  246. if err != nil {
  247. return nil, err
  248. }
  249. if resp.StatusCode < 200 || resp.StatusCode >= 300 {
  250. defer func() {
  251. _ = resp.Body.Close()
  252. }()
  253. return nil, newHTTPStatusCodeError(resp)
  254. }
  255. events := make(chan EventMessage[[]FileShareSession])
  256. go func(ctx context.Context) {
  257. defer func() {
  258. _ = resp.Body.Close()
  259. for range events {
  260. // drain the channel
  261. }
  262. close(events)
  263. }()
  264. if err := readEvents(ctx, resp.Body, events); err != nil {
  265. select {
  266. case <-ctx.Done():
  267. case events <- EventMessage[[]FileShareSession]{Error: err}:
  268. }
  269. }
  270. }(ctx)
  271. return events, nil
  272. }
  273. func readEvents[T any](ctx context.Context, r io.Reader, events chan<- EventMessage[T]) error {
  274. eventReader := sse.NewEventStreamReader(r)
  275. for {
  276. msg, err := eventReader.ReadEvent()
  277. if errors.Is(err, io.EOF) {
  278. return nil
  279. } else if err != nil {
  280. return fmt.Errorf("reading events: %w", err)
  281. }
  282. msg = bytes.TrimPrefix(msg, []byte("data: "))
  283. var event T
  284. if err := json.Unmarshal(msg, &event); err != nil {
  285. return err
  286. }
  287. select {
  288. case <-ctx.Done():
  289. return context.Cause(ctx)
  290. case events <- EventMessage[T]{Value: event}:
  291. // event was sent to channel, read next
  292. }
  293. }
  294. }
  295. // backendURL generates a URL for the given API path.
  296. //
  297. // NOTE: Custom transport handles communication. The host is to create a valid
  298. // URL for the Go http.Client that is also descriptive in error/logs.
  299. func backendURL(path string) string {
  300. return "http://docker-desktop/" + strings.TrimPrefix(path, "/")
  301. }