소스 검색

Use async iterators in Expand.

Bart De Smet 6 년 전
부모
커밋
038c9696f7
1개의 변경된 파일65개의 추가작업 그리고 0개의 파일을 삭제
  1. 65 0
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Expand.cs

+ 65 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Expand.cs

@@ -18,7 +18,28 @@ namespace System.Linq
             if (selector == null)
                 throw Error.ArgumentNull(nameof(selector));
 
+#if USE_ASYNC_ITERATOR
+            return AsyncEnumerable.Create(Core);
+
+            async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+            {
+                var queue = new Queue<IAsyncEnumerable<TSource>>();
+
+                queue.Enqueue(source);
+
+                while (queue.Count > 0)
+                {
+                    await foreach (TSource item in queue.Dequeue().WithCancellation(cancellationToken).ConfigureAwait(false))
+                    {
+                        queue.Enqueue(selector(item));
+
+                        yield return item;
+                    }
+                }
+            }
+#else
             return new ExpandAsyncIterator<TSource>(source, selector);
+#endif
         }
 
         public static IAsyncEnumerable<TSource> Expand<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TSource>>> selector)
@@ -28,7 +49,28 @@ namespace System.Linq
             if (selector == null)
                 throw Error.ArgumentNull(nameof(selector));
 
+#if USE_ASYNC_ITERATOR
+            return AsyncEnumerable.Create(Core);
+
+            async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+            {
+                var queue = new Queue<IAsyncEnumerable<TSource>>();
+
+                queue.Enqueue(source);
+
+                while (queue.Count > 0)
+                {
+                    await foreach (TSource item in queue.Dequeue().WithCancellation(cancellationToken).ConfigureAwait(false))
+                    {
+                        queue.Enqueue(await selector(item).ConfigureAwait(false));
+
+                        yield return item;
+                    }
+                }
+            }
+#else
             return new ExpandAsyncIteratorWithTask<TSource>(source, selector);
+#endif
         }
 
 #if !NO_DEEP_CANCELLATION
@@ -39,10 +81,32 @@ namespace System.Linq
             if (selector == null)
                 throw Error.ArgumentNull(nameof(selector));
 
+#if USE_ASYNC_ITERATOR
+            return AsyncEnumerable.Create(Core);
+
+            async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+            {
+                var queue = new Queue<IAsyncEnumerable<TSource>>();
+
+                queue.Enqueue(source);
+
+                while (queue.Count > 0)
+                {
+                    await foreach (TSource item in queue.Dequeue().WithCancellation(cancellationToken).ConfigureAwait(false))
+                    {
+                        queue.Enqueue(await selector(item, cancellationToken).ConfigureAwait(false));
+
+                        yield return item;
+                    }
+                }
+            }
+#else
             return new ExpandAsyncIteratorWithTaskAndCancellation<TSource>(source, selector);
+#endif
         }
 #endif
 
+#if !USE_ASYNC_ITERATOR
         private sealed class ExpandAsyncIterator<TSource> : AsyncIterator<TSource>
         {
             private readonly Func<TSource, IAsyncEnumerable<TSource>> _selector;
@@ -313,6 +377,7 @@ namespace System.Linq
                 return false;
             }
         }
+#endif
 #endif
     }
 }