Ver código fonte

Use async iterators in Do.

Bart De Smet 6 anos atrás
pai
commit
3abd5af342

+ 128 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Do.cs

@@ -171,21 +171,148 @@ namespace System.Linq
 
         private static IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
         {
+#if USE_ASYNC_ITERATOR
+            return AsyncEnumerable.Create(Core);
+
+            async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+            {
+                await using (var e = source.GetAsyncEnumerator(cancellationToken).ConfigureAwait(false))
+                {
+                    while (true)
+                    {
+                        TSource item;
+
+                        try
+                        {
+                            if (!await e.MoveNextAsync())
+                            {
+                                break;
+                            }
+
+                            item = e.Current;
+
+                            onNext(item);
+                        }
+                        catch (OperationCanceledException)
+                        {
+                            throw;
+                        }
+                        catch (Exception ex) when (onError != null)
+                        {
+                            onError(ex);
+                            throw;
+                        }
+
+                        yield return item;
+                    }
+
+                    onCompleted?.Invoke();
+                }
+            }
+#else
             return new DoAsyncIterator<TSource>(source, onNext, onError, onCompleted);
+#endif
         }
 
         private static IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext, Func<Exception, Task> onError, Func<Task> onCompleted)
         {
+#if USE_ASYNC_ITERATOR
+            return AsyncEnumerable.Create(Core);
+
+            async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+            {
+                await using (var e = source.GetAsyncEnumerator(cancellationToken).ConfigureAwait(false))
+                {
+                    while (true)
+                    {
+                        TSource item;
+
+                        try
+                        {
+                            if (!await e.MoveNextAsync())
+                            {
+                                break;
+                            }
+
+                            item = e.Current;
+
+                            await onNext(item).ConfigureAwait(false);
+                        }
+                        catch (OperationCanceledException)
+                        {
+                            throw;
+                        }
+                        catch (Exception ex) when (onError != null)
+                        {
+                            await onError(ex).ConfigureAwait(false);
+                            throw;
+                        }
+
+                        yield return item;
+                    }
+
+                    if (onCompleted != null)
+                    {
+                        await onCompleted().ConfigureAwait(false);
+                    }
+                }
+            }
+#else
             return new DoAsyncIteratorWithTask<TSource>(source, onNext, onError, onCompleted);
+#endif
         }
 
 #if !NO_DEEP_CANCELLATION
         private static IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task> onError, Func<CancellationToken, Task> onCompleted)
         {
+#if USE_ASYNC_ITERATOR
+            return AsyncEnumerable.Create(Core);
+
+            async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+            {
+                await using (var e = source.GetAsyncEnumerator(cancellationToken).ConfigureAwait(false))
+                {
+                    while (true)
+                    {
+                        TSource item;
+
+                        try
+                        {
+                            if (!await e.MoveNextAsync())
+                            {
+                                break;
+                            }
+
+                            item = e.Current;
+
+                            await onNext(item, cancellationToken).ConfigureAwait(false);
+                        }
+                        catch (OperationCanceledException)
+                        {
+                            throw;
+                        }
+                        catch (Exception ex) when (onError != null)
+                        {
+                            await onError(ex, cancellationToken).ConfigureAwait(false);
+                            throw;
+                        }
+
+                        yield return item;
+                    }
+
+                    if (onCompleted != null)
+                    {
+                        await onCompleted(cancellationToken).ConfigureAwait(false);
+                    }
+                }
+            }
+#else
             return new DoAsyncIteratorWithTaskAndCancellation<TSource>(source, onNext, onError, onCompleted);
+#endif
         }
 #endif
 
+#if !USE_ASYNC_ITERATOR
         private sealed class DoAsyncIterator<TSource> : AsyncIterator<TSource>
         {
             private readonly Action _onCompleted;
@@ -420,6 +547,7 @@ namespace System.Linq
                 return false;
             }
         }
+#endif
 #endif
     }
 }