| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387 | 
							- /*
 
-    Copyright 2024 Docker Compose CLI authors
 
-    Licensed under the Apache License, Version 2.0 (the "License");
 
-    you may not use this file except in compliance with the License.
 
-    You may obtain a copy of the License at
 
-        http://www.apache.org/licenses/LICENSE-2.0
 
-    Unless required by applicable law or agreed to in writing, software
 
-    distributed under the License is distributed on an "AS IS" BASIS,
 
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
-    See the License for the specific language governing permissions and
 
-    limitations under the License.
 
- */
 
- package desktop
 
- import (
 
- 	"context"
 
- 	"errors"
 
- 	"fmt"
 
- 	"strings"
 
- 	"sync"
 
- 	"github.com/docker/compose/v2/internal/paths"
 
- 	"github.com/docker/compose/v2/pkg/api"
 
- 	"github.com/docker/compose/v2/pkg/progress"
 
- 	"github.com/docker/go-units"
 
- 	"github.com/sirupsen/logrus"
 
- )
 
- // fileShareProgressID is the identifier used for the root grouping of file
 
- // share events in the progress writer.
 
- const fileShareProgressID = "Synchronized File Shares"
 
- // RemoveFileSharesForProject removes any Synchronized File Shares that were
 
- // created by Compose for this project in the past if possible.
 
- //
 
- // Errors are not propagated; they are only sent to the progress writer.
 
- func RemoveFileSharesForProject(ctx context.Context, c *Client, projectName string) {
 
- 	w := progress.ContextWriter(ctx)
 
- 	existing, err := c.ListFileShares(ctx)
 
- 	if err != nil {
 
- 		w.TailMsgf("Synchronized File Shares not removed due to error: %v", err)
 
- 		return
 
- 	}
 
- 	// filter the list first, so we can early return and not show the event if
 
- 	// there's no sessions to clean up
 
- 	var toRemove []FileShareSession
 
- 	for _, share := range existing {
 
- 		if share.Labels["com.docker.compose.project"] == projectName {
 
- 			toRemove = append(toRemove, share)
 
- 		}
 
- 	}
 
- 	if len(toRemove) == 0 {
 
- 		return
 
- 	}
 
- 	w.Event(progress.NewEvent(fileShareProgressID, progress.Working, "Removing"))
 
- 	rootResult := progress.Done
 
- 	defer func() {
 
- 		w.Event(progress.NewEvent(fileShareProgressID, rootResult, ""))
 
- 	}()
 
- 	for _, share := range toRemove {
 
- 		shareID := share.Labels["com.docker.desktop.mutagen.file-share.id"]
 
- 		if shareID == "" {
 
- 			w.Event(progress.Event{
 
- 				ID:         share.Alpha.Path,
 
- 				ParentID:   fileShareProgressID,
 
- 				Status:     progress.Warning,
 
- 				StatusText: "Invalid",
 
- 			})
 
- 			continue
 
- 		}
 
- 		w.Event(progress.Event{
 
- 			ID:       share.Alpha.Path,
 
- 			ParentID: fileShareProgressID,
 
- 			Status:   progress.Working,
 
- 		})
 
- 		var status progress.EventStatus
 
- 		var statusText string
 
- 		if err := c.DeleteFileShare(ctx, shareID); err != nil {
 
- 			// TODO(milas): Docker Desktop is doing weird things with error responses,
 
- 			// 	once fixed, we can return proper error types from the client
 
- 			if strings.Contains(err.Error(), "file share in use") {
 
- 				status = progress.Warning
 
- 				statusText = "Resource is still in use"
 
- 				if rootResult != progress.Error {
 
- 					// error takes precedence over warning
 
- 					rootResult = progress.Warning
 
- 				}
 
- 			} else {
 
- 				logrus.Debugf("Error deleting file share %q: %v", shareID, err)
 
- 				status = progress.Error
 
- 				rootResult = progress.Error
 
- 			}
 
- 		} else {
 
- 			logrus.Debugf("Deleted file share: %s", shareID)
 
- 			status = progress.Done
 
- 		}
 
- 		w.Event(progress.Event{
 
- 			ID:         share.Alpha.Path,
 
- 			ParentID:   fileShareProgressID,
 
- 			Status:     status,
 
- 			StatusText: statusText,
 
- 		})
 
- 	}
 
- }
 
- // FileShareManager maps between Compose bind mounts and Desktop File Shares
 
- // state.
 
- type FileShareManager struct {
 
- 	mu          sync.Mutex
 
- 	cli         *Client
 
- 	projectName string
 
- 	hostPaths   []string
 
- 	// state holds session status keyed by file share ID.
 
- 	state map[string]*FileShareSession
 
- }
 
- func NewFileShareManager(cli *Client, projectName string, hostPaths []string) *FileShareManager {
 
- 	return &FileShareManager{
 
- 		cli:         cli,
 
- 		projectName: projectName,
 
- 		hostPaths:   hostPaths,
 
- 		state:       make(map[string]*FileShareSession),
 
- 	}
 
- }
 
- // EnsureExists looks for existing File Shares or creates new ones for the
 
- // host paths.
 
- //
 
- // This function blocks until each share reaches steady state, at which point
 
- // flow can continue.
 
- func (m *FileShareManager) EnsureExists(ctx context.Context) (err error) {
 
- 	w := progress.ContextWriter(ctx)
 
- 	// TODO(milas): this should be a per-node option, not global
 
- 	w.HasMore(false)
 
- 	w.Event(progress.NewEvent(fileShareProgressID, progress.Working, ""))
 
- 	defer func() {
 
- 		if err != nil {
 
- 			w.Event(progress.NewEvent(fileShareProgressID, progress.Error, ""))
 
- 		} else {
 
- 			w.Event(progress.NewEvent(fileShareProgressID, progress.Done, ""))
 
- 		}
 
- 	}()
 
- 	wait := &waiter{
 
- 		shareIDs: make(map[string]struct{}),
 
- 		done:     make(chan struct{}),
 
- 	}
 
- 	handler := m.eventHandler(w, wait)
 
- 	ctx, cancel := context.WithCancel(ctx)
 
- 	defer cancel()
 
- 	// stream session events to update internal state for project
 
- 	monitorErr := make(chan error, 1)
 
- 	go func() {
 
- 		defer close(monitorErr)
 
- 		if err := m.watch(ctx, handler); err != nil && ctx.Err() == nil {
 
- 			monitorErr <- err
 
- 		}
 
- 	}()
 
- 	if err := m.initialize(ctx, wait, handler); err != nil {
 
- 		return err
 
- 	}
 
- 	waitCh := wait.start()
 
- 	if waitCh != nil {
 
- 		select {
 
- 		case <-ctx.Done():
 
- 			return context.Cause(ctx)
 
- 		case err := <-monitorErr:
 
- 			if err != nil {
 
- 				return fmt.Errorf("watching file share sessions: %w", err)
 
- 			} else if ctx.Err() == nil {
 
- 				// this indicates a bug - it should not stop w/o an error if the context is still active
 
- 				return errors.New("file share session watch stopped unexpectedly")
 
- 			}
 
- 		case <-wait.start():
 
- 			// everything is done
 
- 		}
 
- 	}
 
- 	return nil
 
- }
 
- // initialize finds existing shares or creates new ones for the host paths.
 
- //
 
- // Once a share is found/created, its progress is monitored via the watch.
 
- func (m *FileShareManager) initialize(ctx context.Context, wait *waiter, handler func(FileShareSession)) error {
 
- 	// the watch is already running in the background, so the lock is taken
 
- 	// throughout to prevent interleaving writes
 
- 	m.mu.Lock()
 
- 	defer m.mu.Unlock()
 
- 	existing, err := m.cli.ListFileShares(ctx)
 
- 	if err != nil {
 
- 		return err
 
- 	}
 
- 	for _, path := range m.hostPaths {
 
- 		var fileShareID string
 
- 		var fss *FileShareSession
 
- 		if fss = findExistingShare(path, existing); fss != nil {
 
- 			fileShareID = fss.Beta.Path
 
- 			logrus.Debugf("Found existing suitable file share %s for path %q [%s]", fileShareID, path, fss.Alpha.Path)
 
- 			wait.addShare(fileShareID)
 
- 			handler(*fss)
 
- 			continue
 
- 		} else {
 
- 			req := CreateFileShareRequest{
 
- 				HostPath: path,
 
- 				Labels: map[string]string{
 
- 					"com.docker.compose.project": m.projectName,
 
- 				},
 
- 			}
 
- 			createResp, err := m.cli.CreateFileShare(ctx, req)
 
- 			if err != nil {
 
- 				return fmt.Errorf("creating file share: %w", err)
 
- 			}
 
- 			fileShareID = createResp.FileShareID
 
- 			fss = m.state[fileShareID]
 
- 			logrus.Debugf("Created file share %s for path %q", fileShareID, path)
 
- 		}
 
- 		wait.addShare(fileShareID)
 
- 		if fss != nil {
 
- 			handler(*fss)
 
- 		}
 
- 	}
 
- 	return nil
 
- }
 
- func (m *FileShareManager) watch(ctx context.Context, handler func(FileShareSession)) error {
 
- 	events, err := m.cli.StreamFileShares(ctx)
 
- 	if err != nil {
 
- 		return fmt.Errorf("streaming file shares: %w", err)
 
- 	}
 
- 	for {
 
- 		select {
 
- 		case <-ctx.Done():
 
- 			return nil
 
- 		case event := <-events:
 
- 			if event.Error != nil {
 
- 				return fmt.Errorf("reading file share events: %w", event.Error)
 
- 			}
 
- 			// closure for lock
 
- 			func() {
 
- 				m.mu.Lock()
 
- 				defer m.mu.Unlock()
 
- 				for _, fss := range event.Value {
 
- 					handler(fss)
 
- 				}
 
- 			}()
 
- 		}
 
- 	}
 
- }
 
- // eventHandler updates internal state, keeps track of in-progress syncs, and
 
- // prints relevant events to progress.
 
- func (m *FileShareManager) eventHandler(w progress.Writer, wait *waiter) func(fss FileShareSession) {
 
- 	return func(fss FileShareSession) {
 
- 		fileShareID := fss.Beta.Path
 
- 		shouldPrint := wait.isWatching(fileShareID)
 
- 		forProject := fss.Labels[api.ProjectLabel] == m.projectName
 
- 		if shouldPrint || forProject {
 
- 			m.state[fileShareID] = &fss
 
- 		}
 
- 		var percent int
 
- 		var current, total int64
 
- 		if fss.Beta.StagingProgress != nil {
 
- 			current = int64(fss.Beta.StagingProgress.TotalReceivedSize)
 
- 		} else {
 
- 			current = int64(fss.Beta.TotalFileSize)
 
- 		}
 
- 		total = int64(fss.Alpha.TotalFileSize)
 
- 		if total != 0 {
 
- 			percent = int(current * 100 / total)
 
- 		}
 
- 		var status progress.EventStatus
 
- 		var text string
 
- 		switch {
 
- 		case strings.HasPrefix(fss.Status, "halted"):
 
- 			wait.shareDone(fileShareID)
 
- 			status = progress.Error
 
- 		case fss.Status == "watching":
 
- 			wait.shareDone(fileShareID)
 
- 			status = progress.Done
 
- 			percent = 100
 
- 		case fss.Status == "staging-beta":
 
- 			status = progress.Working
 
- 			// TODO(milas): the printer doesn't style statuses for children nicely
 
- 			text = fmt.Sprintf("    Syncing (%7s / %-7s)",
 
- 				units.HumanSize(float64(current)),
 
- 				units.HumanSize(float64(total)),
 
- 			)
 
- 		default:
 
- 			// catch-all for various other transitional statuses
 
- 			status = progress.Working
 
- 		}
 
- 		evt := progress.Event{
 
- 			ID:       fss.Alpha.Path,
 
- 			Status:   status,
 
- 			Text:     text,
 
- 			ParentID: fileShareProgressID,
 
- 			Current:  current,
 
- 			Total:    total,
 
- 			Percent:  percent,
 
- 		}
 
- 		if shouldPrint {
 
- 			w.Event(evt)
 
- 		}
 
- 	}
 
- }
 
- func findExistingShare(path string, existing []FileShareSession) *FileShareSession {
 
- 	for _, share := range existing {
 
- 		if paths.IsChild(share.Alpha.Path, path) {
 
- 			return &share
 
- 		}
 
- 	}
 
- 	return nil
 
- }
 
- type waiter struct {
 
- 	mu       sync.Mutex
 
- 	shareIDs map[string]struct{}
 
- 	done     chan struct{}
 
- }
 
- func (w *waiter) addShare(fileShareID string) {
 
- 	w.mu.Lock()
 
- 	defer w.mu.Unlock()
 
- 	w.shareIDs[fileShareID] = struct{}{}
 
- }
 
- func (w *waiter) isWatching(fileShareID string) bool {
 
- 	w.mu.Lock()
 
- 	defer w.mu.Unlock()
 
- 	_, ok := w.shareIDs[fileShareID]
 
- 	return ok
 
- }
 
- // start returns a channel to wait for any outstanding shares to be ready.
 
- //
 
- // If no shares are registered when this is called, nil is returned.
 
- func (w *waiter) start() <-chan struct{} {
 
- 	w.mu.Lock()
 
- 	defer w.mu.Unlock()
 
- 	if len(w.shareIDs) == 0 {
 
- 		return nil
 
- 	}
 
- 	if w.done == nil {
 
- 		w.done = make(chan struct{})
 
- 	}
 
- 	return w.done
 
- }
 
- func (w *waiter) shareDone(fileShareID string) {
 
- 	w.mu.Lock()
 
- 	defer w.mu.Unlock()
 
- 	delete(w.shareIDs, fileShareID)
 
- 	if len(w.shareIDs) == 0 && w.done != nil {
 
- 		close(w.done)
 
- 		w.done = nil
 
- 	}
 
- }
 
 
  |