file_shares.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. /*
  2. Copyright 2024 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package desktop
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "strings"
  19. "sync"
  20. "github.com/docker/compose/v2/internal/paths"
  21. "github.com/docker/compose/v2/pkg/api"
  22. "github.com/docker/compose/v2/pkg/progress"
  23. "github.com/docker/go-units"
  24. "github.com/sirupsen/logrus"
  25. )
  26. // fileShareProgressID is the identifier used for the root grouping of file
  27. // share events in the progress writer.
  28. const fileShareProgressID = "Synchronized File Shares"
  29. // RemoveFileSharesForProject removes any Synchronized File Shares that were
  30. // created by Compose for this project in the past if possible.
  31. //
  32. // Errors are not propagated; they are only sent to the progress writer.
  33. func RemoveFileSharesForProject(ctx context.Context, c *Client, projectName string) {
  34. w := progress.ContextWriter(ctx)
  35. existing, err := c.ListFileShares(ctx)
  36. if err != nil {
  37. w.TailMsgf("Synchronized File Shares not removed due to error: %v", err)
  38. return
  39. }
  40. // filter the list first, so we can early return and not show the event if
  41. // there's no sessions to clean up
  42. var toRemove []FileShareSession
  43. for _, share := range existing {
  44. if share.Labels["com.docker.compose.project"] == projectName {
  45. toRemove = append(toRemove, share)
  46. }
  47. }
  48. if len(toRemove) == 0 {
  49. return
  50. }
  51. w.Event(progress.NewEvent(fileShareProgressID, progress.Working, "Removing"))
  52. rootResult := progress.Done
  53. defer func() {
  54. w.Event(progress.NewEvent(fileShareProgressID, rootResult, ""))
  55. }()
  56. for _, share := range toRemove {
  57. shareID := share.Labels["com.docker.desktop.mutagen.file-share.id"]
  58. if shareID == "" {
  59. w.Event(progress.Event{
  60. ID: share.Alpha.Path,
  61. ParentID: fileShareProgressID,
  62. Status: progress.Warning,
  63. StatusText: "Invalid",
  64. })
  65. continue
  66. }
  67. w.Event(progress.Event{
  68. ID: share.Alpha.Path,
  69. ParentID: fileShareProgressID,
  70. Status: progress.Working,
  71. })
  72. var status progress.EventStatus
  73. var statusText string
  74. if err := c.DeleteFileShare(ctx, shareID); err != nil {
  75. // TODO(milas): Docker Desktop is doing weird things with error responses,
  76. // once fixed, we can return proper error types from the client
  77. if strings.Contains(err.Error(), "file share in use") {
  78. status = progress.Warning
  79. statusText = "Resource is still in use"
  80. if rootResult != progress.Error {
  81. // error takes precedence over warning
  82. rootResult = progress.Warning
  83. }
  84. } else {
  85. logrus.Debugf("Error deleting file share %q: %v", shareID, err)
  86. status = progress.Error
  87. rootResult = progress.Error
  88. }
  89. } else {
  90. logrus.Debugf("Deleted file share: %s", shareID)
  91. status = progress.Done
  92. }
  93. w.Event(progress.Event{
  94. ID: share.Alpha.Path,
  95. ParentID: fileShareProgressID,
  96. Status: status,
  97. StatusText: statusText,
  98. })
  99. }
  100. }
  101. // FileShareManager maps between Compose bind mounts and Desktop File Shares
  102. // state.
  103. type FileShareManager struct {
  104. mu sync.Mutex
  105. cli *Client
  106. projectName string
  107. hostPaths []string
  108. // state holds session status keyed by file share ID.
  109. state map[string]*FileShareSession
  110. }
  111. func NewFileShareManager(cli *Client, projectName string, hostPaths []string) *FileShareManager {
  112. return &FileShareManager{
  113. cli: cli,
  114. projectName: projectName,
  115. hostPaths: hostPaths,
  116. state: make(map[string]*FileShareSession),
  117. }
  118. }
  119. // EnsureExists looks for existing File Shares or creates new ones for the
  120. // host paths.
  121. //
  122. // This function blocks until each share reaches steady state, at which point
  123. // flow can continue.
  124. func (m *FileShareManager) EnsureExists(ctx context.Context) (err error) {
  125. w := progress.ContextWriter(ctx)
  126. // TODO(milas): this should be a per-node option, not global
  127. w.HasMore(false)
  128. w.Event(progress.NewEvent(fileShareProgressID, progress.Working, ""))
  129. defer func() {
  130. if err != nil {
  131. w.Event(progress.NewEvent(fileShareProgressID, progress.Error, ""))
  132. } else {
  133. w.Event(progress.NewEvent(fileShareProgressID, progress.Done, ""))
  134. }
  135. }()
  136. wait := &waiter{
  137. shareIDs: make(map[string]struct{}),
  138. done: make(chan struct{}),
  139. }
  140. handler := m.eventHandler(w, wait)
  141. ctx, cancel := context.WithCancel(ctx)
  142. defer cancel()
  143. // stream session events to update internal state for project
  144. monitorErr := make(chan error, 1)
  145. go func() {
  146. defer close(monitorErr)
  147. if err := m.watch(ctx, handler); err != nil && ctx.Err() == nil {
  148. monitorErr <- err
  149. }
  150. }()
  151. if err := m.initialize(ctx, wait, handler); err != nil {
  152. return err
  153. }
  154. waitCh := wait.start()
  155. if waitCh != nil {
  156. select {
  157. case <-ctx.Done():
  158. return context.Cause(ctx)
  159. case err := <-monitorErr:
  160. if err != nil {
  161. return fmt.Errorf("watching file share sessions: %w", err)
  162. } else if ctx.Err() == nil {
  163. // this indicates a bug - it should not stop w/o an error if the context is still active
  164. return errors.New("file share session watch stopped unexpectedly")
  165. }
  166. case <-wait.start():
  167. // everything is done
  168. }
  169. }
  170. return nil
  171. }
  172. // initialize finds existing shares or creates new ones for the host paths.
  173. //
  174. // Once a share is found/created, its progress is monitored via the watch.
  175. func (m *FileShareManager) initialize(ctx context.Context, wait *waiter, handler func(FileShareSession)) error {
  176. // the watch is already running in the background, so the lock is taken
  177. // throughout to prevent interleaving writes
  178. m.mu.Lock()
  179. defer m.mu.Unlock()
  180. existing, err := m.cli.ListFileShares(ctx)
  181. if err != nil {
  182. return err
  183. }
  184. for _, path := range m.hostPaths {
  185. var fileShareID string
  186. var fss *FileShareSession
  187. if fss = findExistingShare(path, existing); fss != nil {
  188. fileShareID = fss.Beta.Path
  189. logrus.Debugf("Found existing suitable file share %s for path %q [%s]", fileShareID, path, fss.Alpha.Path)
  190. wait.addShare(fileShareID)
  191. handler(*fss)
  192. continue
  193. } else {
  194. req := CreateFileShareRequest{
  195. HostPath: path,
  196. Labels: map[string]string{
  197. "com.docker.compose.project": m.projectName,
  198. },
  199. }
  200. createResp, err := m.cli.CreateFileShare(ctx, req)
  201. if err != nil {
  202. return fmt.Errorf("creating file share: %w", err)
  203. }
  204. fileShareID = createResp.FileShareID
  205. fss = m.state[fileShareID]
  206. logrus.Debugf("Created file share %s for path %q", fileShareID, path)
  207. }
  208. wait.addShare(fileShareID)
  209. if fss != nil {
  210. handler(*fss)
  211. }
  212. }
  213. return nil
  214. }
  215. func (m *FileShareManager) watch(ctx context.Context, handler func(FileShareSession)) error {
  216. events, err := m.cli.StreamFileShares(ctx)
  217. if err != nil {
  218. return fmt.Errorf("streaming file shares: %w", err)
  219. }
  220. for {
  221. select {
  222. case <-ctx.Done():
  223. return nil
  224. case event := <-events:
  225. if event.Error != nil {
  226. return fmt.Errorf("reading file share events: %w", event.Error)
  227. }
  228. // closure for lock
  229. func() {
  230. m.mu.Lock()
  231. defer m.mu.Unlock()
  232. for _, fss := range event.Value {
  233. handler(fss)
  234. }
  235. }()
  236. }
  237. }
  238. }
  239. // eventHandler updates internal state, keeps track of in-progress syncs, and
  240. // prints relevant events to progress.
  241. func (m *FileShareManager) eventHandler(w progress.Writer, wait *waiter) func(fss FileShareSession) {
  242. return func(fss FileShareSession) {
  243. fileShareID := fss.Beta.Path
  244. shouldPrint := wait.isWatching(fileShareID)
  245. forProject := fss.Labels[api.ProjectLabel] == m.projectName
  246. if shouldPrint || forProject {
  247. m.state[fileShareID] = &fss
  248. }
  249. var percent int
  250. var current, total int64
  251. if fss.Beta.StagingProgress != nil {
  252. current = int64(fss.Beta.StagingProgress.TotalReceivedSize)
  253. } else {
  254. current = int64(fss.Beta.TotalFileSize)
  255. }
  256. total = int64(fss.Alpha.TotalFileSize)
  257. if total != 0 {
  258. percent = int(current * 100 / total)
  259. }
  260. var status progress.EventStatus
  261. var text string
  262. switch {
  263. case strings.HasPrefix(fss.Status, "halted"):
  264. wait.shareDone(fileShareID)
  265. status = progress.Error
  266. case fss.Status == "watching":
  267. wait.shareDone(fileShareID)
  268. status = progress.Done
  269. percent = 100
  270. case fss.Status == "staging-beta":
  271. status = progress.Working
  272. // TODO(milas): the printer doesn't style statuses for children nicely
  273. text = fmt.Sprintf(" Syncing (%7s / %-7s)",
  274. units.HumanSize(float64(current)),
  275. units.HumanSize(float64(total)),
  276. )
  277. default:
  278. // catch-all for various other transitional statuses
  279. status = progress.Working
  280. }
  281. evt := progress.Event{
  282. ID: fss.Alpha.Path,
  283. Status: status,
  284. Text: text,
  285. ParentID: fileShareProgressID,
  286. Current: current,
  287. Total: total,
  288. Percent: percent,
  289. }
  290. if shouldPrint {
  291. w.Event(evt)
  292. }
  293. }
  294. }
  295. func findExistingShare(path string, existing []FileShareSession) *FileShareSession {
  296. for _, share := range existing {
  297. if paths.IsChild(share.Alpha.Path, path) {
  298. return &share
  299. }
  300. }
  301. return nil
  302. }
  303. type waiter struct {
  304. mu sync.Mutex
  305. shareIDs map[string]struct{}
  306. done chan struct{}
  307. }
  308. func (w *waiter) addShare(fileShareID string) {
  309. w.mu.Lock()
  310. defer w.mu.Unlock()
  311. w.shareIDs[fileShareID] = struct{}{}
  312. }
  313. func (w *waiter) isWatching(fileShareID string) bool {
  314. w.mu.Lock()
  315. defer w.mu.Unlock()
  316. _, ok := w.shareIDs[fileShareID]
  317. return ok
  318. }
  319. // start returns a channel to wait for any outstanding shares to be ready.
  320. //
  321. // If no shares are registered when this is called, nil is returned.
  322. func (w *waiter) start() <-chan struct{} {
  323. w.mu.Lock()
  324. defer w.mu.Unlock()
  325. if len(w.shareIDs) == 0 {
  326. return nil
  327. }
  328. if w.done == nil {
  329. w.done = make(chan struct{})
  330. }
  331. return w.done
  332. }
  333. func (w *waiter) shareDone(fileShareID string) {
  334. w.mu.Lock()
  335. defer w.mu.Unlock()
  336. delete(w.shareIDs, fileShareID)
  337. if len(w.shareIDs) == 0 && w.done != nil {
  338. close(w.done)
  339. w.done = nil
  340. }
  341. }