// WhenTest.cs (6.3 KB)
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. using System.Reactive.Subjects;
  19. using System.Reactive.Joins;
  namespace ReactiveTests.Tests
  {
      /// <summary>
      /// Tests for <c>Observable.When</c> applied to join plans built with
      /// <c>And</c> and <c>Then</c>. Most tests drive hot observables on a
      /// <c>TestScheduler</c> and compare the recorded messages with the
      /// expected notifications.
      /// </summary>
      public class WhenTest : ReactiveTest
      {
          /// <summary>
          /// Asserts that both <c>When</c> overloads (plan array and plan sequence)
          /// throw <see cref="ArgumentNullException"/> when given <c>null</c>.
          /// </summary>
          [Fact]
          public void When_ArgumentChecking()
          {
              ReactiveAssert.Throws<ArgumentNullException>(() => Observable.When<int>((Plan<int>[])null));
              ReactiveAssert.Throws<ArgumentNullException>(() => Observable.When<int>((IEnumerable<Plan<int>>)null));
          }

          /// <summary>
          /// Two sources with three values each: expects three pairwise sums,
          /// each at the time of the ys value, followed by completion at 270.
          /// </summary>
          [Fact]
          public void WhenMultipleDataSymmetric()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(210, 1),
                  OnNext(220, 2),
                  OnNext(230, 3),
                  OnCompleted<int>(240)
              );

              var ys = scheduler.CreateHotObservable(
                  OnNext(240, 4),
                  OnNext(250, 5),
                  OnNext(260, 6),
                  OnCompleted<int>(270)
              );

              var res = scheduler.Start(() =>
                  Observable.When(
                      xs.And(ys).Then((x, y) => x + y)
                  )
              );

              res.Messages.AssertEqual(
                  OnNext(240, 1 + 4),
                  OnNext(250, 2 + 5),
                  OnNext(260, 3 + 6),
                  OnCompleted<int>(270)
              );
          }

          /// <summary>
          /// xs has three values but ys only two: expects two pairwise sums
          /// followed by completion at 270; the third xs value is never paired.
          /// </summary>
          [Fact]
          public void WhenMultipleDataAsymmetric()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(210, 1),
                  OnNext(220, 2),
                  OnNext(230, 3),
                  OnCompleted<int>(240)
              );

              var ys = scheduler.CreateHotObservable(
                  OnNext(240, 4),
                  OnNext(250, 5),
                  OnCompleted<int>(270)
              );

              var res = scheduler.Start(() =>
                  Observable.When(
                      xs.And(ys).Then((x, y) => x + y)
                  )
              );

              res.Messages.AssertEqual(
                  OnNext(240, 1 + 4),
                  OnNext(250, 2 + 5),
                  OnCompleted<int>(270)
              );
          }

          /// <summary>
          /// Both sources complete without values (at 240 and 270): expects a
          /// single completion at 270.
          /// </summary>
          [Fact]
          public void WhenEmptyEmpty()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnCompleted<int>(240)
              );

              var ys = scheduler.CreateHotObservable(
                  OnCompleted<int>(270)
              );

              var res = scheduler.Start(() =>
                  Observable.When(
                      xs.And(ys).Then((x, y) => x + y)
                  )
              );

              res.Messages.AssertEqual(
                  OnCompleted<int>(270)
              );
          }

          /// <summary>
          /// Both sources are <c>Observable.Never</c>: expects no messages at all.
          /// </summary>
          [Fact]
          public void WhenNeverNever()
          {
              var scheduler = new TestScheduler();

              var xs = Observable.Never<int>();
              var ys = Observable.Never<int>().AsObservable();

              var res = scheduler.Start(() =>
                  Observable.When(
                      xs.And(ys).Then((x, y) => x + y)
                  )
              );

              // Empty argument list: the recorded message list must be empty.
              res.Messages.AssertEqual(
              );
          }

          /// <summary>
          /// xs fails at 240 while ys completes at 270: expects the same
          /// exception instance to be reported at 240.
          /// </summary>
          [Fact]
          public void WhenThrowNonEmpty()
          {
              var ex = new Exception();

              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnError<int>(240, ex)
              );

              var ys = scheduler.CreateHotObservable(
                  OnCompleted<int>(270)
              );

              var res = scheduler.Start(() =>
                  Observable.When(
                      xs.And(ys).Then((x, y) => x + y)
                  )
              );

              res.Messages.AssertEqual(
                  OnError<int>(240, ex)
              );
          }

          /// <summary>
          /// Three sources combined by three plans (xs+ys, xs*zs, ys-zs) in one
          /// <c>When</c>. The expected messages encode which plan fires for each
          /// value and that completion is reported at 300.
          /// </summary>
          [Fact]
          public void ComplicatedWhen()
          {
              var scheduler = new TestScheduler();

              var xs = scheduler.CreateHotObservable(
                  OnNext(210, 1),
                  OnNext(220, 2),
                  OnNext(230, 3),
                  OnCompleted<int>(240)
              );

              var ys = scheduler.CreateHotObservable(
                  OnNext(240, 4),
                  OnNext(250, 5),
                  OnNext(260, 6),
                  OnCompleted<int>(270)
              );

              var zs = scheduler.CreateHotObservable(
                  OnNext(220, 7),
                  OnNext(230, 8),
                  OnNext(240, 9),
                  OnCompleted<int>(300)
              );

              var res = scheduler.Start(() =>
                  Observable.When(
                      xs.And(ys).Then((x, y) => x + y),
                      xs.And(zs).Then((x, z) => x * z),
                      ys.And(zs).Then((y, z) => y - z)
                  )
              );

              res.Messages.AssertEqual(
                  OnNext(220, 1 * 7),   // xs value 1 with zs value 7
                  OnNext(230, 2 * 8),   // xs value 2 with zs value 8
                  OnNext(240, 3 + 4),   // xs value 3 with ys value 4
                  OnNext(250, 5 - 9),   // ys value 5 with zs value 9
                  OnCompleted<int>(300)
              );
          }

          /// <summary>
          /// Passes <c>When</c> a plan sequence whose iterator throws and asserts
          /// that the subscriber's error handler receives that same exception
          /// instance.
          /// </summary>
          [Fact]
          public void When_PlansIteratorThrows()
          {
              var ex = new Exception();
              var observed = default(Exception);

              GetPlans(ex).When().Subscribe(_ => { }, e => { observed = e; });

              // xUnit's Assert.Same takes (expected, actual).
              Assert.Same(ex, observed);
          }

          /// <summary>
          /// Iterator that yields no plans. When <paramref name="ex"/> is not
          /// <c>null</c> it throws that exception instead; because this is an
          /// iterator method, the throw happens on enumeration rather than when
          /// the method is called.
          /// </summary>
          /// <param name="ex">Exception to throw during enumeration, or <c>null</c> for an empty sequence.</param>
          /// <returns>An empty sequence of plans, unless enumeration throws.</returns>
          private static IEnumerable<Plan<int>> GetPlans(Exception ex)
          {
              if (ex != null)
              {
                  throw ex;
              }

              yield break;
          }

          /// <summary>
          /// Joins <c>Observable.Range(1, 5)</c> with itself and asserts that the
          /// collected results are each value added to itself.
          /// </summary>
          [Fact]
          public void SameSource()
          {
              var source = Observable.Range(1, 5);

              var list = Observable.When(source.And(source).Then((a, b) => a + b))
                  .ToList().First();

              Assert.Equal(new List<int>() { 2, 4, 6, 8, 10 }, list);
          }
      }
  }