|
|
@@ -140,24 +140,28 @@ func (t *graphTraversal) visit(ctx context.Context, g *Graph) error {
|
|
|
if t.maxConcurrency > 0 {
|
|
|
eg.SetLimit(t.maxConcurrency + 1)
|
|
|
}
|
|
|
- nodeCh := make(chan *Vertex)
|
|
|
+ nodeCh := make(chan *Vertex, expect)
|
|
|
+ defer close(nodeCh)
|
|
|
+ // nodeCh need to allow n=expect writers while reader goroutine could have returner after ctx.Done
|
|
|
eg.Go(func() error {
|
|
|
- for node := range nodeCh {
|
|
|
- expect--
|
|
|
- if expect == 0 {
|
|
|
- close(nodeCh)
|
|
|
+ for {
|
|
|
+ select {
|
|
|
+ case <-ctx.Done():
|
|
|
return nil
|
|
|
+ case node := <-nodeCh:
|
|
|
+ expect--
|
|
|
+ if expect == 0 {
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+ t.run(ctx, g, eg, t.adjacentNodesFn(node), nodeCh)
|
|
|
}
|
|
|
- t.run(ctx, g, eg, t.adjacentNodesFn(node), nodeCh)
|
|
|
}
|
|
|
- return nil
|
|
|
})
|
|
|
|
|
|
nodes := t.extremityNodesFn(g)
|
|
|
t.run(ctx, g, eg, nodes, nodeCh)
|
|
|
|
|
|
- err := eg.Wait()
|
|
|
- return err
|
|
|
+ return eg.Wait()
|
|
|
}
|
|
|
|
|
|
// Note: this could be `graph.walk` or whatever
|