file_shares.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. /*
  2. Copyright 2024 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package desktop
  14. import (
  15. "context"
  16. "errors"
  17. "fmt"
  18. "strings"
  19. "sync"
  20. "github.com/docker/compose/v2/internal/paths"
  21. "github.com/docker/compose/v2/pkg/api"
  22. "github.com/docker/compose/v2/pkg/progress"
  23. "github.com/docker/go-units"
  24. "github.com/sirupsen/logrus"
  25. )
  26. // fileShareProgressID is the identifier used for the root grouping of file
  27. // share events in the progress writer.
  28. const fileShareProgressID = "Synchronized File Shares"
  29. // RemoveFileSharesForProject removes any Synchronized File Shares that were
  30. // created by Compose for this project in the past if possible.
  31. //
  32. // Errors are not propagated; they are only sent to the progress writer.
  33. func RemoveFileSharesForProject(ctx context.Context, c *Client, projectName string) {
  34. w := progress.ContextWriter(ctx)
  35. existing, err := c.ListFileShares(ctx)
  36. if err != nil {
  37. w.TailMsgf("Synchronized File Shares not removed due to error: %v", err)
  38. return
  39. }
  40. // filter the list first, so we can early return and not show the event if
  41. // there's no sessions to clean up
  42. var toRemove []FileShareSession
  43. for _, share := range existing {
  44. if share.Labels["com.docker.compose.project"] == projectName {
  45. toRemove = append(toRemove, share)
  46. }
  47. }
  48. if len(toRemove) == 0 {
  49. return
  50. }
  51. w.Event(progress.NewEvent(fileShareProgressID, progress.Working, "Removing"))
  52. rootResult := progress.Done
  53. defer func() {
  54. w.Event(progress.NewEvent(fileShareProgressID, rootResult, ""))
  55. }()
  56. for _, share := range toRemove {
  57. shareID := share.Labels["com.docker.desktop.mutagen.file-share.id"]
  58. if shareID == "" {
  59. w.Event(progress.Event{
  60. ID: share.Alpha.Path,
  61. ParentID: fileShareProgressID,
  62. Status: progress.Warning,
  63. StatusText: "Invalid",
  64. })
  65. continue
  66. }
  67. w.Event(progress.Event{
  68. ID: share.Alpha.Path,
  69. ParentID: fileShareProgressID,
  70. Status: progress.Working,
  71. })
  72. var status progress.EventStatus
  73. var statusText string
  74. if err := c.DeleteFileShare(ctx, shareID); err != nil {
  75. // TODO(milas): Docker Desktop is doing weird things with error responses,
  76. // once fixed, we can return proper error types from the client
  77. if strings.Contains(err.Error(), "file share in use") {
  78. status = progress.Warning
  79. statusText = "Resource is still in use"
  80. if rootResult != progress.Error {
  81. // error takes precedence over warning
  82. rootResult = progress.Warning
  83. }
  84. } else {
  85. logrus.Debugf("Error deleting file share %q: %v", shareID, err)
  86. status = progress.Error
  87. rootResult = progress.Error
  88. }
  89. } else {
  90. logrus.Debugf("Deleted file share: %s", shareID)
  91. status = progress.Done
  92. }
  93. w.Event(progress.Event{
  94. ID: share.Alpha.Path,
  95. ParentID: fileShareProgressID,
  96. Status: status,
  97. StatusText: statusText,
  98. })
  99. }
  100. }
  101. // FileShareManager maps between Compose bind mounts and Desktop File Shares
  102. // state.
  103. type FileShareManager struct {
  104. mu sync.Mutex
  105. cli *Client
  106. projectName string
  107. hostPaths []string
  108. // state holds session status keyed by file share ID.
  109. state map[string]*FileShareSession
  110. }
  111. func NewFileShareManager(cli *Client, projectName string, hostPaths []string) *FileShareManager {
  112. return &FileShareManager{
  113. cli: cli,
  114. projectName: projectName,
  115. hostPaths: hostPaths,
  116. state: make(map[string]*FileShareSession),
  117. }
  118. }
  119. // EnsureExists looks for existing File Shares or creates new ones for the
  120. // host paths.
  121. //
  122. // This function blocks until each share reaches steady state, at which point
  123. // flow can continue.
  124. func (m *FileShareManager) EnsureExists(ctx context.Context) (err error) {
  125. w := progress.ContextWriter(ctx)
  126. w.Event(progress.NewEvent(fileShareProgressID, progress.Working, ""))
  127. defer func() {
  128. if err != nil {
  129. w.Event(progress.NewEvent(fileShareProgressID, progress.Error, ""))
  130. } else {
  131. w.Event(progress.NewEvent(fileShareProgressID, progress.Done, ""))
  132. }
  133. }()
  134. wait := &waiter{
  135. shareIDs: make(map[string]struct{}),
  136. done: make(chan struct{}),
  137. }
  138. handler := m.eventHandler(w, wait)
  139. ctx, cancel := context.WithCancel(ctx)
  140. defer cancel()
  141. // stream session events to update internal state for project
  142. monitorErr := make(chan error, 1)
  143. go func() {
  144. defer close(monitorErr)
  145. if err := m.watch(ctx, handler); err != nil && ctx.Err() == nil {
  146. monitorErr <- err
  147. }
  148. }()
  149. if err := m.initialize(ctx, wait, handler); err != nil {
  150. return err
  151. }
  152. waitCh := wait.start()
  153. if waitCh != nil {
  154. select {
  155. case <-ctx.Done():
  156. return context.Cause(ctx)
  157. case err := <-monitorErr:
  158. if err != nil {
  159. return fmt.Errorf("watching file share sessions: %w", err)
  160. } else if ctx.Err() == nil {
  161. // this indicates a bug - it should not stop w/o an error if the context is still active
  162. return errors.New("file share session watch stopped unexpectedly")
  163. }
  164. case <-wait.start():
  165. // everything is done
  166. }
  167. }
  168. return nil
  169. }
  170. // initialize finds existing shares or creates new ones for the host paths.
  171. //
  172. // Once a share is found/created, its progress is monitored via the watch.
  173. func (m *FileShareManager) initialize(ctx context.Context, wait *waiter, handler func(FileShareSession)) error {
  174. // the watch is already running in the background, so the lock is taken
  175. // throughout to prevent interleaving writes
  176. m.mu.Lock()
  177. defer m.mu.Unlock()
  178. existing, err := m.cli.ListFileShares(ctx)
  179. if err != nil {
  180. return err
  181. }
  182. for _, path := range m.hostPaths {
  183. var fileShareID string
  184. var fss *FileShareSession
  185. if fss = findExistingShare(path, existing); fss != nil {
  186. fileShareID = fss.Beta.Path
  187. logrus.Debugf("Found existing suitable file share %s for path %q [%s]", fileShareID, path, fss.Alpha.Path)
  188. wait.addShare(fileShareID)
  189. handler(*fss)
  190. continue
  191. } else {
  192. req := CreateFileShareRequest{
  193. HostPath: path,
  194. Labels: map[string]string{
  195. "com.docker.compose.project": m.projectName,
  196. },
  197. }
  198. createResp, err := m.cli.CreateFileShare(ctx, req)
  199. if err != nil {
  200. return fmt.Errorf("creating file share: %w", err)
  201. }
  202. fileShareID = createResp.FileShareID
  203. fss = m.state[fileShareID]
  204. logrus.Debugf("Created file share %s for path %q", fileShareID, path)
  205. }
  206. wait.addShare(fileShareID)
  207. if fss != nil {
  208. handler(*fss)
  209. }
  210. }
  211. return nil
  212. }
  213. func (m *FileShareManager) watch(ctx context.Context, handler func(FileShareSession)) error {
  214. events, err := m.cli.StreamFileShares(ctx)
  215. if err != nil {
  216. return fmt.Errorf("streaming file shares: %w", err)
  217. }
  218. for {
  219. select {
  220. case <-ctx.Done():
  221. return nil
  222. case event := <-events:
  223. if event.Error != nil {
  224. return fmt.Errorf("reading file share events: %w", event.Error)
  225. }
  226. // closure for lock
  227. func() {
  228. m.mu.Lock()
  229. defer m.mu.Unlock()
  230. for _, fss := range event.Value {
  231. handler(fss)
  232. }
  233. }()
  234. }
  235. }
  236. }
  237. // eventHandler updates internal state, keeps track of in-progress syncs, and
  238. // prints relevant events to progress.
  239. func (m *FileShareManager) eventHandler(w progress.Writer, wait *waiter) func(fss FileShareSession) {
  240. return func(fss FileShareSession) {
  241. fileShareID := fss.Beta.Path
  242. shouldPrint := wait.isWatching(fileShareID)
  243. forProject := fss.Labels[api.ProjectLabel] == m.projectName
  244. if shouldPrint || forProject {
  245. m.state[fileShareID] = &fss
  246. }
  247. var percent int
  248. var current, total int64
  249. if fss.Beta.StagingProgress != nil {
  250. current = int64(fss.Beta.StagingProgress.TotalReceivedSize)
  251. } else {
  252. current = int64(fss.Beta.TotalFileSize)
  253. }
  254. total = int64(fss.Alpha.TotalFileSize)
  255. if total != 0 {
  256. percent = int(current * 100 / total)
  257. }
  258. var status progress.EventStatus
  259. var text string
  260. switch {
  261. case strings.HasPrefix(fss.Status, "halted"):
  262. wait.shareDone(fileShareID)
  263. status = progress.Error
  264. case fss.Status == "watching":
  265. wait.shareDone(fileShareID)
  266. status = progress.Done
  267. percent = 100
  268. case fss.Status == "staging-beta":
  269. status = progress.Working
  270. // TODO(milas): the printer doesn't style statuses for children nicely
  271. text = fmt.Sprintf(" Syncing (%7s / %-7s)",
  272. units.HumanSize(float64(current)),
  273. units.HumanSize(float64(total)),
  274. )
  275. default:
  276. // catch-all for various other transitional statuses
  277. status = progress.Working
  278. }
  279. evt := progress.Event{
  280. ID: fss.Alpha.Path,
  281. Status: status,
  282. Text: text,
  283. ParentID: fileShareProgressID,
  284. Current: current,
  285. Total: total,
  286. Percent: percent,
  287. }
  288. if shouldPrint {
  289. w.Event(evt)
  290. }
  291. }
  292. }
  293. func findExistingShare(path string, existing []FileShareSession) *FileShareSession {
  294. for _, share := range existing {
  295. if paths.IsChild(share.Alpha.Path, path) {
  296. return &share
  297. }
  298. }
  299. return nil
  300. }
  301. type waiter struct {
  302. mu sync.Mutex
  303. shareIDs map[string]struct{}
  304. done chan struct{}
  305. }
  306. func (w *waiter) addShare(fileShareID string) {
  307. w.mu.Lock()
  308. defer w.mu.Unlock()
  309. w.shareIDs[fileShareID] = struct{}{}
  310. }
  311. func (w *waiter) isWatching(fileShareID string) bool {
  312. w.mu.Lock()
  313. defer w.mu.Unlock()
  314. _, ok := w.shareIDs[fileShareID]
  315. return ok
  316. }
  317. // start returns a channel to wait for any outstanding shares to be ready.
  318. //
  319. // If no shares are registered when this is called, nil is returned.
  320. func (w *waiter) start() <-chan struct{} {
  321. w.mu.Lock()
  322. defer w.mu.Unlock()
  323. if len(w.shareIDs) == 0 {
  324. return nil
  325. }
  326. if w.done == nil {
  327. w.done = make(chan struct{})
  328. }
  329. return w.done
  330. }
  331. func (w *waiter) shareDone(fileShareID string) {
  332. w.mu.Lock()
  333. defer w.mu.Unlock()
  334. delete(w.shareIDs, fileShareID)
  335. if len(w.shareIDs) == 0 && w.done != nil {
  336. close(w.done)
  337. w.done = nil
  338. }
  339. }