ObservableSafetyTest.cs 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Reactive;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Threading;
  9. using Microsoft.Reactive.Testing;
  10. using Microsoft.VisualStudio.TestTools.UnitTesting;
  11. using Assert = Xunit.Assert;
  12. namespace ReactiveTests.Tests
  13. {
  14. [TestClass]
  15. public partial class ObservableSafetyTest : ReactiveTest
  16. {
  17. [TestMethod]
  18. public void SubscribeSafe_ArgumentChecking()
  19. {
  20. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.SubscribeSafe(default, Observer.Create<int>(_ => { })));
  21. ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.SubscribeSafe(Observable.Return(42), default));
  22. }
  23. [TestMethod]
  24. public void Safety_Subscription1()
  25. {
  26. var ex = new Exception();
  27. var xs = new RogueObservable(ex);
  28. var res = xs.Where(x => true).Select(x => x);
  29. var err = default(Exception);
  30. var d = res.Subscribe(x => { Assert.True(false); }, ex_ => { err = ex_; }, () => { Assert.True(false); });
  31. Assert.Same(ex, err);
  32. d.Dispose();
  33. }
  34. [TestMethod]
  35. public void Safety_Subscription2()
  36. {
  37. var ex = new Exception();
  38. var scheduler = new TestScheduler();
  39. var xs = scheduler.CreateHotObservable(
  40. OnNext(210, 42),
  41. OnNext(220, 43),
  42. OnNext(230, 44),
  43. OnNext(240, 45),
  44. OnCompleted<int>(250)
  45. );
  46. var ys = new RogueObservable(ex);
  47. var res = scheduler.Start(() =>
  48. xs.Merge(ys)
  49. );
  50. res.Messages.AssertEqual(
  51. OnError<int>(200, ex)
  52. );
  53. xs.Subscriptions.AssertEqual(
  54. Subscribe(200, 200)
  55. );
  56. }
  57. private class RogueObservable : IObservable<int>
  58. {
  59. private readonly Exception _ex;
  60. public RogueObservable(Exception ex)
  61. {
  62. _ex = ex;
  63. }
  64. public IDisposable Subscribe(IObserver<int> observer)
  65. {
  66. throw _ex;
  67. }
  68. }
  69. [TestMethod]
  70. public void ObservableBase_ObserverThrows()
  71. {
  72. var ex = new Exception();
  73. var failed = new ManualResetEvent(false);
  74. var disposed = new ManualResetEvent(false);
  75. var err = default(Exception);
  76. var xs = Observable.Create<int>(observer =>
  77. {
  78. Scheduler.Default.Schedule(() =>
  79. {
  80. try
  81. {
  82. observer.OnNext(42);
  83. }
  84. catch (Exception ex_)
  85. {
  86. err = ex_;
  87. failed.Set();
  88. }
  89. });
  90. return () => { disposed.Set(); };
  91. });
  92. xs.Subscribe(x =>
  93. {
  94. throw ex;
  95. });
  96. // Can't use WaitAll - we're on an STA thread.
  97. disposed.WaitOne();
  98. failed.WaitOne();
  99. Assert.Same(ex, err);
  100. }
  101. [TestMethod]
  102. public void ObservableBase_ObserverThrows_CustomObserver()
  103. {
  104. var ex = new Exception();
  105. var failed = new ManualResetEvent(false);
  106. var disposed = new ManualResetEvent(false);
  107. var err = default(Exception);
  108. var xs = Observable.Create<int>(observer =>
  109. {
  110. Scheduler.Default.Schedule(() =>
  111. {
  112. try
  113. {
  114. observer.OnNext(42);
  115. }
  116. catch (Exception ex_)
  117. {
  118. err = ex_;
  119. failed.Set();
  120. }
  121. });
  122. return () => { disposed.Set(); };
  123. });
  124. xs.Subscribe(new MyObserver(_ => true, ex));
  125. // Can't use WaitAll - we're on an STA thread.
  126. disposed.WaitOne();
  127. failed.WaitOne();
  128. Assert.Same(ex, err);
  129. }
  130. [TestMethod]
  131. public void Producer_ObserverThrows()
  132. {
  133. var ex = new Exception();
  134. var scheduler = new TestScheduler();
  135. var xs = scheduler.CreateHotObservable(
  136. OnNext(210, 1),
  137. OnNext(220, 2),
  138. OnNext(230, 3)
  139. );
  140. var ys = scheduler.CreateHotObservable(
  141. OnNext(215, 1),
  142. OnNext(225, 2),
  143. OnNext(235, 3)
  144. );
  145. var res = xs.CombineLatest(ys, (x, y) => x + y); // This creates a Producer object
  146. scheduler.ScheduleAbsolute(200, () =>
  147. {
  148. res.Subscribe(z =>
  149. {
  150. if (z == 4)
  151. {
  152. throw ex;
  153. }
  154. });
  155. });
  156. try
  157. {
  158. scheduler.Start();
  159. Assert.True(false);
  160. }
  161. catch (Exception err)
  162. {
  163. Assert.Same(ex, err);
  164. }
  165. Assert.Equal(225, scheduler.Clock);
  166. xs.Subscriptions.AssertEqual(
  167. Subscribe(200, 225)
  168. );
  169. ys.Subscriptions.AssertEqual(
  170. Subscribe(200, 225)
  171. );
  172. }
  173. [TestMethod]
  174. public void Producer_ObserverThrows_CustomObserver()
  175. {
  176. var ex = new Exception();
  177. var scheduler = new TestScheduler();
  178. var xs = scheduler.CreateHotObservable(
  179. OnNext(210, 1),
  180. OnNext(220, 2),
  181. OnNext(230, 3)
  182. );
  183. var ys = scheduler.CreateHotObservable(
  184. OnNext(215, 1),
  185. OnNext(225, 2),
  186. OnNext(235, 3)
  187. );
  188. var res = xs.CombineLatest(ys, (x, y) => x + y); // This creates a Producer object
  189. scheduler.ScheduleAbsolute(200, () =>
  190. {
  191. res.Subscribe(new MyObserver(x => x == 4, ex));
  192. });
  193. try
  194. {
  195. scheduler.Start();
  196. Assert.True(false);
  197. }
  198. catch (Exception err)
  199. {
  200. Assert.Same(ex, err);
  201. }
  202. Assert.Equal(225, scheduler.Clock);
  203. xs.Subscriptions.AssertEqual(
  204. Subscribe(200, 225)
  205. );
  206. ys.Subscriptions.AssertEqual(
  207. Subscribe(200, 225)
  208. );
  209. }
  210. private class MyObserver : ObserverBase<int>
  211. {
  212. private readonly Func<int, bool> _predicate;
  213. private readonly Exception _exception;
  214. public MyObserver(Func<int, bool> predicate, Exception exception)
  215. {
  216. _predicate = predicate;
  217. _exception = exception;
  218. }
  219. protected override void OnNextCore(int value)
  220. {
  221. if (_predicate(value))
  222. {
  223. throw _exception;
  224. }
  225. }
  226. protected override void OnErrorCore(Exception error)
  227. {
  228. }
  229. protected override void OnCompletedCore()
  230. {
  231. }
  232. }
  233. }
  234. }