Browse Source

Use CreateEnumerable overloads that take in a CancellationToken.

Bart De Smet 7 years ago
parent
commit
0a64f5bcb6

+ 1 - 1
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Never.cs

@@ -18,7 +18,7 @@ namespace System.Linq
             //         seem correct either, given that we return a *cold* sequence.
             //
 
-            return AsyncEnumerable.CreateEnumerable(() => AsyncEnumerable.CreateEnumerator<TValue>(() => TaskExt.Never, current: null, dispose: null));
+            return AsyncEnumerable.CreateEnumerable(_ => AsyncEnumerable.CreateEnumerator<TValue>(() => TaskExt.Never, current: null, dispose: null));
         }
     }
 }

+ 5 - 1
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Throw.cs

@@ -22,8 +22,12 @@ namespace System.Linq
             var moveNextThrows = new ValueTask<bool>(Task.FromException<bool>(exception));
 #endif
 
+            //
+            // REVIEW: Honor cancellation using conditional expression in MoveNextAsync?
+            //
+
             return AsyncEnumerable.CreateEnumerable(
-                () => AsyncEnumerable.CreateEnumerator<TValue>(
+                _ => AsyncEnumerable.CreateEnumerator<TValue>(
                     () => moveNextThrows,
                     current: null,
                     dispose: null)

+ 1 - 1
Ix.NET/Source/System.Linq.Async.Tests/System/Linq/AsyncEnumerableTests.cs

@@ -94,7 +94,7 @@ namespace Tests
 #endif
 
             return AsyncEnumerable.CreateEnumerable(
-                () => AsyncEnumerable.CreateEnumerator<TValue>(
+                _ => AsyncEnumerable.CreateEnumerator<TValue>(
                     () => moveNextThrows,
                     current: null,
                     dispose: null)

+ 2 - 0
Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/CreateEnumerable.cs

@@ -5,6 +5,7 @@
 using System;
 using System.Collections.Generic;
 using System.Linq;
+using System.Threading;
 using Xunit;
 
 namespace Tests
@@ -15,6 +16,7 @@ namespace Tests
         public void CreateEnumerable_Null()
         {
             AssertThrows<ArgumentNullException>(() => AsyncEnumerable.CreateEnumerable<int>(default(Func<IAsyncEnumerator<int>>)));
+            AssertThrows<ArgumentNullException>(() => AsyncEnumerable.CreateEnumerable<int>(default(Func<CancellationToken, IAsyncEnumerator<int>>)));
         }
     }
 }

+ 3 - 3
Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs

@@ -142,7 +142,7 @@ namespace Tests
             var evt = new ManualResetEvent(false);
 
             var ae = AsyncEnumerable.CreateEnumerable(
-                () => AsyncEnumerable.CreateEnumerator<int>(
+                _ => AsyncEnumerable.CreateEnumerator<int>(
                     () => TaskExt.False,
                     () => { throw new InvalidOperationException(); },
                     () => { evt.Set(); return TaskExt.CompletedTask; }));
@@ -176,7 +176,7 @@ namespace Tests
             var subscriptionAssignedTcs = new TaskCompletionSource<object>();
 
             var ae = AsyncEnumerable.CreateEnumerable(
-                () => AsyncEnumerable.CreateEnumerator(
+                _ => AsyncEnumerable.CreateEnumerator(
                     async () =>
                     {
                         await subscriptionAssignedTcs.Task;
@@ -222,7 +222,7 @@ namespace Tests
             var subscriptionAssignedTcs = new TaskCompletionSource<object>();
 
             var ae = AsyncEnumerable.CreateEnumerable(
-                () => AsyncEnumerable.CreateEnumerator(
+                _ => AsyncEnumerable.CreateEnumerator(
                     async () =>
                     {
                         await subscriptionAssignedTcs.Task;

+ 1 - 1
Ix.NET/Source/System.Linq.Async/System/Linq/AsyncEnumerable.cs

@@ -31,7 +31,7 @@ namespace System.Linq
 #endif
 
             return CreateEnumerable(
-                () => CreateEnumerator<TValue>(
+                _ => CreateEnumerator<TValue>(
                     () => moveNextThrows,
                     current: null,
                     dispose: null)

+ 1 - 1
Ix.NET/Source/System.Linq.Async/System/Linq/AsyncEnumerator.cs

@@ -87,7 +87,7 @@ namespace System.Collections.Generic
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            return AsyncEnumerable.CreateEnumerable<T>(() => source);
+            return AsyncEnumerable.CreateEnumerable<T>(_ => source);
         }
 
         internal static IAsyncEnumerator<T> Create<T>(Func<TaskCompletionSource<bool>, ValueTask<bool>> moveNext, Func<T> current, Func<ValueTask> dispose)

+ 7 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToAsyncEnumerable.cs

@@ -33,7 +33,7 @@ namespace System.Linq
                 throw new ArgumentNullException(nameof(task));
 
             return CreateEnumerable(
-                () =>
+                _ =>
                 {
                     var called = 0;
 
@@ -59,12 +59,16 @@ namespace System.Linq
                 throw new ArgumentNullException(nameof(source));
 
             return CreateEnumerable(
-                () =>
+                ct =>
                 {
                     var observer = new ToAsyncEnumerableObserver<TSource>();
 
                     var subscription = source.Subscribe(observer);
 
+                    // REVIEW: Review possible concurrency issues with Dispose calls.
+
+                    var ctr = ct.Register(subscription.Dispose);
+
                     return CreateEnumerator(
                         tcs =>
                         {
@@ -111,6 +115,7 @@ namespace System.Linq
                         () => observer.Current,
                         () =>
                         {
+                            ctr.Dispose();
                             subscription.Dispose();
                             // Should we cancel in-flight operations somehow?
                             return TaskExt.CompletedTask;