ObservableSafetyTest.cs 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Reactive;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Linq;
  6. using System.Threading;
  7. using Microsoft.Reactive.Testing;
  8. using Microsoft.VisualStudio.TestTools.UnitTesting;
  9. namespace ReactiveTests.Tests
  10. {
  11. [TestClass]
  12. public partial class ObservableSafetyTest : ReactiveTest
  13. {
  /// <summary>
  /// Checks that <c>SubscribeSafe</c> throws <see cref="ArgumentNullException"/> when either
  /// the source sequence or the observer is null.
  /// </summary>
  [TestMethod]
  public void SubscribeSafe_ArgumentChecking()
  {
      IObservable<int> nullSource = null;
      IObserver<int> nullObserver = null;

      // Null source, valid observer.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => ObservableExtensions.SubscribeSafe<int>(nullSource, Observer.Create<int>(_ => { })));

      // Valid source, null observer.
      ReactiveAssert.Throws<ArgumentNullException>(
          () => ObservableExtensions.SubscribeSafe<int>(Observable.Return(42), nullObserver));
  }
  /// <summary>
  /// Subscribes through a Where/Select chain to a source whose Subscribe throws, and expects
  /// the thrown exception instance to be delivered to the error callback.
  /// </summary>
  [TestMethod]
  public void Safety_Subscription1()
  {
      var expected = new Exception();
      var source = new RogueObservable(expected);
      var pipeline = source.Where(x => true).Select(x => x);

      Exception observed = null;

      // Neither a value nor a completion may be signalled; only the error path is allowed.
      var subscription = pipeline.Subscribe(
          x => { Assert.Fail(); },
          error => { observed = error; },
          () => { Assert.Fail(); });

      Assert.AreSame(expected, observed);

      subscription.Dispose();
  }
  /// <summary>
  /// Merges a hot test sequence with a source whose Subscribe throws. Expects a single
  /// OnError at the subscription time (200) and the hot sequence's subscription to be
  /// recorded as both opened and closed at 200.
  /// </summary>
  [TestMethod]
  public void Safety_Subscription2()
  {
      var expected = new Exception();
      var scheduler = new TestScheduler();

      var healthy = scheduler.CreateHotObservable<int>(
          OnNext(210, 42),
          OnNext(220, 43),
          OnNext(230, 44),
          OnNext(240, 45),
          OnCompleted<int>(250)
      );

      var rogue = new RogueObservable(expected);

      var results = scheduler.Start(() => healthy.Merge(rogue));

      results.Messages.AssertEqual(OnError<int>(200, expected));

      healthy.Subscriptions.AssertEqual(Subscribe(200, 200));
  }
  /// <summary>
  /// Test double for a misbehaving source: <see cref="Subscribe"/> never returns a
  /// subscription and instead throws the exception supplied to the constructor.
  /// </summary>
  class RogueObservable : IObservable<int>
  {
      // Exception instance thrown by every Subscribe call.
      private readonly Exception _failure;

      /// <param name="ex">The exception that Subscribe will throw.</param>
      public RogueObservable(Exception ex)
      {
          _failure = ex;
      }

      /// <summary>Always throws the stored exception; the observer is never used.</summary>
      public IDisposable Subscribe(IObserver<int> observer)
      {
          throw _failure;
      }
  }
  /// <summary>
  /// Subscribes a throwing OnNext delegate to an <c>Observable.Create</c> sequence that emits
  /// from a scheduled action. Expects the observer's exception to surface at the
  /// <c>observer.OnNext</c> call inside the scheduled action and the subscription's dispose
  /// action to run.
  /// </summary>
  [TestMethod]
  public void ObservableBase_ObserverThrows()
  {
      // Generous upper bound: a regression should fail the test rather than hang the run.
      var timeout = TimeSpan.FromSeconds(30);

      var ex = new Exception();

      // The events are deliberately not disposed: after a timed-out wait the scheduled
      // action could still call Set on them.
      var failed = new ManualResetEvent(false);
      var disposed = new ManualResetEvent(false);

      var err = default(Exception);

      var xs = Observable.Create<int>(observer =>
      {
          Scheduler.Default.Schedule(() =>
          {
              try
              {
                  observer.OnNext(42);
              }
              catch (Exception ex_)
              {
                  // Record what came back out of OnNext, then signal the test thread.
                  err = ex_;
                  failed.Set();
              }
          });

          return () => { disposed.Set(); };
      });

      xs.Subscribe(x =>
      {
          throw ex;
      });

      // Can't use WaitAll - we're on an STA thread.
      Assert.IsTrue(disposed.WaitOne(timeout), "Timed out waiting for the subscription to be disposed.");
      Assert.IsTrue(failed.WaitOne(timeout), "Timed out waiting for the observer's exception to propagate.");

      Assert.AreSame(ex, err);
  }
  /// <summary>
  /// Same scenario as the delegate-based observer test, but subscribes a <c>MyObserver</c>
  /// that throws for every value. Expects the exception to surface at the
  /// <c>observer.OnNext</c> call inside the scheduled action and the subscription's dispose
  /// action to run.
  /// </summary>
  [TestMethod]
  public void ObservableBase_ObserverThrows_CustomObserver()
  {
      // Generous upper bound: a regression should fail the test rather than hang the run.
      var timeout = TimeSpan.FromSeconds(30);

      var ex = new Exception();

      // The events are deliberately not disposed: after a timed-out wait the scheduled
      // action could still call Set on them.
      var failed = new ManualResetEvent(false);
      var disposed = new ManualResetEvent(false);

      var err = default(Exception);

      var xs = Observable.Create<int>(observer =>
      {
          Scheduler.Default.Schedule(() =>
          {
              try
              {
                  observer.OnNext(42);
              }
              catch (Exception ex_)
              {
                  // Record what came back out of OnNext, then signal the test thread.
                  err = ex_;
                  failed.Set();
              }
          });

          return () => { disposed.Set(); };
      });

      // The predicate accepts every value, so the first OnNext throws.
      xs.Subscribe(new MyObserver(_ => true, ex));

      // Can't use WaitAll - we're on an STA thread.
      Assert.IsTrue(disposed.WaitOne(timeout), "Timed out waiting for the subscription to be disposed.");
      Assert.IsTrue(failed.WaitOne(timeout), "Timed out waiting for the observer's exception to propagate.");

      Assert.AreSame(ex, err);
  }
  /// <summary>
  /// Subscribes a throwing OnNext delegate to a CombineLatest of two hot sequences. The
  /// delegate throws when the combined value is 4; the test expects that exception to escape
  /// <c>scheduler.Start()</c>, the clock to stop at 225, and both sources to be recorded as
  /// subscribed from 200 to 225.
  /// </summary>
  [TestMethod]
  public void Producer_ObserverThrows()
  {
      var ex = new Exception();
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3)
      );

      var ys = scheduler.CreateHotObservable<int>(
          OnNext(215, 1),
          OnNext(225, 2),
          OnNext(235, 3)
      );

      var res = xs.CombineLatest(ys, (x, y) => x + y); // This creates a Producer object

      scheduler.ScheduleAbsolute(200, () =>
      {
          res.Subscribe(z =>
          {
              if (z == 4)
              {
                  throw ex;
              }
          });
      });

      // Capture the exception and assert outside the try block, so that a missing throw is
      // reported directly instead of as a caught assertion failure.
      var err = default(Exception);
      try
      {
          scheduler.Start();
      }
      catch (Exception caught)
      {
          err = caught;
      }

      Assert.AreSame(ex, err, "scheduler.Start() should have thrown the observer's exception.");

      Assert.AreEqual(225, scheduler.Clock);

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 225)
      );

      ys.Subscriptions.AssertEqual(
          Subscribe(200, 225)
      );
  }
  /// <summary>
  /// Subscribes a <c>MyObserver</c> that throws on the value 4 to a CombineLatest of two hot
  /// sequences. Expects that exception to escape <c>scheduler.Start()</c>, the clock to stop
  /// at 225, and both sources to be recorded as subscribed from 200 to 225.
  /// </summary>
  [TestMethod]
  public void Producer_ObserverThrows_CustomObserver()
  {
      var ex = new Exception();
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable<int>(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3)
      );

      var ys = scheduler.CreateHotObservable<int>(
          OnNext(215, 1),
          OnNext(225, 2),
          OnNext(235, 3)
      );

      var res = xs.CombineLatest(ys, (x, y) => x + y); // This creates a Producer object

      scheduler.ScheduleAbsolute(200, () =>
      {
          res.Subscribe(new MyObserver(x => x == 4, ex));
      });

      // Capture the exception and assert outside the try block, so that a missing throw is
      // reported directly instead of as a caught assertion failure.
      var err = default(Exception);
      try
      {
          scheduler.Start();
      }
      catch (Exception caught)
      {
          err = caught;
      }

      Assert.AreSame(ex, err, "scheduler.Start() should have thrown the observer's exception.");

      Assert.AreEqual(225, scheduler.Clock);

      xs.Subscriptions.AssertEqual(
          Subscribe(200, 225)
      );

      ys.Subscriptions.AssertEqual(
          Subscribe(200, 225)
      );
  }
  /// <summary>
  /// Observer test double built on <c>ObserverBase&lt;int&gt;</c>: throws a preconfigured
  /// exception from OnNextCore for values matching a predicate, and does nothing in the
  /// error and completion overrides.
  /// </summary>
  class MyObserver : ObserverBase<int>
  {
      // Decides, per value, whether OnNextCore should throw.
      private readonly Func<int, bool> _shouldThrow;

      // Exception instance thrown when the predicate matches.
      private readonly Exception _toThrow;

      /// <param name="predicate">Returns true for values on which the observer must throw.</param>
      /// <param name="exception">The exception to throw for matching values.</param>
      public MyObserver(Func<int, bool> predicate, Exception exception)
      {
          _shouldThrow = predicate;
          _toThrow = exception;
      }

      /// <summary>Throws the configured exception when the predicate accepts <paramref name="value"/>.</summary>
      protected override void OnNextCore(int value)
      {
          if (!_shouldThrow(value))
          {
              return;
          }

          throw _toThrow;
      }

      /// <summary>Errors are ignored.</summary>
      protected override void OnErrorCore(Exception error)
      {
      }

      /// <summary>Completion is ignored.</summary>
      protected override void OnCompletedCore()
      {
      }
  }
  226. }
  227. }