AwaitTest.cs 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using System.Threading;
  10. using Microsoft.Reactive.Testing;
  11. using Xunit;
  12. namespace ReactiveTests.Tests
  13. {
  14. public class AwaitTest : ReactiveTest
  15. {
  16. [Fact]
  17. public void Await_ArgumentChecking()
  18. {
  19. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter(default(IObservable<int>)));
  20. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter<int>(default));
  21. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter(Observable.Empty<int>()).OnCompleted(null));
  22. }
  23. [Fact]
  24. public void Await()
  25. {
  26. SynchronizationContext.SetSynchronizationContext(null);
  27. var scheduler = new TestScheduler();
  28. var xs = scheduler.CreateHotObservable(
  29. OnNext(20, -1),
  30. OnNext(150, 0),
  31. OnNext(220, 1),
  32. OnNext(290, 2),
  33. OnNext(340, 3),
  34. OnCompleted<int>(410)
  35. );
  36. var awaiter = default(AsyncSubject<int>);
  37. var result = default(int);
  38. var t = long.MaxValue;
  39. scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
  40. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));
  41. scheduler.Start();
  42. Assert.Equal(410, t);
  43. Assert.Equal(3, result);
  44. xs.Subscriptions.AssertEqual(
  45. Subscribe(100)
  46. );
  47. }
  48. [Fact]
  49. public void Await_Connectable()
  50. {
  51. SynchronizationContext.SetSynchronizationContext(null);
  52. var scheduler = new TestScheduler();
  53. var s = default(long);
  54. var xs = Observable.Create<int>(observer =>
  55. {
  56. s = scheduler.Clock;
  57. return StableCompositeDisposable.Create(
  58. scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
  59. scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); })
  60. );
  61. });
  62. var ys = xs.Publish();
  63. var awaiter = default(AsyncSubject<int>);
  64. var result = default(int);
  65. var t = long.MaxValue;
  66. scheduler.ScheduleAbsolute(100, () => awaiter = ys.GetAwaiter());
  67. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));
  68. scheduler.Start();
  69. Assert.Equal(100, s);
  70. Assert.Equal(260, t);
  71. Assert.Equal(42, result);
  72. }
  73. [Fact]
  74. public void Await_Error()
  75. {
  76. SynchronizationContext.SetSynchronizationContext(null);
  77. var scheduler = new TestScheduler();
  78. var ex = new Exception();
  79. var xs = scheduler.CreateHotObservable(
  80. OnNext(20, -1),
  81. OnNext(150, 0),
  82. OnNext(220, 1),
  83. OnNext(290, 2),
  84. OnNext(340, 3),
  85. OnError<int>(410, ex)
  86. );
  87. var awaiter = default(AsyncSubject<int>);
  88. var t = long.MaxValue;
  89. scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
  90. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; ReactiveAssert.Throws(ex, () => awaiter.GetResult()); }));
  91. scheduler.Start();
  92. Assert.Equal(410, t);
  93. xs.Subscriptions.AssertEqual(
  94. Subscribe(100)
  95. );
  96. }
  97. [Fact]
  98. public void Await_Never()
  99. {
  100. SynchronizationContext.SetSynchronizationContext(null);
  101. var scheduler = new TestScheduler();
  102. var xs = scheduler.CreateHotObservable(
  103. OnNext(20, -1),
  104. OnNext(150, 0),
  105. OnNext(220, 1),
  106. OnNext(290, 2),
  107. OnNext(340, 3)
  108. );
  109. var awaiter = default(AsyncSubject<int>);
  110. var hasValue = default(bool);
  111. var t = long.MaxValue;
  112. scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
  113. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; awaiter.GetResult(); hasValue = true; }));
  114. scheduler.Start();
  115. Assert.Equal(long.MaxValue, t);
  116. Assert.False(hasValue);
  117. xs.Subscriptions.AssertEqual(
  118. Subscribe(100)
  119. );
  120. }
  121. [Fact]
  122. public void Await_Empty()
  123. {
  124. SynchronizationContext.SetSynchronizationContext(null);
  125. var scheduler = new TestScheduler();
  126. var xs = scheduler.CreateHotObservable(
  127. OnCompleted<int>(300)
  128. );
  129. var awaiter = default(AsyncSubject<int>);
  130. var t = long.MaxValue;
  131. scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
  132. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; ReactiveAssert.Throws<InvalidOperationException>(() => awaiter.GetResult()); }));
  133. scheduler.Start();
  134. Assert.Equal(300, t);
  135. xs.Subscriptions.AssertEqual(
  136. Subscribe(100)
  137. );
  138. }
  139. }
  140. }