Browse Source

Iterator-based implementation of GroupJoin and Join.

Bart De Smet 6 years ago
parent
commit
1449b9663d

+ 71 - 0
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/GroupJoin.cs

@@ -26,7 +26,30 @@ namespace System.Linq
             if (resultSelector == null)
                 throw Error.ArgumentNull(nameof(resultSelector));
 
+#if CSHARP8 && USE_ASYNC_ITERATOR && ASYNC_ITERATOR_CAN_RETURN_AETOR // https://github.com/dotnet/roslyn/pull/31114
+            return Create(Core);
+
+            async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
+            {
+                await using (var e = outer.GetAsyncEnumerator(cancellationToken).ConfigureAwait(false))
+                {
+                    if (await e.MoveNextAsync())
+                    {
+                        var lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
+
+                        do
+                        {
+                            var item = e.Current;
+                            var outerKey = outerKeySelector(item);
+                            yield return resultSelector(item, lookup[outerKey]);
+                        }
+                        while (await e.MoveNextAsync());
+                    }
+                }
+            }
+#else
             return new GroupJoinAsyncEnumerable<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
+#endif
         }
 
         public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, ValueTask<TResult>> resultSelector) =>
@@ -45,7 +68,30 @@ namespace System.Linq
             if (resultSelector == null)
                 throw Error.ArgumentNull(nameof(resultSelector));
 
+#if CSHARP8 && USE_ASYNC_ITERATOR && ASYNC_ITERATOR_CAN_RETURN_AETOR // https://github.com/dotnet/roslyn/pull/31114
+            return Create(Core);
+
+            async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
+            {
+                await using (var e = outer.GetAsyncEnumerator(cancellationToken).ConfigureAwait(false))
+                {
+                    if (await e.MoveNextAsync())
+                    {
+                        var lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
+
+                        do
+                        {
+                            var item = e.Current;
+                            var outerKey = await outerKeySelector(item).ConfigureAwait(false);
+                            yield return await resultSelector(item, lookup[outerKey]).ConfigureAwait(false);
+                        }
+                        while (await e.MoveNextAsync());
+                    }
+                }
+            }
+#else
             return new GroupJoinAsyncEnumerableWithTask<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
+#endif
         }
 
 #if !NO_DEEP_CANCELLATION
@@ -65,10 +111,34 @@ namespace System.Linq
             if (resultSelector == null)
                 throw Error.ArgumentNull(nameof(resultSelector));
 
+#if CSHARP8 && USE_ASYNC_ITERATOR && ASYNC_ITERATOR_CAN_RETURN_AETOR // https://github.com/dotnet/roslyn/pull/31114
+            return Create(Core);
+
+            async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
+            {
+                await using (var e = outer.GetAsyncEnumerator(cancellationToken).ConfigureAwait(false))
+                {
+                    if (await e.MoveNextAsync())
+                    {
+                        var lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
+
+                        do
+                        {
+                            var item = e.Current;
+                            var outerKey = await outerKeySelector(item, cancellationToken).ConfigureAwait(false);
+                            yield return await resultSelector(item, lookup[outerKey], cancellationToken).ConfigureAwait(false);
+                        }
+                        while (await e.MoveNextAsync());
+                    }
+                }
+            }
+#else
             return new GroupJoinAsyncEnumerableWithTaskAndCancellation<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
+#endif
         }
 #endif
 
+#if !(CSHARP8 && USE_ASYNC_ITERATOR && ASYNC_ITERATOR_CAN_RETURN_AETOR)
         private sealed class GroupJoinAsyncEnumerable<TOuter, TInner, TKey, TResult> : IAsyncEnumerable<TResult>
         {
             private readonly IEqualityComparer<TKey> _comparer;
@@ -363,6 +433,7 @@ namespace System.Linq
                 public ValueTask DisposeAsync() => _outer.DisposeAsync();
             }
         }
+#endif
 #endif
     }
 }

+ 119 - 0
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Join.cs

@@ -27,7 +27,46 @@ namespace System.Linq
             if (resultSelector == null)
                 throw Error.ArgumentNull(nameof(resultSelector));
 
+#if CSHARP8 && USE_ASYNC_ITERATOR && ASYNC_ITERATOR_CAN_RETURN_AETOR // https://github.com/dotnet/roslyn/pull/31114
+            return Create(Core);
+
+            async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
+            {
+                await using (var e = outer.GetAsyncEnumerator(cancellationToken).ConfigureAwait(false))
+                {
+                    if (await e.MoveNextAsync())
+                    {
+                        var lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
+
+                        if (lookup.Count != 0)
+                        {
+                            do
+                            {
+                                var item = e.Current;
+
+                                var outerKey = outerKeySelector(item);
+
+                                var g = lookup.GetGrouping(outerKey, create: false);
+
+                                if (g != null)
+                                {
+                                    var count = g._count;
+                                    var elements = g._elements;
+
+                                    for (var i = 0; i != count; ++i)
+                                    {
+                                        yield return resultSelector(item, elements[i]);
+                                    }
+                                }
+                            }
+                            while (await e.MoveNextAsync());
+                        }
+                    }
+                }
+            }
+#else
             return new JoinAsyncIterator<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
+#endif
         }
 
         public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, ValueTask<TKey>> outerKeySelector, Func<TInner, ValueTask<TKey>> innerKeySelector, Func<TOuter, TInner, ValueTask<TResult>> resultSelector) =>
@@ -46,7 +85,46 @@ namespace System.Linq
             if (resultSelector == null)
                 throw Error.ArgumentNull(nameof(resultSelector));
 
+#if CSHARP8 && USE_ASYNC_ITERATOR && ASYNC_ITERATOR_CAN_RETURN_AETOR // https://github.com/dotnet/roslyn/pull/31114
+            return Create(Core);
+
+            async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
+            {
+                await using (var e = outer.GetAsyncEnumerator(cancellationToken).ConfigureAwait(false))
+                {
+                    if (await e.MoveNextAsync())
+                    {
+                        var lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
+
+                        if (lookup.Count != 0)
+                        {
+                            do
+                            {
+                                var item = e.Current;
+
+                                var outerKey = await outerKeySelector(item).ConfigureAwait(false);
+
+                                var g = lookup.GetGrouping(outerKey, create: false);
+
+                                if (g != null)
+                                {
+                                    var count = g._count;
+                                    var elements = g._elements;
+
+                                    for (var i = 0; i != count; ++i)
+                                    {
+                                        yield return await resultSelector(item, elements[i]).ConfigureAwait(false);
+                                    }
+                                }
+                            }
+                            while (await e.MoveNextAsync());
+                        }
+                    }
+                }
+            }
+#else
             return new JoinAsyncIteratorWithTask<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
+#endif
         }
 
 #if !NO_DEEP_CANCELLATION
@@ -66,10 +144,50 @@ namespace System.Linq
             if (resultSelector == null)
                 throw Error.ArgumentNull(nameof(resultSelector));
 
+#if CSHARP8 && USE_ASYNC_ITERATOR && ASYNC_ITERATOR_CAN_RETURN_AETOR // https://github.com/dotnet/roslyn/pull/31114
+            return Create(Core);
+
+            async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
+            {
+                await using (var e = outer.GetAsyncEnumerator(cancellationToken).ConfigureAwait(false))
+                {
+                    if (await e.MoveNextAsync())
+                    {
+                        var lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
+
+                        if (lookup.Count != 0)
+                        {
+                            do
+                            {
+                                var item = e.Current;
+
+                                var outerKey = await outerKeySelector(item, cancellationToken).ConfigureAwait(false);
+
+                                var g = lookup.GetGrouping(outerKey, create: false);
+
+                                if (g != null)
+                                {
+                                    var count = g._count;
+                                    var elements = g._elements;
+
+                                    for (var i = 0; i != count; ++i)
+                                    {
+                                        yield return await resultSelector(item, elements[i], cancellationToken).ConfigureAwait(false);
+                                    }
+                                }
+                            }
+                            while (await e.MoveNextAsync());
+                        }
+                    }
+                }
+            }
+#else
             return new JoinAsyncIteratorWithTaskAndCancellation<TOuter, TInner, TKey, TResult>(outer, inner, outerKeySelector, innerKeySelector, resultSelector, comparer);
+#endif
         }
 #endif
 
+#if !(CSHARP8 && USE_ASYNC_ITERATOR && ASYNC_ITERATOR_CAN_RETURN_AETOR)
         private sealed class JoinAsyncIterator<TOuter, TInner, TKey, TResult> : AsyncIterator<TResult>
         {
             private readonly IAsyncEnumerable<TOuter> _outer;
@@ -451,6 +569,7 @@ namespace System.Linq
                 return false;
             }
         }
+#endif
 #endif
     }
 }