|
@@ -316,6 +316,124 @@ namespace Tests
|
|
Assert.Equal(ex1, ((AggregateException)ex_).InnerExceptions.Single());
|
|
Assert.Equal(ex1, ((AggregateException)ex_).InnerExceptions.Single());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ [Fact]
|
|
|
|
+ public void ToObservable_disposes_enumerator_on_completion()
|
|
|
|
+ {
|
|
|
|
+ var fail = false;
|
|
|
|
+ var evt = new ManualResetEvent(false);
|
|
|
|
+
|
|
|
|
+ var ae = AsyncEnumerable.CreateEnumerable(
|
|
|
|
+ () => AsyncEnumerable.CreateEnumerator<int>(
|
|
|
|
+ async ct => false,
|
|
|
|
+ () => { throw new InvalidOperationException(); },
|
|
|
|
+ () => { evt.Set(); }));
|
|
|
|
+
|
|
|
|
+ ae
|
|
|
|
+ .ToObservable()
|
|
|
|
+ .Subscribe(new MyObserver<int>(
|
|
|
|
+ x =>
|
|
|
|
+ {
|
|
|
|
+ fail = true;
|
|
|
|
+ },
|
|
|
|
+ ex =>
|
|
|
|
+ {
|
|
|
|
+ fail = true;
|
|
|
|
+ },
|
|
|
|
+ () =>
|
|
|
|
+ {
|
|
|
|
+ }
|
|
|
|
+ ));
|
|
|
|
+
|
|
|
|
+ evt.WaitOne();
|
|
|
|
+ Assert.False(fail);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ [Fact]
|
|
|
|
+ public void ToObservable_disposes_enumerator_when_subscription_is_disposed()
|
|
|
|
+ {
|
|
|
|
+ var fail = false;
|
|
|
|
+ var evt = new ManualResetEvent(false);
|
|
|
|
+ var subscription = default(IDisposable);
|
|
|
|
+ var subscriptionAssignedTcs = new TaskCompletionSource<object>();
|
|
|
|
+
|
|
|
|
+ var ae = AsyncEnumerable.CreateEnumerable(
|
|
|
|
+ () => AsyncEnumerable.CreateEnumerator(
|
|
|
|
+ async ct =>
|
|
|
|
+ {
|
|
|
|
+ await subscriptionAssignedTcs.Task;
|
|
|
|
+ return true;
|
|
|
|
+ },
|
|
|
|
+ () => 1,
|
|
|
|
+ () => { evt.Set(); }));
|
|
|
|
+
|
|
|
|
+ subscription = ae
|
|
|
|
+ .ToObservable()
|
|
|
|
+ .Subscribe(new MyObserver<int>(
|
|
|
|
+ x =>
|
|
|
|
+ {
|
|
|
|
+ subscription.Dispose();
|
|
|
|
+ },
|
|
|
|
+ ex =>
|
|
|
|
+ {
|
|
|
|
+ fail = true;
|
|
|
|
+ },
|
|
|
|
+ () =>
|
|
|
|
+ {
|
|
|
|
+ fail = true;
|
|
|
|
+ }
|
|
|
|
+ ));
|
|
|
|
+
|
|
|
|
+ subscriptionAssignedTcs.SetResult(null);
|
|
|
|
+ evt.WaitOne();
|
|
|
|
+
|
|
|
|
+ Assert.False(fail);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ [Fact]
|
|
|
|
+ public void ToObservable_does_not_call_MoveNext_again_when_subscription_is_disposed()
|
|
|
|
+ {
|
|
|
|
+ var fail = false;
|
|
|
|
+ var moveNextCount = 0;
|
|
|
|
+ var evt = new ManualResetEvent(false);
|
|
|
|
+ var subscription = default(IDisposable);
|
|
|
|
+ var subscriptionAssignedTcs = new TaskCompletionSource<object>();
|
|
|
|
+
|
|
|
|
+ var ae = AsyncEnumerable.CreateEnumerable(
|
|
|
|
+ () => AsyncEnumerable.CreateEnumerator(
|
|
|
|
+ async ct =>
|
|
|
|
+ {
|
|
|
|
+ await subscriptionAssignedTcs.Task;
|
|
|
|
+
|
|
|
|
+ moveNextCount++;
|
|
|
|
+ return true;
|
|
|
|
+ },
|
|
|
|
+ () => 1,
|
|
|
|
+ () => { evt.Set(); }));
|
|
|
|
+
|
|
|
|
+ subscription = ae
|
|
|
|
+ .ToObservable()
|
|
|
|
+ .Subscribe(new MyObserver<int>(
|
|
|
|
+ x =>
|
|
|
|
+ {
|
|
|
|
+ subscription.Dispose();
|
|
|
|
+ },
|
|
|
|
+ ex =>
|
|
|
|
+ {
|
|
|
|
+ fail = true;
|
|
|
|
+ },
|
|
|
|
+ () =>
|
|
|
|
+ {
|
|
|
|
+ fail = true;
|
|
|
|
+ }
|
|
|
|
+ ));
|
|
|
|
+
|
|
|
|
+ subscriptionAssignedTcs.SetResult(null);
|
|
|
|
+ evt.WaitOne();
|
|
|
|
+
|
|
|
|
+ Assert.Equal(1, moveNextCount);
|
|
|
|
+ Assert.False(fail);
|
|
|
|
+ }
|
|
|
|
+
|
|
class MyObserver<T> : IObserver<T>
|
|
class MyObserver<T> : IObserver<T>
|
|
{
|
|
{
|
|
private Action<T> _onNext;
|
|
private Action<T> _onNext;
|