瀏覽代碼

Merge pull request #10544 from ndeloof/parallel_race

fix race condition when --parallel is used with a large number of dependent services
Guillaume Lours 2 年之前
父節點
當前提交
3b32a264c7
共有 1 個文件被更改,包括 27 次插入12 次删除
  1. 27 12
      pkg/compose/dependencies.go

+ 27 - 12
pkg/compose/dependencies.go

@@ -97,19 +97,37 @@ func InReverseDependencyOrder(ctx context.Context, project *types.Project, fn fu
 }
 
 func (t *graphTraversal) visit(ctx context.Context, g *Graph) error {
-	nodes := t.extremityNodesFn(g)
+	expect := len(g.Vertices)
+	if expect == 0 {
+		return nil
+	}
 
 	eg, ctx := errgroup.WithContext(ctx)
 	if t.maxConcurrency > 0 {
-		eg.SetLimit(t.maxConcurrency)
+		eg.SetLimit(t.maxConcurrency + 1)
 	}
-	t.run(ctx, g, eg, nodes)
+	nodeCh := make(chan *Vertex)
+	eg.Go(func() error {
+		for node := range nodeCh {
+			expect--
+			if expect == 0 {
+				close(nodeCh)
+				return nil
+			}
+			t.run(ctx, g, eg, t.adjacentNodesFn(node), nodeCh)
+		}
+		return nil
+	})
 
-	return eg.Wait()
+	nodes := t.extremityNodesFn(g)
+	t.run(ctx, g, eg, nodes, nodeCh)
+
+	err := eg.Wait()
+	return err
 }
 
 // Note: this could be `graph.walk` or whatever
-func (t *graphTraversal) run(ctx context.Context, graph *Graph, eg *errgroup.Group, nodes []*Vertex) {
+func (t *graphTraversal) run(ctx context.Context, graph *Graph, eg *errgroup.Group, nodes []*Vertex, nodeCh chan *Vertex) {
 	for _, node := range nodes {
 		// Don't start this service yet if all of its children have
 		// not been started yet.
@@ -125,14 +143,11 @@ func (t *graphTraversal) run(ctx context.Context, graph *Graph, eg *errgroup.Gro
 
 		eg.Go(func() error {
 			err := t.visitorFn(ctx, node.Service)
-			if err != nil {
-				return err
+			if err == nil {
+				graph.UpdateStatus(node.Key, t.targetServiceStatus)
 			}
-
-			graph.UpdateStatus(node.Key, t.targetServiceStatus)
-
-			t.run(ctx, graph, eg, t.adjacentNodesFn(node))
-			return nil
+			nodeCh <- node
+			return err
 		})
 	}
 }