AwaitTest.cs 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Threading;
  16. using System.Reactive.Subjects;
  17. using System.Reactive.Disposables;
  18. namespace ReactiveTests.Tests
  19. {
  20. public class AwaitTest : ReactiveTest
  21. {
  22. [Fact]
  23. public void Await_ArgumentChecking()
  24. {
  25. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter<int>(default(IObservable<int>)));
  26. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter<int>(default(IConnectableObservable<int>)));
  27. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter(Observable.Empty<int>()).OnCompleted(null));
  28. }
  29. [Fact]
  30. public void Await()
  31. {
  32. SynchronizationContext.SetSynchronizationContext(null);
  33. var scheduler = new TestScheduler();
  34. var xs = scheduler.CreateHotObservable(
  35. OnNext(20, -1),
  36. OnNext(150, 0),
  37. OnNext(220, 1),
  38. OnNext(290, 2),
  39. OnNext(340, 3),
  40. OnCompleted<int>(410)
  41. );
  42. var awaiter = default(AsyncSubject<int>);
  43. var result = default(int);
  44. var t = long.MaxValue;
  45. scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
  46. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));
  47. scheduler.Start();
  48. Assert.Equal(410, t);
  49. Assert.Equal(3, result);
  50. xs.Subscriptions.AssertEqual(
  51. Subscribe(100)
  52. );
  53. }
  54. [Fact]
  55. public void Await_Connectable()
  56. {
  57. SynchronizationContext.SetSynchronizationContext(null);
  58. var scheduler = new TestScheduler();
  59. var s = default(long);
  60. var xs = Observable.Create<int>(observer =>
  61. {
  62. s = scheduler.Clock;
  63. return StableCompositeDisposable.Create(
  64. scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
  65. scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); })
  66. );
  67. });
  68. var ys = xs.Publish();
  69. var awaiter = default(AsyncSubject<int>);
  70. var result = default(int);
  71. var t = long.MaxValue;
  72. scheduler.ScheduleAbsolute(100, () => awaiter = ys.GetAwaiter());
  73. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));
  74. scheduler.Start();
  75. Assert.Equal(100, s);
  76. Assert.Equal(260, t);
  77. Assert.Equal(42, result);
  78. }
  79. [Fact]
  80. public void Await_Error()
  81. {
  82. SynchronizationContext.SetSynchronizationContext(null);
  83. var scheduler = new TestScheduler();
  84. var ex = new Exception();
  85. var xs = scheduler.CreateHotObservable(
  86. OnNext(20, -1),
  87. OnNext(150, 0),
  88. OnNext(220, 1),
  89. OnNext(290, 2),
  90. OnNext(340, 3),
  91. OnError<int>(410, ex)
  92. );
  93. var awaiter = default(AsyncSubject<int>);
  94. var t = long.MaxValue;
  95. scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
  96. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; ReactiveAssert.Throws(ex, () => awaiter.GetResult()); }));
  97. scheduler.Start();
  98. Assert.Equal(410, t);
  99. xs.Subscriptions.AssertEqual(
  100. Subscribe(100)
  101. );
  102. }
  103. [Fact]
  104. public void Await_Never()
  105. {
  106. SynchronizationContext.SetSynchronizationContext(null);
  107. var scheduler = new TestScheduler();
  108. var xs = scheduler.CreateHotObservable(
  109. OnNext(20, -1),
  110. OnNext(150, 0),
  111. OnNext(220, 1),
  112. OnNext(290, 2),
  113. OnNext(340, 3)
  114. );
  115. var awaiter = default(AsyncSubject<int>);
  116. var hasValue = default(bool);
  117. var t = long.MaxValue;
  118. scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
  119. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; awaiter.GetResult(); hasValue = true; }));
  120. scheduler.Start();
  121. Assert.Equal(long.MaxValue, t);
  122. Assert.False(hasValue);
  123. xs.Subscriptions.AssertEqual(
  124. Subscribe(100)
  125. );
  126. }
  127. [Fact]
  128. public void Await_Empty()
  129. {
  130. SynchronizationContext.SetSynchronizationContext(null);
  131. var scheduler = new TestScheduler();
  132. var xs = scheduler.CreateHotObservable(
  133. OnCompleted<int>(300)
  134. );
  135. var awaiter = default(AsyncSubject<int>);
  136. var t = long.MaxValue;
  137. scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
  138. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; ReactiveAssert.Throws<InvalidOperationException>(() => awaiter.GetResult()); }));
  139. scheduler.Start();
  140. Assert.Equal(300, t);
  141. xs.Subscriptions.AssertEqual(
  142. Subscribe(100)
  143. );
  144. }
  145. }
  146. }