dependencies.go 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "fmt"
  17. "strings"
  18. "sync"
  19. "github.com/compose-spec/compose-go/types"
  20. "golang.org/x/sync/errgroup"
  21. "github.com/docker/compose/v2/pkg/utils"
  22. )
  23. // ServiceStatus indicates the status of a service
  24. type ServiceStatus int
  25. // Services status flags
  26. const (
  27. ServiceStopped ServiceStatus = iota
  28. ServiceStarted
  29. )
  30. type graphTraversal struct {
  31. mu sync.Mutex
  32. seen map[string]struct{}
  33. extremityNodesFn func(*Graph) []*Vertex // leaves or roots
  34. adjacentNodesFn func(*Vertex) []*Vertex // getParents or getChildren
  35. filterAdjacentByStatusFn func(*Graph, string, ServiceStatus) []*Vertex // filterChildren or filterParents
  36. targetServiceStatus ServiceStatus
  37. adjacentServiceStatusToSkip ServiceStatus
  38. visitorFn func(context.Context, string) error
  39. maxConcurrency int
  40. }
  41. func upDirectionTraversal(visitorFn func(context.Context, string) error) *graphTraversal {
  42. return &graphTraversal{
  43. extremityNodesFn: leaves,
  44. adjacentNodesFn: getParents,
  45. filterAdjacentByStatusFn: filterChildren,
  46. adjacentServiceStatusToSkip: ServiceStopped,
  47. targetServiceStatus: ServiceStarted,
  48. visitorFn: visitorFn,
  49. }
  50. }
  51. func downDirectionTraversal(visitorFn func(context.Context, string) error) *graphTraversal {
  52. return &graphTraversal{
  53. extremityNodesFn: roots,
  54. adjacentNodesFn: getChildren,
  55. filterAdjacentByStatusFn: filterParents,
  56. adjacentServiceStatusToSkip: ServiceStarted,
  57. targetServiceStatus: ServiceStopped,
  58. visitorFn: visitorFn,
  59. }
  60. }
  61. // InDependencyOrder applies the function to the services of the project taking in account the dependency order
  62. func InDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, string) error, options ...func(*graphTraversal)) error {
  63. graph, err := NewGraph(project.Services, ServiceStopped)
  64. if err != nil {
  65. return err
  66. }
  67. t := upDirectionTraversal(fn)
  68. for _, option := range options {
  69. option(t)
  70. }
  71. return t.visit(ctx, graph)
  72. }
  73. // InReverseDependencyOrder applies the function to the services of the project in reverse order of dependencies
  74. func InReverseDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, string) error) error {
  75. graph, err := NewGraph(project.Services, ServiceStarted)
  76. if err != nil {
  77. return err
  78. }
  79. t := downDirectionTraversal(fn)
  80. return t.visit(ctx, graph)
  81. }
  82. func (t *graphTraversal) visit(ctx context.Context, g *Graph) error {
  83. nodes := t.extremityNodesFn(g)
  84. eg, ctx := errgroup.WithContext(ctx)
  85. if t.maxConcurrency > 0 {
  86. eg.SetLimit(t.maxConcurrency)
  87. }
  88. t.run(ctx, g, eg, nodes)
  89. return eg.Wait()
  90. }
  91. // Note: this could be `graph.walk` or whatever
  92. func (t *graphTraversal) run(ctx context.Context, graph *Graph, eg *errgroup.Group, nodes []*Vertex) {
  93. for _, node := range nodes {
  94. // Don't start this service yet if all of its children have
  95. // not been started yet.
  96. if len(t.filterAdjacentByStatusFn(graph, node.Key, t.adjacentServiceStatusToSkip)) != 0 {
  97. continue
  98. }
  99. node := node
  100. if !t.consume(node.Key) {
  101. // another worker already visited this node
  102. continue
  103. }
  104. eg.Go(func() error {
  105. err := t.visitorFn(ctx, node.Service)
  106. if err != nil {
  107. return err
  108. }
  109. graph.UpdateStatus(node.Key, t.targetServiceStatus)
  110. t.run(ctx, graph, eg, t.adjacentNodesFn(node))
  111. return nil
  112. })
  113. }
  114. }
  115. func (t *graphTraversal) consume(nodeKey string) bool {
  116. t.mu.Lock()
  117. defer t.mu.Unlock()
  118. if t.seen == nil {
  119. t.seen = make(map[string]struct{})
  120. }
  121. if _, ok := t.seen[nodeKey]; ok {
  122. return false
  123. }
  124. t.seen[nodeKey] = struct{}{}
  125. return true
  126. }
  127. // Graph represents project as service dependencies
  128. type Graph struct {
  129. Vertices map[string]*Vertex
  130. lock sync.RWMutex
  131. }
  132. // Vertex represents a service in the dependencies structure
  133. type Vertex struct {
  134. Key string
  135. Service string
  136. Status ServiceStatus
  137. Children map[string]*Vertex
  138. Parents map[string]*Vertex
  139. }
  140. func getParents(v *Vertex) []*Vertex {
  141. return v.GetParents()
  142. }
  143. // GetParents returns a slice with the parent vertices of the a Vertex
  144. func (v *Vertex) GetParents() []*Vertex {
  145. var res []*Vertex
  146. for _, p := range v.Parents {
  147. res = append(res, p)
  148. }
  149. return res
  150. }
  151. func getChildren(v *Vertex) []*Vertex {
  152. return v.GetChildren()
  153. }
  154. // GetChildren returns a slice with the child vertices of the a Vertex
  155. func (v *Vertex) GetChildren() []*Vertex {
  156. var res []*Vertex
  157. for _, p := range v.Children {
  158. res = append(res, p)
  159. }
  160. return res
  161. }
  162. // NewGraph returns the dependency graph of the services
  163. func NewGraph(services types.Services, initialStatus ServiceStatus) (*Graph, error) {
  164. graph := &Graph{
  165. lock: sync.RWMutex{},
  166. Vertices: map[string]*Vertex{},
  167. }
  168. for _, s := range services {
  169. graph.AddVertex(s.Name, s.Name, initialStatus)
  170. }
  171. for _, s := range services {
  172. for _, name := range s.GetDependencies() {
  173. _ = graph.AddEdge(s.Name, name)
  174. }
  175. }
  176. if b, err := graph.HasCycles(); b {
  177. return nil, err
  178. }
  179. return graph, nil
  180. }
  181. // NewVertex is the constructor function for the Vertex
  182. func NewVertex(key string, service string, initialStatus ServiceStatus) *Vertex {
  183. return &Vertex{
  184. Key: key,
  185. Service: service,
  186. Status: initialStatus,
  187. Parents: map[string]*Vertex{},
  188. Children: map[string]*Vertex{},
  189. }
  190. }
  191. // AddVertex adds a vertex to the Graph
  192. func (g *Graph) AddVertex(key string, service string, initialStatus ServiceStatus) {
  193. g.lock.Lock()
  194. defer g.lock.Unlock()
  195. v := NewVertex(key, service, initialStatus)
  196. g.Vertices[key] = v
  197. }
  198. // AddEdge adds a relationship of dependency between vertices `source` and `destination`
  199. func (g *Graph) AddEdge(source string, destination string) error {
  200. g.lock.Lock()
  201. defer g.lock.Unlock()
  202. sourceVertex := g.Vertices[source]
  203. destinationVertex := g.Vertices[destination]
  204. if sourceVertex == nil {
  205. return fmt.Errorf("could not find %s", source)
  206. }
  207. if destinationVertex == nil {
  208. return fmt.Errorf("could not find %s", destination)
  209. }
  210. // If they are already connected
  211. if _, ok := sourceVertex.Children[destination]; ok {
  212. return nil
  213. }
  214. sourceVertex.Children[destination] = destinationVertex
  215. destinationVertex.Parents[source] = sourceVertex
  216. return nil
  217. }
  218. func leaves(g *Graph) []*Vertex {
  219. return g.Leaves()
  220. }
  221. // Leaves returns the slice of leaves of the graph
  222. func (g *Graph) Leaves() []*Vertex {
  223. g.lock.Lock()
  224. defer g.lock.Unlock()
  225. var res []*Vertex
  226. for _, v := range g.Vertices {
  227. if len(v.Children) == 0 {
  228. res = append(res, v)
  229. }
  230. }
  231. return res
  232. }
  233. func roots(g *Graph) []*Vertex {
  234. return g.Roots()
  235. }
  236. // Roots returns the slice of "Roots" of the graph
  237. func (g *Graph) Roots() []*Vertex {
  238. g.lock.Lock()
  239. defer g.lock.Unlock()
  240. var res []*Vertex
  241. for _, v := range g.Vertices {
  242. if len(v.Parents) == 0 {
  243. res = append(res, v)
  244. }
  245. }
  246. return res
  247. }
  248. // UpdateStatus updates the status of a certain vertex
  249. func (g *Graph) UpdateStatus(key string, status ServiceStatus) {
  250. g.lock.Lock()
  251. defer g.lock.Unlock()
  252. g.Vertices[key].Status = status
  253. }
  254. func filterChildren(g *Graph, k string, s ServiceStatus) []*Vertex {
  255. return g.FilterChildren(k, s)
  256. }
  257. // FilterChildren returns children of a certain vertex that are in a certain status
  258. func (g *Graph) FilterChildren(key string, status ServiceStatus) []*Vertex {
  259. g.lock.Lock()
  260. defer g.lock.Unlock()
  261. var res []*Vertex
  262. vertex := g.Vertices[key]
  263. for _, child := range vertex.Children {
  264. if child.Status == status {
  265. res = append(res, child)
  266. }
  267. }
  268. return res
  269. }
  270. func filterParents(g *Graph, k string, s ServiceStatus) []*Vertex {
  271. return g.FilterParents(k, s)
  272. }
  273. // FilterParents returns the parents of a certain vertex that are in a certain status
  274. func (g *Graph) FilterParents(key string, status ServiceStatus) []*Vertex {
  275. g.lock.Lock()
  276. defer g.lock.Unlock()
  277. var res []*Vertex
  278. vertex := g.Vertices[key]
  279. for _, parent := range vertex.Parents {
  280. if parent.Status == status {
  281. res = append(res, parent)
  282. }
  283. }
  284. return res
  285. }
  286. // HasCycles detects cycles in the graph
  287. func (g *Graph) HasCycles() (bool, error) {
  288. discovered := []string{}
  289. finished := []string{}
  290. for _, vertex := range g.Vertices {
  291. path := []string{
  292. vertex.Key,
  293. }
  294. if !utils.StringContains(discovered, vertex.Key) && !utils.StringContains(finished, vertex.Key) {
  295. var err error
  296. discovered, finished, err = g.visit(vertex.Key, path, discovered, finished)
  297. if err != nil {
  298. return true, err
  299. }
  300. }
  301. }
  302. return false, nil
  303. }
  304. func (g *Graph) visit(key string, path []string, discovered []string, finished []string) ([]string, []string, error) {
  305. discovered = append(discovered, key)
  306. for _, v := range g.Vertices[key].Children {
  307. path := append(path, v.Key)
  308. if utils.StringContains(discovered, v.Key) {
  309. return nil, nil, fmt.Errorf("cycle found: %s", strings.Join(path, " -> "))
  310. }
  311. if !utils.StringContains(finished, v.Key) {
  312. if _, _, err := g.visit(v.Key, path, discovered, finished); err != nil {
  313. return nil, nil, err
  314. }
  315. }
  316. }
  317. discovered = remove(discovered, key)
  318. finished = append(finished, key)
  319. return discovered, finished, nil
  320. }
  321. func remove(slice []string, item string) []string {
  322. var s []string
  323. for _, i := range slice {
  324. if i != item {
  325. s = append(s, i)
  326. }
  327. }
  328. return s
  329. }