1
0

dependencies.go 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "fmt"
  17. "strings"
  18. "sync"
  19. "github.com/compose-spec/compose-go/types"
  20. "golang.org/x/sync/errgroup"
  21. "github.com/docker/compose/v2/pkg/utils"
  22. )
  23. // ServiceStatus indicates the status of a service
  24. type ServiceStatus int
  25. // Services status flags
  26. const (
  27. ServiceStopped ServiceStatus = iota
  28. ServiceStarted
  29. )
  30. type graphTraversal struct {
  31. mu sync.Mutex
  32. seen map[string]struct{}
  33. extremityNodesFn func(*Graph) []*Vertex // leaves or roots
  34. adjacentNodesFn func(*Vertex) []*Vertex // getParents or getChildren
  35. filterAdjacentByStatusFn func(*Graph, string, ServiceStatus) []*Vertex // filterChildren or filterParents
  36. targetServiceStatus ServiceStatus
  37. adjacentServiceStatusToSkip ServiceStatus
  38. visitorFn func(context.Context, string) error
  39. maxConcurrency int
  40. }
  41. func upDirectionTraversal(visitorFn func(context.Context, string) error) *graphTraversal {
  42. return &graphTraversal{
  43. extremityNodesFn: leaves,
  44. adjacentNodesFn: getParents,
  45. filterAdjacentByStatusFn: filterChildren,
  46. adjacentServiceStatusToSkip: ServiceStopped,
  47. targetServiceStatus: ServiceStarted,
  48. visitorFn: visitorFn,
  49. }
  50. }
  51. func downDirectionTraversal(visitorFn func(context.Context, string) error) *graphTraversal {
  52. return &graphTraversal{
  53. extremityNodesFn: roots,
  54. adjacentNodesFn: getChildren,
  55. filterAdjacentByStatusFn: filterParents,
  56. adjacentServiceStatusToSkip: ServiceStarted,
  57. targetServiceStatus: ServiceStopped,
  58. visitorFn: visitorFn,
  59. }
  60. }
  61. // InDependencyOrder applies the function to the services of the project taking in account the dependency order
  62. func InDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, string) error, options ...func(*graphTraversal)) error {
  63. graph, err := NewGraph(project.Services, ServiceStopped)
  64. if err != nil {
  65. return err
  66. }
  67. t := upDirectionTraversal(fn)
  68. for _, option := range options {
  69. option(t)
  70. }
  71. return t.visit(ctx, graph)
  72. }
  73. // InReverseDependencyOrder applies the function to the services of the project in reverse order of dependencies
  74. func InReverseDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, string) error) error {
  75. graph, err := NewGraph(project.Services, ServiceStarted)
  76. if err != nil {
  77. return err
  78. }
  79. t := downDirectionTraversal(fn)
  80. return t.visit(ctx, graph)
  81. }
  82. func (t *graphTraversal) visit(ctx context.Context, g *Graph) error {
  83. expect := len(g.Vertices)
  84. if expect == 0 {
  85. return nil
  86. }
  87. eg, ctx := errgroup.WithContext(ctx)
  88. if t.maxConcurrency > 0 {
  89. eg.SetLimit(t.maxConcurrency + 1)
  90. }
  91. nodeCh := make(chan *Vertex)
  92. eg.Go(func() error {
  93. for node := range nodeCh {
  94. expect--
  95. if expect == 0 {
  96. close(nodeCh)
  97. return nil
  98. }
  99. t.run(ctx, g, eg, t.adjacentNodesFn(node), nodeCh)
  100. }
  101. return nil
  102. })
  103. nodes := t.extremityNodesFn(g)
  104. t.run(ctx, g, eg, nodes, nodeCh)
  105. err := eg.Wait()
  106. return err
  107. }
  108. // Note: this could be `graph.walk` or whatever
  109. func (t *graphTraversal) run(ctx context.Context, graph *Graph, eg *errgroup.Group, nodes []*Vertex, nodeCh chan *Vertex) {
  110. for _, node := range nodes {
  111. // Don't start this service yet if all of its children have
  112. // not been started yet.
  113. if len(t.filterAdjacentByStatusFn(graph, node.Key, t.adjacentServiceStatusToSkip)) != 0 {
  114. continue
  115. }
  116. node := node
  117. if !t.consume(node.Key) {
  118. // another worker already visited this node
  119. continue
  120. }
  121. eg.Go(func() error {
  122. err := t.visitorFn(ctx, node.Service)
  123. if err == nil {
  124. graph.UpdateStatus(node.Key, t.targetServiceStatus)
  125. }
  126. nodeCh <- node
  127. return err
  128. })
  129. }
  130. }
  131. func (t *graphTraversal) consume(nodeKey string) bool {
  132. t.mu.Lock()
  133. defer t.mu.Unlock()
  134. if t.seen == nil {
  135. t.seen = make(map[string]struct{})
  136. }
  137. if _, ok := t.seen[nodeKey]; ok {
  138. return false
  139. }
  140. t.seen[nodeKey] = struct{}{}
  141. return true
  142. }
  143. // Graph represents project as service dependencies
  144. type Graph struct {
  145. Vertices map[string]*Vertex
  146. lock sync.RWMutex
  147. }
  148. // Vertex represents a service in the dependencies structure
  149. type Vertex struct {
  150. Key string
  151. Service string
  152. Status ServiceStatus
  153. Children map[string]*Vertex
  154. Parents map[string]*Vertex
  155. }
  156. func getParents(v *Vertex) []*Vertex {
  157. return v.GetParents()
  158. }
  159. // GetParents returns a slice with the parent vertices of the a Vertex
  160. func (v *Vertex) GetParents() []*Vertex {
  161. var res []*Vertex
  162. for _, p := range v.Parents {
  163. res = append(res, p)
  164. }
  165. return res
  166. }
  167. func getChildren(v *Vertex) []*Vertex {
  168. return v.GetChildren()
  169. }
  170. // GetChildren returns a slice with the child vertices of the a Vertex
  171. func (v *Vertex) GetChildren() []*Vertex {
  172. var res []*Vertex
  173. for _, p := range v.Children {
  174. res = append(res, p)
  175. }
  176. return res
  177. }
  178. // NewGraph returns the dependency graph of the services
  179. func NewGraph(services types.Services, initialStatus ServiceStatus) (*Graph, error) {
  180. graph := &Graph{
  181. lock: sync.RWMutex{},
  182. Vertices: map[string]*Vertex{},
  183. }
  184. for _, s := range services {
  185. graph.AddVertex(s.Name, s.Name, initialStatus)
  186. }
  187. for _, s := range services {
  188. for _, name := range s.GetDependencies() {
  189. _ = graph.AddEdge(s.Name, name)
  190. }
  191. }
  192. if b, err := graph.HasCycles(); b {
  193. return nil, err
  194. }
  195. return graph, nil
  196. }
  197. // NewVertex is the constructor function for the Vertex
  198. func NewVertex(key string, service string, initialStatus ServiceStatus) *Vertex {
  199. return &Vertex{
  200. Key: key,
  201. Service: service,
  202. Status: initialStatus,
  203. Parents: map[string]*Vertex{},
  204. Children: map[string]*Vertex{},
  205. }
  206. }
  207. // AddVertex adds a vertex to the Graph
  208. func (g *Graph) AddVertex(key string, service string, initialStatus ServiceStatus) {
  209. g.lock.Lock()
  210. defer g.lock.Unlock()
  211. v := NewVertex(key, service, initialStatus)
  212. g.Vertices[key] = v
  213. }
  214. // AddEdge adds a relationship of dependency between vertices `source` and `destination`
  215. func (g *Graph) AddEdge(source string, destination string) error {
  216. g.lock.Lock()
  217. defer g.lock.Unlock()
  218. sourceVertex := g.Vertices[source]
  219. destinationVertex := g.Vertices[destination]
  220. if sourceVertex == nil {
  221. return fmt.Errorf("could not find %s", source)
  222. }
  223. if destinationVertex == nil {
  224. return fmt.Errorf("could not find %s", destination)
  225. }
  226. // If they are already connected
  227. if _, ok := sourceVertex.Children[destination]; ok {
  228. return nil
  229. }
  230. sourceVertex.Children[destination] = destinationVertex
  231. destinationVertex.Parents[source] = sourceVertex
  232. return nil
  233. }
  234. func leaves(g *Graph) []*Vertex {
  235. return g.Leaves()
  236. }
  237. // Leaves returns the slice of leaves of the graph
  238. func (g *Graph) Leaves() []*Vertex {
  239. g.lock.Lock()
  240. defer g.lock.Unlock()
  241. var res []*Vertex
  242. for _, v := range g.Vertices {
  243. if len(v.Children) == 0 {
  244. res = append(res, v)
  245. }
  246. }
  247. return res
  248. }
  249. func roots(g *Graph) []*Vertex {
  250. return g.Roots()
  251. }
  252. // Roots returns the slice of "Roots" of the graph
  253. func (g *Graph) Roots() []*Vertex {
  254. g.lock.Lock()
  255. defer g.lock.Unlock()
  256. var res []*Vertex
  257. for _, v := range g.Vertices {
  258. if len(v.Parents) == 0 {
  259. res = append(res, v)
  260. }
  261. }
  262. return res
  263. }
  264. // UpdateStatus updates the status of a certain vertex
  265. func (g *Graph) UpdateStatus(key string, status ServiceStatus) {
  266. g.lock.Lock()
  267. defer g.lock.Unlock()
  268. g.Vertices[key].Status = status
  269. }
  270. func filterChildren(g *Graph, k string, s ServiceStatus) []*Vertex {
  271. return g.FilterChildren(k, s)
  272. }
  273. // FilterChildren returns children of a certain vertex that are in a certain status
  274. func (g *Graph) FilterChildren(key string, status ServiceStatus) []*Vertex {
  275. g.lock.Lock()
  276. defer g.lock.Unlock()
  277. var res []*Vertex
  278. vertex := g.Vertices[key]
  279. for _, child := range vertex.Children {
  280. if child.Status == status {
  281. res = append(res, child)
  282. }
  283. }
  284. return res
  285. }
  286. func filterParents(g *Graph, k string, s ServiceStatus) []*Vertex {
  287. return g.FilterParents(k, s)
  288. }
  289. // FilterParents returns the parents of a certain vertex that are in a certain status
  290. func (g *Graph) FilterParents(key string, status ServiceStatus) []*Vertex {
  291. g.lock.Lock()
  292. defer g.lock.Unlock()
  293. var res []*Vertex
  294. vertex := g.Vertices[key]
  295. for _, parent := range vertex.Parents {
  296. if parent.Status == status {
  297. res = append(res, parent)
  298. }
  299. }
  300. return res
  301. }
  302. // HasCycles detects cycles in the graph
  303. func (g *Graph) HasCycles() (bool, error) {
  304. discovered := []string{}
  305. finished := []string{}
  306. for _, vertex := range g.Vertices {
  307. path := []string{
  308. vertex.Key,
  309. }
  310. if !utils.StringContains(discovered, vertex.Key) && !utils.StringContains(finished, vertex.Key) {
  311. var err error
  312. discovered, finished, err = g.visit(vertex.Key, path, discovered, finished)
  313. if err != nil {
  314. return true, err
  315. }
  316. }
  317. }
  318. return false, nil
  319. }
  320. func (g *Graph) visit(key string, path []string, discovered []string, finished []string) ([]string, []string, error) {
  321. discovered = append(discovered, key)
  322. for _, v := range g.Vertices[key].Children {
  323. path := append(path, v.Key)
  324. if utils.StringContains(discovered, v.Key) {
  325. return nil, nil, fmt.Errorf("cycle found: %s", strings.Join(path, " -> "))
  326. }
  327. if !utils.StringContains(finished, v.Key) {
  328. if _, _, err := g.visit(v.Key, path, discovered, finished); err != nil {
  329. return nil, nil, err
  330. }
  331. }
  332. }
  333. discovered = remove(discovered, key)
  334. finished = append(finished, key)
  335. return discovered, finished, nil
  336. }
  337. func remove(slice []string, item string) []string {
  338. var s []string
  339. for _, i := range slice {
  340. if i != item {
  341. s = append(s, i)
  342. }
  343. }
  344. return s
  345. }