Browse Source

Tame concurrency in Except and Intersect.

Bart De Smet 7 years ago
parent
commit
d10837ce07

+ 21 - 0
Ix.NET/Source/System.Linq.Async/System/Linq/AsyncEnumerableHelpers.cs

@@ -106,6 +106,27 @@ namespace System.Collections.Generic
             return result;
         }
 
+        internal static async Task<Set<T>> ToSet<T>(IAsyncEnumerable<T> source, IEqualityComparer<T> comparer, CancellationToken cancellationToken) // Drains `source` into a new Set<T> created with `comparer` and returns that set.
+        {
+            var e = source.GetAsyncEnumerator(cancellationToken); // The token is only handed to the enumerator; this method does not poll it itself.
+
+            try
+            {
+                var set = new Set<T>(comparer); // Created inside the try, so a throwing constructor still disposes the enumerator.
+
+                while (await e.MoveNextAsync().ConfigureAwait(false)) // Elements are awaited one at a time, in source order.
+                {
+                    set.Add(e.Current); // Add's return value is ignored; duplicate handling is left to Set<T> (not shown here).
+                }
+
+                return set; // Reached only after MoveNextAsync reports the end of the sequence.
+            }
+            finally
+            {
+                await e.DisposeAsync().ConfigureAwait(false); // Runs on both the normal and the exceptional path.
+            }
+        }
+
         internal struct ArrayWithLength<T>
         {
             public T[] Array;

+ 7 - 30
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Except.cs

@@ -39,13 +39,9 @@ namespace System.Linq
             private readonly IAsyncEnumerable<TSource> _first;
             private readonly IAsyncEnumerable<TSource> _second;
 
-            private Task _fillSetTask;
-
             private IAsyncEnumerator<TSource> _firstEnumerator;
             private Set<TSource> _set;
 
-            private bool _setFilled;
-
             public ExceptAsyncIterator(IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
             {
                 Debug.Assert(first != null);
@@ -77,13 +73,16 @@ namespace System.Linq
 
             protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
+                // NB: Earlier implementations of this operator constructed the set for the second source concurrently
+                //     with the first MoveNextAsync call on the first source. This resulted in an unexpected source of
+                //     concurrency, which isn't a great default behavior because it's very hard to suppress or control
+                //     this behavior.
+
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
+                        _set = await AsyncEnumerableHelpers.ToSet(_second, _comparer, cancellationToken).ConfigureAwait(false);
                         _firstEnumerator = _first.GetAsyncEnumerator(cancellationToken);
-                        _set = new Set<TSource>(_comparer);
-                        _setFilled = false;
-                        _fillSetTask = FillSetAsync(cancellationToken);
 
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -92,19 +91,7 @@ namespace System.Linq
                         bool moveNext;
                         do
                         {
-                            if (!_setFilled)
-                            {
-                                // This is here so we don't need to call Task.WhenAll each time after the set is filled
-                                var moveNextTask = _firstEnumerator.MoveNextAsync();
-                                await Task.WhenAll(moveNextTask.AsTask(), _fillSetTask).ConfigureAwait(false);
-
-                                _setFilled = true;
-                                moveNext = await moveNextTask.ConfigureAwait(false);
-                            }
-                            else
-                            {
-                                moveNext = await _firstEnumerator.MoveNextAsync().ConfigureAwait(false);
-                            }
+                            moveNext = await _firstEnumerator.MoveNextAsync().ConfigureAwait(false);
 
                             if (moveNext)
                             {
@@ -123,16 +110,6 @@ namespace System.Linq
 
                 return false;
             }
-
-            private async Task FillSetAsync(CancellationToken cancellationToken)
-            {
-                var array = await _second.ToArray(cancellationToken).ConfigureAwait(false);
-
-                foreach (var t in array)
-                {
-                    _set.Add(t);
-                }
-            }
         }
     }
 }

+ 7 - 28
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Intersect.cs

@@ -39,13 +39,9 @@ namespace System.Linq
             private readonly IAsyncEnumerable<TSource> _first;
             private readonly IAsyncEnumerable<TSource> _second;
 
-            private Task _fillSetTask;
-
             private IAsyncEnumerator<TSource> _firstEnumerator;
             private Set<TSource> _set;
 
-            private bool _setFilled;
-
             public IntersectAsyncIterator(IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
             {
                 Debug.Assert(first != null);
@@ -77,13 +73,16 @@ namespace System.Linq
 
             protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
+                // NB: Earlier implementations of this operator constructed the set for the second source concurrently
+                //     with the first MoveNextAsync call on the first source. This resulted in an unexpected source of
+                //     concurrency, which isn't a great default behavior because it's very hard to suppress or control
+                //     this behavior.
+
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
+                        _set = await AsyncEnumerableHelpers.ToSet(_second, _comparer, cancellationToken).ConfigureAwait(false);
                         _firstEnumerator = _first.GetAsyncEnumerator(cancellationToken);
-                        _set = new Set<TSource>(_comparer);
-                        _setFilled = false;
-                        _fillSetTask = FillSet(cancellationToken);
 
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -93,18 +92,7 @@ namespace System.Linq
                         bool moveNext;
                         do
                         {
-                            if (!_setFilled)
-                            {
-                                // This is here so we don't need to call Task.WhenAll each time after the set is filled
-                                var moveNextTask = _firstEnumerator.MoveNextAsync();
-                                await Task.WhenAll(moveNextTask.AsTask(), _fillSetTask).ConfigureAwait(false);
-                                _setFilled = true;
-                                moveNext = await moveNextTask.ConfigureAwait(false);
-                            }
-                            else
-                            {
-                                moveNext = await _firstEnumerator.MoveNextAsync().ConfigureAwait(false);
-                            }
+                            moveNext = await _firstEnumerator.MoveNextAsync().ConfigureAwait(false);
 
                             if (moveNext)
                             {
@@ -123,15 +111,6 @@ namespace System.Linq
 
                 return false;
             }
-
-            private async Task FillSet(CancellationToken cancellationToken)
-            {
-                var array = await _second.ToArray(cancellationToken).ConfigureAwait(false);
-                for (var i = 0; i < array.Length; i++)
-                {
-                    _set.Add(array[i]);
-                }
-            }
         }
     }
 }