dependencies.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476
  1. /*
  2. Copyright 2020 Docker Compose CLI authors
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package compose
  14. import (
  15. "context"
  16. "fmt"
  17. "strings"
  18. "sync"
  19. "github.com/compose-spec/compose-go/types"
  20. "github.com/docker/compose/v2/pkg/api"
  21. "github.com/pkg/errors"
  22. "golang.org/x/sync/errgroup"
  23. "github.com/docker/compose/v2/pkg/utils"
  24. )
  // ServiceStatus is the state recorded on a Vertex for the service it stands for.
  type ServiceStatus int

  // Values a ServiceStatus can take.
  const (
  	// ServiceStopped is the zero value of ServiceStatus.
  	ServiceStopped ServiceStatus = 0
  	// ServiceStarted is the status following ServiceStopped.
  	ServiceStarted ServiceStatus = 1
  )
  32. type graphTraversal struct {
  33. mu sync.Mutex
  34. seen map[string]struct{}
  35. ignored map[string]struct{}
  36. extremityNodesFn func(*Graph) []*Vertex // leaves or roots
  37. adjacentNodesFn func(*Vertex) []*Vertex // getParents or getChildren
  38. filterAdjacentByStatusFn func(*Graph, string, ServiceStatus) []*Vertex // filterChildren or filterParents
  39. targetServiceStatus ServiceStatus
  40. adjacentServiceStatusToSkip ServiceStatus
  41. visitorFn func(context.Context, string) error
  42. maxConcurrency int
  43. }
  44. func upDirectionTraversal(visitorFn func(context.Context, string) error) *graphTraversal {
  45. return &graphTraversal{
  46. extremityNodesFn: leaves,
  47. adjacentNodesFn: getParents,
  48. filterAdjacentByStatusFn: filterChildren,
  49. adjacentServiceStatusToSkip: ServiceStopped,
  50. targetServiceStatus: ServiceStarted,
  51. visitorFn: visitorFn,
  52. }
  53. }
  54. func downDirectionTraversal(visitorFn func(context.Context, string) error) *graphTraversal {
  55. return &graphTraversal{
  56. extremityNodesFn: roots,
  57. adjacentNodesFn: getChildren,
  58. filterAdjacentByStatusFn: filterParents,
  59. adjacentServiceStatusToSkip: ServiceStarted,
  60. targetServiceStatus: ServiceStopped,
  61. visitorFn: visitorFn,
  62. }
  63. }
  64. // InDependencyOrder applies the function to the services of the project taking in account the dependency order
  65. func InDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, string) error, options ...func(*graphTraversal)) error {
  66. graph, err := NewGraph(project, ServiceStopped)
  67. if err != nil {
  68. return err
  69. }
  70. t := upDirectionTraversal(fn)
  71. for _, option := range options {
  72. option(t)
  73. }
  74. return t.visit(ctx, graph)
  75. }
  76. // InReverseDependencyOrder applies the function to the services of the project in reverse order of dependencies
  77. func InReverseDependencyOrder(ctx context.Context, project *types.Project, fn func(context.Context, string) error, options ...func(*graphTraversal)) error {
  78. graph, err := NewGraph(project, ServiceStarted)
  79. if err != nil {
  80. return err
  81. }
  82. t := downDirectionTraversal(fn)
  83. for _, option := range options {
  84. option(t)
  85. }
  86. return t.visit(ctx, graph)
  87. }
  88. func WithRootNodesAndDown(nodes []string) func(*graphTraversal) {
  89. return func(t *graphTraversal) {
  90. if len(nodes) == 0 {
  91. return
  92. }
  93. originalFn := t.extremityNodesFn
  94. t.extremityNodesFn = func(graph *Graph) []*Vertex {
  95. var want []string
  96. for _, node := range nodes {
  97. vertex := graph.Vertices[node]
  98. want = append(want, vertex.Service)
  99. for _, v := range getAncestors(vertex) {
  100. want = append(want, v.Service)
  101. }
  102. }
  103. t.ignored = map[string]struct{}{}
  104. for k := range graph.Vertices {
  105. if !utils.Contains(want, k) {
  106. t.ignored[k] = struct{}{}
  107. }
  108. }
  109. return originalFn(graph)
  110. }
  111. }
  112. }
  113. func (t *graphTraversal) visit(ctx context.Context, g *Graph) error {
  114. expect := len(g.Vertices)
  115. if expect == 0 {
  116. return nil
  117. }
  118. eg, ctx := errgroup.WithContext(ctx)
  119. if t.maxConcurrency > 0 {
  120. eg.SetLimit(t.maxConcurrency + 1)
  121. }
  122. nodeCh := make(chan *Vertex, expect)
  123. defer close(nodeCh)
  124. // nodeCh need to allow n=expect writers while reader goroutine could have returner after ctx.Done
  125. eg.Go(func() error {
  126. for {
  127. select {
  128. case <-ctx.Done():
  129. return nil
  130. case node := <-nodeCh:
  131. expect--
  132. if expect == 0 {
  133. return nil
  134. }
  135. t.run(ctx, g, eg, t.adjacentNodesFn(node), nodeCh)
  136. }
  137. }
  138. })
  139. nodes := t.extremityNodesFn(g)
  140. t.run(ctx, g, eg, nodes, nodeCh)
  141. return eg.Wait()
  142. }
  143. // Note: this could be `graph.walk` or whatever
  144. func (t *graphTraversal) run(ctx context.Context, graph *Graph, eg *errgroup.Group, nodes []*Vertex, nodeCh chan *Vertex) {
  145. for _, node := range nodes {
  146. // Don't start this service yet if all of its children have
  147. // not been started yet.
  148. if len(t.filterAdjacentByStatusFn(graph, node.Key, t.adjacentServiceStatusToSkip)) != 0 {
  149. continue
  150. }
  151. node := node
  152. if !t.consume(node.Key) {
  153. // another worker already visited this node
  154. continue
  155. }
  156. eg.Go(func() error {
  157. var err error
  158. if _, ignore := t.ignored[node.Service]; !ignore {
  159. err = t.visitorFn(ctx, node.Service)
  160. }
  161. if err == nil {
  162. graph.UpdateStatus(node.Key, t.targetServiceStatus)
  163. }
  164. nodeCh <- node
  165. return err
  166. })
  167. }
  168. }
  169. func (t *graphTraversal) consume(nodeKey string) bool {
  170. t.mu.Lock()
  171. defer t.mu.Unlock()
  172. if t.seen == nil {
  173. t.seen = make(map[string]struct{})
  174. }
  175. if _, ok := t.seen[nodeKey]; ok {
  176. return false
  177. }
  178. t.seen[nodeKey] = struct{}{}
  179. return true
  180. }
  181. // Graph represents project as service dependencies
  182. type Graph struct {
  183. Vertices map[string]*Vertex
  184. lock sync.RWMutex
  185. }
  186. // Vertex represents a service in the dependencies structure
  187. type Vertex struct {
  188. Key string
  189. Service string
  190. Status ServiceStatus
  191. Children map[string]*Vertex
  192. Parents map[string]*Vertex
  193. }
  194. func getParents(v *Vertex) []*Vertex {
  195. return v.GetParents()
  196. }
  197. // GetParents returns a slice with the parent vertices of the a Vertex
  198. func (v *Vertex) GetParents() []*Vertex {
  199. var res []*Vertex
  200. for _, p := range v.Parents {
  201. res = append(res, p)
  202. }
  203. return res
  204. }
  205. func getChildren(v *Vertex) []*Vertex {
  206. return v.GetChildren()
  207. }
  208. // getAncestors return all descendents for a vertex, might contain duplicates
  209. func getAncestors(v *Vertex) []*Vertex {
  210. var descendents []*Vertex
  211. for _, parent := range v.GetParents() {
  212. descendents = append(descendents, parent)
  213. descendents = append(descendents, getAncestors(parent)...)
  214. }
  215. return descendents
  216. }
  217. // GetChildren returns a slice with the child vertices of the a Vertex
  218. func (v *Vertex) GetChildren() []*Vertex {
  219. var res []*Vertex
  220. for _, p := range v.Children {
  221. res = append(res, p)
  222. }
  223. return res
  224. }
  225. // NewGraph returns the dependency graph of the services
  226. func NewGraph(project *types.Project, initialStatus ServiceStatus) (*Graph, error) {
  227. graph := &Graph{
  228. lock: sync.RWMutex{},
  229. Vertices: map[string]*Vertex{},
  230. }
  231. for _, s := range project.Services {
  232. graph.AddVertex(s.Name, s.Name, initialStatus)
  233. }
  234. for _, s := range project.Services {
  235. for _, name := range s.GetDependencies() {
  236. err := graph.AddEdge(s.Name, name)
  237. if err != nil {
  238. if api.IsNotFoundError(err) {
  239. ds, err := project.GetDisabledService(name)
  240. if err == nil {
  241. return nil, fmt.Errorf("service %s is required by %s but is disabled. Can be enabled by profiles %s", name, s.Name, ds.Profiles)
  242. }
  243. }
  244. return nil, err
  245. }
  246. }
  247. }
  248. if b, err := graph.HasCycles(); b {
  249. return nil, err
  250. }
  251. return graph, nil
  252. }
  253. // NewVertex is the constructor function for the Vertex
  254. func NewVertex(key string, service string, initialStatus ServiceStatus) *Vertex {
  255. return &Vertex{
  256. Key: key,
  257. Service: service,
  258. Status: initialStatus,
  259. Parents: map[string]*Vertex{},
  260. Children: map[string]*Vertex{},
  261. }
  262. }
  263. // AddVertex adds a vertex to the Graph
  264. func (g *Graph) AddVertex(key string, service string, initialStatus ServiceStatus) {
  265. g.lock.Lock()
  266. defer g.lock.Unlock()
  267. v := NewVertex(key, service, initialStatus)
  268. g.Vertices[key] = v
  269. }
  270. // AddEdge adds a relationship of dependency between vertices `source` and `destination`
  271. func (g *Graph) AddEdge(source string, destination string) error {
  272. g.lock.Lock()
  273. defer g.lock.Unlock()
  274. sourceVertex := g.Vertices[source]
  275. destinationVertex := g.Vertices[destination]
  276. if sourceVertex == nil {
  277. return errors.Wrapf(api.ErrNotFound, "could not find %s", source)
  278. }
  279. if destinationVertex == nil {
  280. return errors.Wrapf(api.ErrNotFound, "could not find %s", destination)
  281. }
  282. // If they are already connected
  283. if _, ok := sourceVertex.Children[destination]; ok {
  284. return nil
  285. }
  286. sourceVertex.Children[destination] = destinationVertex
  287. destinationVertex.Parents[source] = sourceVertex
  288. return nil
  289. }
  290. func leaves(g *Graph) []*Vertex {
  291. return g.Leaves()
  292. }
  293. // Leaves returns the slice of leaves of the graph
  294. func (g *Graph) Leaves() []*Vertex {
  295. g.lock.Lock()
  296. defer g.lock.Unlock()
  297. var res []*Vertex
  298. for _, v := range g.Vertices {
  299. if len(v.Children) == 0 {
  300. res = append(res, v)
  301. }
  302. }
  303. return res
  304. }
  305. func roots(g *Graph) []*Vertex {
  306. return g.Roots()
  307. }
  308. // Roots returns the slice of "Roots" of the graph
  309. func (g *Graph) Roots() []*Vertex {
  310. g.lock.Lock()
  311. defer g.lock.Unlock()
  312. var res []*Vertex
  313. for _, v := range g.Vertices {
  314. if len(v.Parents) == 0 {
  315. res = append(res, v)
  316. }
  317. }
  318. return res
  319. }
  320. // UpdateStatus updates the status of a certain vertex
  321. func (g *Graph) UpdateStatus(key string, status ServiceStatus) {
  322. g.lock.Lock()
  323. defer g.lock.Unlock()
  324. g.Vertices[key].Status = status
  325. }
  326. func filterChildren(g *Graph, k string, s ServiceStatus) []*Vertex {
  327. return g.FilterChildren(k, s)
  328. }
  329. // FilterChildren returns children of a certain vertex that are in a certain status
  330. func (g *Graph) FilterChildren(key string, status ServiceStatus) []*Vertex {
  331. g.lock.Lock()
  332. defer g.lock.Unlock()
  333. var res []*Vertex
  334. vertex := g.Vertices[key]
  335. for _, child := range vertex.Children {
  336. if child.Status == status {
  337. res = append(res, child)
  338. }
  339. }
  340. return res
  341. }
  342. func filterParents(g *Graph, k string, s ServiceStatus) []*Vertex {
  343. return g.FilterParents(k, s)
  344. }
  345. // FilterParents returns the parents of a certain vertex that are in a certain status
  346. func (g *Graph) FilterParents(key string, status ServiceStatus) []*Vertex {
  347. g.lock.Lock()
  348. defer g.lock.Unlock()
  349. var res []*Vertex
  350. vertex := g.Vertices[key]
  351. for _, parent := range vertex.Parents {
  352. if parent.Status == status {
  353. res = append(res, parent)
  354. }
  355. }
  356. return res
  357. }
  358. // HasCycles detects cycles in the graph
  359. func (g *Graph) HasCycles() (bool, error) {
  360. discovered := []string{}
  361. finished := []string{}
  362. for _, vertex := range g.Vertices {
  363. path := []string{
  364. vertex.Key,
  365. }
  366. if !utils.StringContains(discovered, vertex.Key) && !utils.StringContains(finished, vertex.Key) {
  367. var err error
  368. discovered, finished, err = g.visit(vertex.Key, path, discovered, finished)
  369. if err != nil {
  370. return true, err
  371. }
  372. }
  373. }
  374. return false, nil
  375. }
  376. func (g *Graph) visit(key string, path []string, discovered []string, finished []string) ([]string, []string, error) {
  377. discovered = append(discovered, key)
  378. for _, v := range g.Vertices[key].Children {
  379. path := append(path, v.Key)
  380. if utils.StringContains(discovered, v.Key) {
  381. return nil, nil, fmt.Errorf("cycle found: %s", strings.Join(path, " -> "))
  382. }
  383. if !utils.StringContains(finished, v.Key) {
  384. if _, _, err := g.visit(v.Key, path, discovered, finished); err != nil {
  385. return nil, nil, err
  386. }
  387. }
  388. }
  389. discovered = remove(discovered, key)
  390. finished = append(finished, key)
  391. return discovered, finished, nil
  392. }
  // remove returns a new slice holding the elements of slice that differ from
  // item, in their original order. The input is left untouched and the result
  // is nil when nothing remains.
  func remove(slice []string, item string) []string {
  	var kept []string
  	for idx := 0; idx < len(slice); idx++ {
  		if slice[idx] == item {
  			continue
  		}
  		kept = append(kept, slice[idx])
  	}
  	return kept
  }
  401. }