// File: dependencies.go
/*
   Copyright 2020 Docker Compose CLI authors

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/
  13. package compose
  14. import (
  15. "context"
  16. "fmt"
  17. "strings"
  18. "sync"
  19. "github.com/compose-spec/compose-go/types"
  20. "github.com/docker/compose/v2/pkg/api"
  21. "golang.org/x/sync/errgroup"
  22. "github.com/docker/compose/v2/pkg/utils"
  23. )
  24. // ServiceStatus indicates the status of a service
  25. type ServiceStatus int
  26. // Services status flags
  27. const (
  28. ServiceStopped ServiceStatus = iota
  29. ServiceStarted
  30. )
  31. type graphTraversal struct {
  32. mu sync.Mutex
  33. seen map[string]struct{}
  34. ignored map[string]struct{}
  35. extremityNodesFn func(*Graph) []*Vertex // leaves or roots
  36. adjacentNodesFn func(*Vertex) []*Vertex // getParents or getChildren
  37. filterAdjacentByStatusFn func(*Graph, string, ServiceStatus) []*Vertex // filterChildren or filterParents
  38. targetServiceStatus ServiceStatus
  39. adjacentServiceStatusToSkip ServiceStatus
  40. visitorFn func(context.Context, string) error
  41. maxConcurrency int
  42. }
  43. func upDirectionTraversal(visitorFn func(context.Context, string) error) *graphTraversal {
  44. return &graphTraversal{
  45. extremityNodesFn: leaves,
  46. adjacentNodesFn: getParents,
  47. filterAdjacentByStatusFn: filterChildren,
  48. adjacentServiceStatusToSkip: ServiceStopped,
  49. targetServiceStatus: ServiceStarted,
  50. visitorFn: visitorFn,
  51. }
  52. }
  53. func downDirectionTraversal(visitorFn func(context.Context, string) error) *graphTraversal {
  54. return &graphTraversal{
  55. extremityNodesFn: roots,
  56. adjacentNodesFn: getChildren,
  57. filterAdjacentByStatusFn: filterParents,
  58. adjacentServiceStatusToSkip: ServiceStarted,
  59. targetServiceStatus: ServiceStopped,
  60. visitorFn: visitorFn,
  61. }
  62. }
  63. // InDependencyOrder applies the function to the services of the project taking in account the dependency order
  64. func InDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, string) error, options ...func(*graphTraversal)) error {
  65. graph, err := NewGraph(project, ServiceStopped)
  66. if err != nil {
  67. return err
  68. }
  69. t := upDirectionTraversal(fn)
  70. for _, option := range options {
  71. option(t)
  72. }
  73. return t.visit(ctx, graph)
  74. }
  75. // InReverseDependencyOrder applies the function to the services of the project in reverse order of dependencies
  76. func InReverseDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, string) error, options ...func(*graphTraversal)) error {
  77. graph, err := NewGraph(project, ServiceStarted)
  78. if err != nil {
  79. return err
  80. }
  81. t := downDirectionTraversal(fn)
  82. for _, option := range options {
  83. option(t)
  84. }
  85. return t.visit(ctx, graph)
  86. }
  87. func WithRootNodesAndDown(nodes []string) func(*graphTraversal) {
  88. return func(t *graphTraversal) {
  89. if len(nodes) == 0 {
  90. return
  91. }
  92. originalFn := t.extremityNodesFn
  93. t.extremityNodesFn = func(graph *Graph) []*Vertex {
  94. var want []string
  95. for _, node := range nodes {
  96. vertex := graph.Vertices[node]
  97. want = append(want, vertex.Service)
  98. for _, v := range getAncestors(vertex) {
  99. want = append(want, v.Service)
  100. }
  101. }
  102. t.ignored = map[string]struct{}{}
  103. for k := range graph.Vertices {
  104. if !utils.Contains(want, k) {
  105. t.ignored[k] = struct{}{}
  106. }
  107. }
  108. return originalFn(graph)
  109. }
  110. }
  111. }
  112. func (t *graphTraversal) visit(ctx context.Context, g *Graph) error {
  113. expect := len(g.Vertices)
  114. if expect == 0 {
  115. return nil
  116. }
  117. eg, ctx := errgroup.WithContext(ctx)
  118. if t.maxConcurrency > 0 {
  119. eg.SetLimit(t.maxConcurrency + 1)
  120. }
  121. nodeCh := make(chan *Vertex, expect)
  122. defer close(nodeCh)
  123. // nodeCh need to allow n=expect writers while reader goroutine could have returner after ctx.Done
  124. eg.Go(func() error {
  125. for {
  126. select {
  127. case <-ctx.Done():
  128. return nil
  129. case node := <-nodeCh:
  130. expect--
  131. if expect == 0 {
  132. return nil
  133. }
  134. t.run(ctx, g, eg, t.adjacentNodesFn(node), nodeCh)
  135. }
  136. }
  137. })
  138. nodes := t.extremityNodesFn(g)
  139. t.run(ctx, g, eg, nodes, nodeCh)
  140. return eg.Wait()
  141. }
  142. // Note: this could be `graph.walk` or whatever
  143. func (t *graphTraversal) run(ctx context.Context, graph *Graph, eg *errgroup.Group, nodes []*Vertex, nodeCh chan *Vertex) {
  144. for _, node := range nodes {
  145. // Don't start this service yet if all of its children have
  146. // not been started yet.
  147. if len(t.filterAdjacentByStatusFn(graph, node.Key, t.adjacentServiceStatusToSkip)) != 0 {
  148. continue
  149. }
  150. node := node
  151. if !t.consume(node.Key) {
  152. // another worker already visited this node
  153. continue
  154. }
  155. eg.Go(func() error {
  156. var err error
  157. if _, ignore := t.ignored[node.Service]; !ignore {
  158. err = t.visitorFn(ctx, node.Service)
  159. }
  160. if err == nil {
  161. graph.UpdateStatus(node.Key, t.targetServiceStatus)
  162. }
  163. nodeCh <- node
  164. return err
  165. })
  166. }
  167. }
  168. func (t *graphTraversal) consume(nodeKey string) bool {
  169. t.mu.Lock()
  170. defer t.mu.Unlock()
  171. if t.seen == nil {
  172. t.seen = make(map[string]struct{})
  173. }
  174. if _, ok := t.seen[nodeKey]; ok {
  175. return false
  176. }
  177. t.seen[nodeKey] = struct{}{}
  178. return true
  179. }
  180. // Graph represents project as service dependencies
  181. type Graph struct {
  182. Vertices map[string]*Vertex
  183. lock sync.RWMutex
  184. }
  185. // Vertex represents a service in the dependencies structure
  186. type Vertex struct {
  187. Key string
  188. Service string
  189. Status ServiceStatus
  190. Children map[string]*Vertex
  191. Parents map[string]*Vertex
  192. }
  193. func getParents(v *Vertex) []*Vertex {
  194. return v.GetParents()
  195. }
  196. // GetParents returns a slice with the parent vertices of the a Vertex
  197. func (v *Vertex) GetParents() []*Vertex {
  198. var res []*Vertex
  199. for _, p := range v.Parents {
  200. res = append(res, p)
  201. }
  202. return res
  203. }
  204. func getChildren(v *Vertex) []*Vertex {
  205. return v.GetChildren()
  206. }
  207. // getAncestors return all descendents for a vertex, might contain duplicates
  208. func getAncestors(v *Vertex) []*Vertex {
  209. var descendents []*Vertex
  210. for _, parent := range v.GetParents() {
  211. descendents = append(descendents, parent)
  212. descendents = append(descendents, getAncestors(parent)...)
  213. }
  214. return descendents
  215. }
  216. // GetChildren returns a slice with the child vertices of the a Vertex
  217. func (v *Vertex) GetChildren() []*Vertex {
  218. var res []*Vertex
  219. for _, p := range v.Children {
  220. res = append(res, p)
  221. }
  222. return res
  223. }
  224. // NewGraph returns the dependency graph of the services
  225. func NewGraph(project *types.Project, initialStatus ServiceStatus) (*Graph, error) {
  226. graph := &Graph{
  227. lock: sync.RWMutex{},
  228. Vertices: map[string]*Vertex{},
  229. }
  230. for _, s := range project.Services {
  231. graph.AddVertex(s.Name, s.Name, initialStatus)
  232. }
  233. for index, s := range project.Services {
  234. for _, name := range s.GetDependencies() {
  235. err := graph.AddEdge(s.Name, name)
  236. if err != nil {
  237. if !s.DependsOn[name].Required {
  238. delete(s.DependsOn, name)
  239. project.Services[index] = s
  240. continue
  241. }
  242. if api.IsNotFoundError(err) {
  243. ds, err := project.GetDisabledService(name)
  244. if err == nil {
  245. return nil, fmt.Errorf("service %s is required by %s but is disabled. Can be enabled by profiles %s", name, s.Name, ds.Profiles)
  246. }
  247. }
  248. return nil, err
  249. }
  250. }
  251. }
  252. if b, err := graph.HasCycles(); b {
  253. return nil, err
  254. }
  255. return graph, nil
  256. }
  257. // NewVertex is the constructor function for the Vertex
  258. func NewVertex(key string, service string, initialStatus ServiceStatus) *Vertex {
  259. return &Vertex{
  260. Key: key,
  261. Service: service,
  262. Status: initialStatus,
  263. Parents: map[string]*Vertex{},
  264. Children: map[string]*Vertex{},
  265. }
  266. }
  267. // AddVertex adds a vertex to the Graph
  268. func (g *Graph) AddVertex(key string, service string, initialStatus ServiceStatus) {
  269. g.lock.Lock()
  270. defer g.lock.Unlock()
  271. v := NewVertex(key, service, initialStatus)
  272. g.Vertices[key] = v
  273. }
  274. // AddEdge adds a relationship of dependency between vertices `source` and `destination`
  275. func (g *Graph) AddEdge(source string, destination string) error {
  276. g.lock.Lock()
  277. defer g.lock.Unlock()
  278. sourceVertex := g.Vertices[source]
  279. destinationVertex := g.Vertices[destination]
  280. if sourceVertex == nil {
  281. return fmt.Errorf("could not find %s: %w", source, api.ErrNotFound)
  282. }
  283. if destinationVertex == nil {
  284. return fmt.Errorf("could not find %s: %w", destination, api.ErrNotFound)
  285. }
  286. // If they are already connected
  287. if _, ok := sourceVertex.Children[destination]; ok {
  288. return nil
  289. }
  290. sourceVertex.Children[destination] = destinationVertex
  291. destinationVertex.Parents[source] = sourceVertex
  292. return nil
  293. }
  294. func leaves(g *Graph) []*Vertex {
  295. return g.Leaves()
  296. }
  297. // Leaves returns the slice of leaves of the graph
  298. func (g *Graph) Leaves() []*Vertex {
  299. g.lock.Lock()
  300. defer g.lock.Unlock()
  301. var res []*Vertex
  302. for _, v := range g.Vertices {
  303. if len(v.Children) == 0 {
  304. res = append(res, v)
  305. }
  306. }
  307. return res
  308. }
  309. func roots(g *Graph) []*Vertex {
  310. return g.Roots()
  311. }
  312. // Roots returns the slice of "Roots" of the graph
  313. func (g *Graph) Roots() []*Vertex {
  314. g.lock.Lock()
  315. defer g.lock.Unlock()
  316. var res []*Vertex
  317. for _, v := range g.Vertices {
  318. if len(v.Parents) == 0 {
  319. res = append(res, v)
  320. }
  321. }
  322. return res
  323. }
  324. // UpdateStatus updates the status of a certain vertex
  325. func (g *Graph) UpdateStatus(key string, status ServiceStatus) {
  326. g.lock.Lock()
  327. defer g.lock.Unlock()
  328. g.Vertices[key].Status = status
  329. }
  330. func filterChildren(g *Graph, k string, s ServiceStatus) []*Vertex {
  331. return g.FilterChildren(k, s)
  332. }
  333. // FilterChildren returns children of a certain vertex that are in a certain status
  334. func (g *Graph) FilterChildren(key string, status ServiceStatus) []*Vertex {
  335. g.lock.Lock()
  336. defer g.lock.Unlock()
  337. var res []*Vertex
  338. vertex := g.Vertices[key]
  339. for _, child := range vertex.Children {
  340. if child.Status == status {
  341. res = append(res, child)
  342. }
  343. }
  344. return res
  345. }
  346. func filterParents(g *Graph, k string, s ServiceStatus) []*Vertex {
  347. return g.FilterParents(k, s)
  348. }
  349. // FilterParents returns the parents of a certain vertex that are in a certain status
  350. func (g *Graph) FilterParents(key string, status ServiceStatus) []*Vertex {
  351. g.lock.Lock()
  352. defer g.lock.Unlock()
  353. var res []*Vertex
  354. vertex := g.Vertices[key]
  355. for _, parent := range vertex.Parents {
  356. if parent.Status == status {
  357. res = append(res, parent)
  358. }
  359. }
  360. return res
  361. }
  362. // HasCycles detects cycles in the graph
  363. func (g *Graph) HasCycles() (bool, error) {
  364. discovered := []string{}
  365. finished := []string{}
  366. for _, vertex := range g.Vertices {
  367. path := []string{
  368. vertex.Key,
  369. }
  370. if !utils.StringContains(discovered, vertex.Key) && !utils.StringContains(finished, vertex.Key) {
  371. var err error
  372. discovered, finished, err = g.visit(vertex.Key, path, discovered, finished)
  373. if err != nil {
  374. return true, err
  375. }
  376. }
  377. }
  378. return false, nil
  379. }
  380. func (g *Graph) visit(key string, path []string, discovered []string, finished []string) ([]string, []string, error) {
  381. discovered = append(discovered, key)
  382. for _, v := range g.Vertices[key].Children {
  383. path := append(path, v.Key)
  384. if utils.StringContains(discovered, v.Key) {
  385. return nil, nil, fmt.Errorf("cycle found: %s", strings.Join(path, " -> "))
  386. }
  387. if !utils.StringContains(finished, v.Key) {
  388. if _, _, err := g.visit(v.Key, path, discovered, finished); err != nil {
  389. return nil, nil, err
  390. }
  391. }
  392. }
  393. discovered = remove(discovered, key)
  394. finished = append(finished, key)
  395. return discovered, finished, nil
  396. }
  397. func remove(slice []string, item string) []string {
  398. var s []string
  399. for _, i := range slice {
  400. if i != item {
  401. s = append(s, i)
  402. }
  403. }
  404. return s
  405. }