Prechádzať zdrojové kódy

Adding a few tests for await support and RunAsync.

Bart De Smet 10 rokov pred
rodič
commit
e6d7a65140

+ 5 - 5
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.Awaiter.cs

@@ -26,7 +26,7 @@ namespace System.Reactive.Linq
 
             if (cancellationToken.IsCancellationRequested)
             {
-                return Cancel(s);
+                return Cancel(s, cancellationToken);
             }
 
             var d = source.SubscribeSafe(s);
@@ -45,7 +45,7 @@ namespace System.Reactive.Linq
 
             if (cancellationToken.IsCancellationRequested)
             {
-                return Cancel(s);
+                return Cancel(s, cancellationToken);
             }
 
             var d = source.SubscribeSafe(s);
@@ -59,9 +59,9 @@ namespace System.Reactive.Linq
             return s;
         }
 
-        private static AsyncSubject<T> Cancel<T>(AsyncSubject<T> subject)
+        private static AsyncSubject<T> Cancel<T>(AsyncSubject<T> subject, CancellationToken cancellationToken)
         {
-            subject.OnError(new OperationCanceledException());
+            subject.OnError(new OperationCanceledException(cancellationToken));
             return subject;
         }
 
@@ -75,7 +75,7 @@ namespace System.Reactive.Linq
             var ctr = token.Register(() =>
             {
                 subscription.Dispose();
-                Cancel(subject);
+                Cancel(subject, token);
             });
 
             //

+ 269 - 1
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/ObservableAwaiterTest.cs

@@ -8,9 +8,11 @@ using System.Reactive;
 using System.Reactive.Concurrency;
 using System.Reactive.Linq;
 using System.Reactive.Subjects;
+using System.Threading;
 using Microsoft.Reactive.Testing;
 using Microsoft.VisualStudio.TestTools.UnitTesting;
 using ReactiveTests.Dummies;
+using System.Reactive.Disposables;
 
 namespace ReactiveTests.Tests
 {
@@ -20,7 +22,9 @@ namespace ReactiveTests.Tests
         [TestMethod]
         public void Await_ArgumentChecking()
         {
-            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter<int>(null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter<int>(default(IObservable<int>)));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter<int>(default(IConnectableObservable<int>)));
+
             ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter(Observable.Empty<int>()).OnCompleted(null));
         }
 
@@ -55,6 +59,39 @@ namespace ReactiveTests.Tests
             );
         }
 
+        [TestMethod]
+        public void Await_Connectable()
+        {
+            var scheduler = new TestScheduler();
+
+            var s = default(long);
+
+            var xs = Observable.Create<int>(observer =>
+            {
+                s = scheduler.Clock;
+
+                return StableCompositeDisposable.Create(
+                    scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
+                    scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); })
+                );
+            });
+
+            var ys = xs.Publish();
+
+            var awaiter = default(AsyncSubject<int>);
+            var result = default(int);
+            var t = long.MaxValue;
+
+            scheduler.ScheduleAbsolute(100, () => awaiter = ys.GetAwaiter());
+            scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));
+
+            scheduler.Start();
+
+            Assert.AreEqual(100, s);
+            Assert.AreEqual(260, t);
+            Assert.AreEqual(42, result);
+        }
+
         [TestMethod]
         public void Await_Error()
         {
@@ -139,6 +176,237 @@ namespace ReactiveTests.Tests
                 Subscribe(100)
             );
         }
+
+        [TestMethod]
+        public void RunAsync_ArgumentChecking()
+        {
+            var ct = CancellationToken.None;
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RunAsync<int>(default(IObservable<int>), ct));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RunAsync<int>(default(IConnectableObservable<int>), ct));
+        }
+
+        [TestMethod]
+        public void RunAsync_Simple()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(220, 42),
+                OnCompleted<int>(250)
+            );
+
+            var awaiter = default(AsyncSubject<int>);
+            var result = default(int);
+            var t = long.MaxValue;
+
+            scheduler.ScheduleAbsolute(100, () => awaiter = xs.RunAsync(CancellationToken.None));
+            scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));
+
+            scheduler.Start();
+
+            Assert.AreEqual(250, t);
+            Assert.AreEqual(42, result);
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(100)
+            );
+        }
+
+        [TestMethod]
+        public void RunAsync_Cancelled()
+        {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(220, 42),
+                OnCompleted<int>(250)
+            );
+
+            var awaiter = default(AsyncSubject<int>);
+            var result = default(int);
+            var t = long.MaxValue;
+
+            scheduler.ScheduleAbsolute(100, () => awaiter = xs.RunAsync(cts.Token));
+            scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
+            {
+                t = scheduler.Clock;
+
+                ReactiveAssert.Throws<OperationCanceledException>(() =>
+                {
+                    result = awaiter.GetResult();
+                });
+            }));
+
+            scheduler.Start();
+
+            Assert.AreEqual(200, t);
+
+            xs.Subscriptions.AssertEqual(
+            );
+        }
+
+        [TestMethod]
+        public void RunAsync_Cancel()
+        {
+            var cts = new CancellationTokenSource();
+
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(220, 42),
+                OnCompleted<int>(250)
+            );
+
+            var awaiter = default(AsyncSubject<int>);
+            var result = default(int);
+            var t = long.MaxValue;
+
+            scheduler.ScheduleAbsolute(100, () => awaiter = xs.RunAsync(cts.Token));
+            scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
+            {
+                t = scheduler.Clock;
+
+                ReactiveAssert.Throws<OperationCanceledException>(() =>
+                {
+                    result = awaiter.GetResult();
+                });
+            }));
+            scheduler.ScheduleAbsolute(210, () => cts.Cancel());
+
+            scheduler.Start();
+
+            Assert.AreEqual(210, t);
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(100, 210)
+            );
+        }
+
+        [TestMethod]
+        public void RunAsync_Connectable()
+        {
+            var scheduler = new TestScheduler();
+
+            var s = default(long);
+
+            var xs = Observable.Create<int>(observer =>
+            {
+                s = scheduler.Clock;
+
+                return StableCompositeDisposable.Create(
+                    scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
+                    scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); })
+                );
+            });
+
+            var ys = xs.Publish();
+
+            var awaiter = default(AsyncSubject<int>);
+            var result = default(int);
+            var t = long.MaxValue;
+
+            scheduler.ScheduleAbsolute(100, () => awaiter = ys.RunAsync(CancellationToken.None));
+            scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));
+
+            scheduler.Start();
+
+            Assert.AreEqual(100, s);
+            Assert.AreEqual(260, t);
+            Assert.AreEqual(42, result);
+        }
+
+        [TestMethod]
+        public void RunAsync_Connectable_Cancelled()
+        {
+            var cts = new CancellationTokenSource();
+            cts.Cancel();
+
+            var scheduler = new TestScheduler();
+
+            var s = default(long?);
+
+            var xs = Observable.Create<int>(observer =>
+            {
+                s = scheduler.Clock;
+
+                return StableCompositeDisposable.Create(
+                    scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
+                    scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); })
+                );
+            });
+
+            var ys = xs.Publish();
+
+            var awaiter = default(AsyncSubject<int>);
+            var result = default(int);
+            var t = long.MaxValue;
+
+            scheduler.ScheduleAbsolute(100, () => awaiter = ys.RunAsync(cts.Token));
+            scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
+            {
+                t = scheduler.Clock;
+
+                ReactiveAssert.Throws<OperationCanceledException>(() =>
+                {
+                    result = awaiter.GetResult();
+                });
+            }));
+
+            scheduler.Start();
+
+            Assert.IsFalse(s.HasValue);
+            Assert.AreEqual(200, t);
+        }
+
+        [TestMethod]
+        public void RunAsync_Connectable_Cancel()
+        {
+            var cts = new CancellationTokenSource();
+
+            var scheduler = new TestScheduler();
+
+            var s = default(long);
+            var d = default(long);
+
+            var xs = Observable.Create<int>(observer =>
+            {
+                s = scheduler.Clock;
+
+                return StableCompositeDisposable.Create(
+                    scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
+                    scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); }),
+                    Disposable.Create(() => { d = scheduler.Clock; })
+                );
+            });
+
+            var ys = xs.Publish();
+
+            var awaiter = default(AsyncSubject<int>);
+            var result = default(int);
+            var t = long.MaxValue;
+
+            scheduler.ScheduleAbsolute(100, () => awaiter = ys.RunAsync(cts.Token));
+            scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
+            {
+                t = scheduler.Clock;
+
+                ReactiveAssert.Throws<OperationCanceledException>(() =>
+                {
+                    result = awaiter.GetResult();
+                });
+            }));
+            scheduler.ScheduleAbsolute(210, () => cts.Cancel());
+
+            scheduler.Start();
+
+            Assert.AreEqual(100, s);
+            Assert.AreEqual(210, d);
+            Assert.AreEqual(210, t);
+        }
     }
 }