| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480 | 
							- /*
 
-    Copyright 2020 Docker Compose CLI authors
 
-    Licensed under the Apache License, Version 2.0 (the "License");
 
-    you may not use this file except in compliance with the License.
 
-    You may obtain a copy of the License at
 
-        http://www.apache.org/licenses/LICENSE-2.0
 
-    Unless required by applicable law or agreed to in writing, software
 
-    distributed under the License is distributed on an "AS IS" BASIS,
 
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
-    See the License for the specific language governing permissions and
 
-    limitations under the License.
 
- */
 
- package compose
 
- import (
 
- 	"context"
 
- 	"fmt"
 
- 	"strings"
 
- 	"sync"
 
- 	"github.com/compose-spec/compose-go/types"
 
- 	"github.com/docker/compose/v2/pkg/api"
 
- 	"golang.org/x/sync/errgroup"
 
- 	"github.com/docker/compose/v2/pkg/utils"
 
- )
 
- // ServiceStatus indicates the status of a service
 
- type ServiceStatus int
 
- // Services status flags
 
- const (
 
- 	ServiceStopped ServiceStatus = iota
 
- 	ServiceStarted
 
- )
 
- type graphTraversal struct {
 
- 	mu      sync.Mutex
 
- 	seen    map[string]struct{}
 
- 	ignored map[string]struct{}
 
- 	extremityNodesFn            func(*Graph) []*Vertex                        // leaves or roots
 
- 	adjacentNodesFn             func(*Vertex) []*Vertex                       // getParents or getChildren
 
- 	filterAdjacentByStatusFn    func(*Graph, string, ServiceStatus) []*Vertex // filterChildren or filterParents
 
- 	targetServiceStatus         ServiceStatus
 
- 	adjacentServiceStatusToSkip ServiceStatus
 
- 	visitorFn      func(context.Context, string) error
 
- 	maxConcurrency int
 
- }
 
- func upDirectionTraversal(visitorFn func(context.Context, string) error) *graphTraversal {
 
- 	return &graphTraversal{
 
- 		extremityNodesFn:            leaves,
 
- 		adjacentNodesFn:             getParents,
 
- 		filterAdjacentByStatusFn:    filterChildren,
 
- 		adjacentServiceStatusToSkip: ServiceStopped,
 
- 		targetServiceStatus:         ServiceStarted,
 
- 		visitorFn:                   visitorFn,
 
- 	}
 
- }
 
- func downDirectionTraversal(visitorFn func(context.Context, string) error) *graphTraversal {
 
- 	return &graphTraversal{
 
- 		extremityNodesFn:            roots,
 
- 		adjacentNodesFn:             getChildren,
 
- 		filterAdjacentByStatusFn:    filterParents,
 
- 		adjacentServiceStatusToSkip: ServiceStarted,
 
- 		targetServiceStatus:         ServiceStopped,
 
- 		visitorFn:                   visitorFn,
 
- 	}
 
- }
 
- // InDependencyOrder applies the function to the services of the project taking in account the dependency order
 
- func InDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, string) error, options ...func(*graphTraversal)) error {
 
- 	graph, err := NewGraph(project, ServiceStopped)
 
- 	if err != nil {
 
- 		return err
 
- 	}
 
- 	t := upDirectionTraversal(fn)
 
- 	for _, option := range options {
 
- 		option(t)
 
- 	}
 
- 	return t.visit(ctx, graph)
 
- }
 
- // InReverseDependencyOrder applies the function to the services of the project in reverse order of dependencies
 
- func InReverseDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, string) error, options ...func(*graphTraversal)) error {
 
- 	graph, err := NewGraph(project, ServiceStarted)
 
- 	if err != nil {
 
- 		return err
 
- 	}
 
- 	t := downDirectionTraversal(fn)
 
- 	for _, option := range options {
 
- 		option(t)
 
- 	}
 
- 	return t.visit(ctx, graph)
 
- }
 
- func WithRootNodesAndDown(nodes []string) func(*graphTraversal) {
 
- 	return func(t *graphTraversal) {
 
- 		if len(nodes) == 0 {
 
- 			return
 
- 		}
 
- 		originalFn := t.extremityNodesFn
 
- 		t.extremityNodesFn = func(graph *Graph) []*Vertex {
 
- 			var want []string
 
- 			for _, node := range nodes {
 
- 				vertex := graph.Vertices[node]
 
- 				want = append(want, vertex.Service)
 
- 				for _, v := range getAncestors(vertex) {
 
- 					want = append(want, v.Service)
 
- 				}
 
- 			}
 
- 			t.ignored = map[string]struct{}{}
 
- 			for k := range graph.Vertices {
 
- 				if !utils.Contains(want, k) {
 
- 					t.ignored[k] = struct{}{}
 
- 				}
 
- 			}
 
- 			return originalFn(graph)
 
- 		}
 
- 	}
 
- }
 
- func (t *graphTraversal) visit(ctx context.Context, g *Graph) error {
 
- 	expect := len(g.Vertices)
 
- 	if expect == 0 {
 
- 		return nil
 
- 	}
 
- 	eg, ctx := errgroup.WithContext(ctx)
 
- 	if t.maxConcurrency > 0 {
 
- 		eg.SetLimit(t.maxConcurrency + 1)
 
- 	}
 
- 	nodeCh := make(chan *Vertex, expect)
 
- 	defer close(nodeCh)
 
- 	// nodeCh need to allow n=expect writers while reader goroutine could have returner after ctx.Done
 
- 	eg.Go(func() error {
 
- 		for {
 
- 			select {
 
- 			case <-ctx.Done():
 
- 				return nil
 
- 			case node := <-nodeCh:
 
- 				expect--
 
- 				if expect == 0 {
 
- 					return nil
 
- 				}
 
- 				t.run(ctx, g, eg, t.adjacentNodesFn(node), nodeCh)
 
- 			}
 
- 		}
 
- 	})
 
- 	nodes := t.extremityNodesFn(g)
 
- 	t.run(ctx, g, eg, nodes, nodeCh)
 
- 	return eg.Wait()
 
- }
 
- // Note: this could be `graph.walk` or whatever
 
- func (t *graphTraversal) run(ctx context.Context, graph *Graph, eg *errgroup.Group, nodes []*Vertex, nodeCh chan *Vertex) {
 
- 	for _, node := range nodes {
 
- 		// Don't start this service yet if all of its children have
 
- 		// not been started yet.
 
- 		if len(t.filterAdjacentByStatusFn(graph, node.Key, t.adjacentServiceStatusToSkip)) != 0 {
 
- 			continue
 
- 		}
 
- 		node := node
 
- 		if !t.consume(node.Key) {
 
- 			// another worker already visited this node
 
- 			continue
 
- 		}
 
- 		eg.Go(func() error {
 
- 			var err error
 
- 			if _, ignore := t.ignored[node.Service]; !ignore {
 
- 				err = t.visitorFn(ctx, node.Service)
 
- 			}
 
- 			if err == nil {
 
- 				graph.UpdateStatus(node.Key, t.targetServiceStatus)
 
- 			}
 
- 			nodeCh <- node
 
- 			return err
 
- 		})
 
- 	}
 
- }
 
- func (t *graphTraversal) consume(nodeKey string) bool {
 
- 	t.mu.Lock()
 
- 	defer t.mu.Unlock()
 
- 	if t.seen == nil {
 
- 		t.seen = make(map[string]struct{})
 
- 	}
 
- 	if _, ok := t.seen[nodeKey]; ok {
 
- 		return false
 
- 	}
 
- 	t.seen[nodeKey] = struct{}{}
 
- 	return true
 
- }
 
- // Graph represents project as service dependencies
 
- type Graph struct {
 
- 	Vertices map[string]*Vertex
 
- 	lock     sync.RWMutex
 
- }
 
- // Vertex represents a service in the dependencies structure
 
- type Vertex struct {
 
- 	Key      string
 
- 	Service  string
 
- 	Status   ServiceStatus
 
- 	Children map[string]*Vertex
 
- 	Parents  map[string]*Vertex
 
- }
 
- func getParents(v *Vertex) []*Vertex {
 
- 	return v.GetParents()
 
- }
 
- // GetParents returns a slice with the parent vertices of the a Vertex
 
- func (v *Vertex) GetParents() []*Vertex {
 
- 	var res []*Vertex
 
- 	for _, p := range v.Parents {
 
- 		res = append(res, p)
 
- 	}
 
- 	return res
 
- }
 
- func getChildren(v *Vertex) []*Vertex {
 
- 	return v.GetChildren()
 
- }
 
- // getAncestors return all descendents for a vertex, might contain duplicates
 
- func getAncestors(v *Vertex) []*Vertex {
 
- 	var descendents []*Vertex
 
- 	for _, parent := range v.GetParents() {
 
- 		descendents = append(descendents, parent)
 
- 		descendents = append(descendents, getAncestors(parent)...)
 
- 	}
 
- 	return descendents
 
- }
 
- // GetChildren returns a slice with the child vertices of the a Vertex
 
- func (v *Vertex) GetChildren() []*Vertex {
 
- 	var res []*Vertex
 
- 	for _, p := range v.Children {
 
- 		res = append(res, p)
 
- 	}
 
- 	return res
 
- }
 
- // NewGraph returns the dependency graph of the services
 
- func NewGraph(project *types.Project, initialStatus ServiceStatus) (*Graph, error) {
 
- 	graph := &Graph{
 
- 		lock:     sync.RWMutex{},
 
- 		Vertices: map[string]*Vertex{},
 
- 	}
 
- 	for _, s := range project.Services {
 
- 		graph.AddVertex(s.Name, s.Name, initialStatus)
 
- 	}
 
- 	for index, s := range project.Services {
 
- 		for _, name := range s.GetDependencies() {
 
- 			err := graph.AddEdge(s.Name, name)
 
- 			if err != nil {
 
- 				if !s.DependsOn[name].Required {
 
- 					delete(s.DependsOn, name)
 
- 					project.Services[index] = s
 
- 					continue
 
- 				}
 
- 				if api.IsNotFoundError(err) {
 
- 					ds, err := project.GetDisabledService(name)
 
- 					if err == nil {
 
- 						return nil, fmt.Errorf("service %s is required by %s but is disabled. Can be enabled by profiles %s", name, s.Name, ds.Profiles)
 
- 					}
 
- 				}
 
- 				return nil, err
 
- 			}
 
- 		}
 
- 	}
 
- 	if b, err := graph.HasCycles(); b {
 
- 		return nil, err
 
- 	}
 
- 	return graph, nil
 
- }
 
- // NewVertex is the constructor function for the Vertex
 
- func NewVertex(key string, service string, initialStatus ServiceStatus) *Vertex {
 
- 	return &Vertex{
 
- 		Key:      key,
 
- 		Service:  service,
 
- 		Status:   initialStatus,
 
- 		Parents:  map[string]*Vertex{},
 
- 		Children: map[string]*Vertex{},
 
- 	}
 
- }
 
- // AddVertex adds a vertex to the Graph
 
- func (g *Graph) AddVertex(key string, service string, initialStatus ServiceStatus) {
 
- 	g.lock.Lock()
 
- 	defer g.lock.Unlock()
 
- 	v := NewVertex(key, service, initialStatus)
 
- 	g.Vertices[key] = v
 
- }
 
- // AddEdge adds a relationship of dependency between vertices `source` and `destination`
 
- func (g *Graph) AddEdge(source string, destination string) error {
 
- 	g.lock.Lock()
 
- 	defer g.lock.Unlock()
 
- 	sourceVertex := g.Vertices[source]
 
- 	destinationVertex := g.Vertices[destination]
 
- 	if sourceVertex == nil {
 
- 		return fmt.Errorf("could not find %s: %w", source, api.ErrNotFound)
 
- 	}
 
- 	if destinationVertex == nil {
 
- 		return fmt.Errorf("could not find %s: %w", destination, api.ErrNotFound)
 
- 	}
 
- 	// If they are already connected
 
- 	if _, ok := sourceVertex.Children[destination]; ok {
 
- 		return nil
 
- 	}
 
- 	sourceVertex.Children[destination] = destinationVertex
 
- 	destinationVertex.Parents[source] = sourceVertex
 
- 	return nil
 
- }
 
- func leaves(g *Graph) []*Vertex {
 
- 	return g.Leaves()
 
- }
 
- // Leaves returns the slice of leaves of the graph
 
- func (g *Graph) Leaves() []*Vertex {
 
- 	g.lock.Lock()
 
- 	defer g.lock.Unlock()
 
- 	var res []*Vertex
 
- 	for _, v := range g.Vertices {
 
- 		if len(v.Children) == 0 {
 
- 			res = append(res, v)
 
- 		}
 
- 	}
 
- 	return res
 
- }
 
- func roots(g *Graph) []*Vertex {
 
- 	return g.Roots()
 
- }
 
- // Roots returns the slice of "Roots" of the graph
 
- func (g *Graph) Roots() []*Vertex {
 
- 	g.lock.Lock()
 
- 	defer g.lock.Unlock()
 
- 	var res []*Vertex
 
- 	for _, v := range g.Vertices {
 
- 		if len(v.Parents) == 0 {
 
- 			res = append(res, v)
 
- 		}
 
- 	}
 
- 	return res
 
- }
 
- // UpdateStatus updates the status of a certain vertex
 
- func (g *Graph) UpdateStatus(key string, status ServiceStatus) {
 
- 	g.lock.Lock()
 
- 	defer g.lock.Unlock()
 
- 	g.Vertices[key].Status = status
 
- }
 
- func filterChildren(g *Graph, k string, s ServiceStatus) []*Vertex {
 
- 	return g.FilterChildren(k, s)
 
- }
 
- // FilterChildren returns children of a certain vertex that are in a certain status
 
- func (g *Graph) FilterChildren(key string, status ServiceStatus) []*Vertex {
 
- 	g.lock.Lock()
 
- 	defer g.lock.Unlock()
 
- 	var res []*Vertex
 
- 	vertex := g.Vertices[key]
 
- 	for _, child := range vertex.Children {
 
- 		if child.Status == status {
 
- 			res = append(res, child)
 
- 		}
 
- 	}
 
- 	return res
 
- }
 
- func filterParents(g *Graph, k string, s ServiceStatus) []*Vertex {
 
- 	return g.FilterParents(k, s)
 
- }
 
- // FilterParents returns the parents of a certain vertex that are in a certain status
 
- func (g *Graph) FilterParents(key string, status ServiceStatus) []*Vertex {
 
- 	g.lock.Lock()
 
- 	defer g.lock.Unlock()
 
- 	var res []*Vertex
 
- 	vertex := g.Vertices[key]
 
- 	for _, parent := range vertex.Parents {
 
- 		if parent.Status == status {
 
- 			res = append(res, parent)
 
- 		}
 
- 	}
 
- 	return res
 
- }
 
- // HasCycles detects cycles in the graph
 
- func (g *Graph) HasCycles() (bool, error) {
 
- 	discovered := []string{}
 
- 	finished := []string{}
 
- 	for _, vertex := range g.Vertices {
 
- 		path := []string{
 
- 			vertex.Key,
 
- 		}
 
- 		if !utils.StringContains(discovered, vertex.Key) && !utils.StringContains(finished, vertex.Key) {
 
- 			var err error
 
- 			discovered, finished, err = g.visit(vertex.Key, path, discovered, finished)
 
- 			if err != nil {
 
- 				return true, err
 
- 			}
 
- 		}
 
- 	}
 
- 	return false, nil
 
- }
 
- func (g *Graph) visit(key string, path []string, discovered []string, finished []string) ([]string, []string, error) {
 
- 	discovered = append(discovered, key)
 
- 	for _, v := range g.Vertices[key].Children {
 
- 		path := append(path, v.Key)
 
- 		if utils.StringContains(discovered, v.Key) {
 
- 			return nil, nil, fmt.Errorf("cycle found: %s", strings.Join(path, " -> "))
 
- 		}
 
- 		if !utils.StringContains(finished, v.Key) {
 
- 			if _, _, err := g.visit(v.Key, path, discovered, finished); err != nil {
 
- 				return nil, nil, err
 
- 			}
 
- 		}
 
- 	}
 
- 	discovered = remove(discovered, key)
 
- 	finished = append(finished, key)
 
- 	return discovered, finished, nil
 
- }
 
- func remove(slice []string, item string) []string {
 
- 	var s []string
 
- 	for _, i := range slice {
 
- 		if i != item {
 
- 			s = append(s, i)
 
- 		}
 
- 	}
 
- 	return s
 
- }
 
 
  |