ObservableExtensionsTest.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Reactive;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Threading;
  9. using Microsoft.Reactive.Testing;
  10. using Xunit;
  11. using System.Collections.Generic;
  12. namespace ReactiveTests.Tests
  13. {
  14. public partial class ObservableExtensionsTest : ReactiveTest
  15. {
  16. #region Subscribe
  // Asserts, via ReactiveAssert.Throws, that each delegate-based Subscribe
  // overload raises ArgumentNullException when the source or any one of its
  // callbacks is null.
  [Fact]
  public void Subscribe_ArgumentChecking()
  {
      var source = Observable.Empty<int>();
      var nullSource = default(IObservable<int>);

      // Valid callbacks, used wherever a different argument is the null one.
      Action<int> onNext = _ => { };
      Action<Exception> onError = _ => { };
      Action onCompleted = () => { };

      // Null callbacks, typed so the intended overload is still selected.
      var nullOnNext = default(Action<int>);
      var nullOnError = default(Action<Exception>);
      var nullOnCompleted = default(Action);

      // Subscribe(source)
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(nullSource));

      // Subscribe(source, onNext)
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(nullSource, onNext));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, nullOnNext));

      // Subscribe(source, onNext, onCompleted)
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(nullSource, onNext, onCompleted));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, nullOnNext, onCompleted));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, onNext, nullOnCompleted));

      // Subscribe(source, onNext, onError)
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(nullSource, onNext, onError));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, nullOnNext, onError));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, onNext, nullOnError));

      // Subscribe(source, onNext, onError, onCompleted)
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(nullSource, onNext, onError, onCompleted));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, nullOnNext, onError, onCompleted));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, onNext, nullOnError, onCompleted));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, onNext, onError, nullOnCompleted));
  }
  // Smoke test: subscribes without any handlers to a single-element sequence
  // produced on the immediate scheduler. There are no assertions; the test
  // passes as long as the call does not throw.
  [Fact]
  public void Subscribe_None_Return()
  {
      var single = Observable.Return(1, Scheduler.Immediate);

      single.Subscribe();
  }
  // Subscribes with no handlers to a sequence that fails on the immediate
  // scheduler and expects the Subscribe call itself to throw that exception.
  [Fact]
  public void Subscribe_None_Throw()
  {
      var error = new Exception();
      var failing = Observable.Throw<int>(error, Scheduler.Immediate);

      ReactiveAssert.Throws(error, () => failing.Subscribe());
  }
  // Smoke test: subscribes without any handlers to an empty sequence on the
  // immediate scheduler. There are no assertions; the test passes as long as
  // the call does not throw.
  [Fact]
  public void Subscribe_None_Empty()
  {
      // Use the parameterless overload, like the other Subscribe_None_* tests.
      // Passing an onNext handler here would only repeat Subscribe_OnNext_Empty
      // and leave Subscribe() untested against an empty source.
      Observable.Empty<int>(Scheduler.Immediate).Subscribe();
  }
  // Subscribes with only an onNext handler to a single-element sequence and
  // checks that the handler received that element.
  [Fact]
  public void Subscribe_OnNext_Return()
  {
      var received = -1; // sentinel: replaced once onNext runs
      var single = Observable.Return(42, Scheduler.Immediate);

      single.Subscribe((int value) =>
      {
          received = value;
      });

      Assert.Equal(42, received);
  }
  // Subscribes with only an onNext handler to a failing sequence. The handler
  // must never run, and the Subscribe call is expected to throw the error.
  [Fact]
  public void Subscribe_OnNext_Throw()
  {
      var error = new Exception();
      var failing = Observable.Throw<int>(error, Scheduler.Immediate);

      // Fails the test if an element is ever delivered.
      Action<int> unexpectedNext = _ => { Assert.True(false); };

      ReactiveAssert.Throws(error, () => failing.Subscribe(unexpectedNext));
  }
  // Subscribes with only an onNext handler to an empty sequence; the handler
  // fails the test if it is ever invoked.
  [Fact]
  public void Subscribe_OnNext_Empty()
  {
      var empty = Observable.Empty<int>(Scheduler.Immediate);
      Action<int> unexpectedNext = _ => { Assert.True(false); };

      empty.Subscribe(unexpectedNext);
  }
  // Subscribes with onNext and onCompleted handlers to a single-element
  // sequence and checks that both handlers ran.
  [Fact]
  public void Subscribe_OnNextOnCompleted_Return()
  {
      var received = -1; // sentinel: replaced once onNext runs
      var completed = false;
      var single = Observable.Return(42, Scheduler.Immediate);

      single.Subscribe(
          (int value) => { received = value; },
          () => { completed = true; });

      Assert.Equal(42, received);
      Assert.True(completed);
  }
  // Subscribes with onNext and onCompleted handlers to a failing sequence.
  // Neither handler may run, and the Subscribe call is expected to throw.
  [Fact]
  public void Subscribe_OnNextOnCompleted_Throw()
  {
      var error = new Exception();
      var failing = Observable.Throw<int>(error, Scheduler.Immediate);

      // Either callback firing means the error was not surfaced as expected.
      Action<int> unexpectedNext = _ => { Assert.True(false); };
      Action unexpectedCompleted = () => { Assert.True(false); };

      ReactiveAssert.Throws(error, () => failing.Subscribe(unexpectedNext, unexpectedCompleted));
  }
  // Subscribes with onNext and onCompleted handlers to an empty sequence:
  // onNext must never run, while onCompleted must.
  [Fact]
  public void Subscribe_OnNextOnCompleted_Empty()
  {
      var completed = false;
      var empty = Observable.Empty<int>(Scheduler.Immediate);

      empty.Subscribe(
          (int _) => { Assert.True(false); },
          () => { completed = true; });

      Assert.True(completed);
  }
  94. #endregion
  95. #region Subscribe with CancellationToken
  96. #if !NO_TPL
  // Asserts, via ReactiveAssert.Throws, that each CancellationToken-accepting
  // Subscribe overload raises ArgumentNullException when the source, the
  // observer, or any one of the callbacks is null.
  [Fact]
  public void Subscribe_CT_ArgumentChecking()
  {
      var source = Observable.Empty<int>();
      var observer = Observer.Create<int>(_ => { });
      var token = CancellationToken.None;

      var nullSource = default(IObservable<int>);
      var nullObserver = default(IObserver<int>);

      // Valid callbacks, used wherever a different argument is the null one.
      Action<int> onNext = _ => { };
      Action<Exception> onError = _ => { };
      Action onCompleted = () => { };

      // Null callbacks, typed so the intended overload is still selected.
      var nullOnNext = default(Action<int>);
      var nullOnError = default(Action<Exception>);
      var nullOnCompleted = default(Action);

      // Subscribe(source, observer, token)
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(nullSource, observer, token));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, nullObserver, token));

      // Subscribe(source, token)
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(nullSource, token));

      // Subscribe(source, onNext, token)
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(nullSource, onNext, token));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, nullOnNext, token));

      // Subscribe(source, onNext, onCompleted, token)
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(nullSource, onNext, onCompleted, token));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, nullOnNext, onCompleted, token));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, onNext, nullOnCompleted, token));

      // Subscribe(source, onNext, onError, token)
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(nullSource, onNext, onError, token));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, nullOnNext, onError, token));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, onNext, nullOnError, token));

      // Subscribe(source, onNext, onError, onCompleted, token)
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(nullSource, onNext, onError, onCompleted, token));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, nullOnNext, onError, onCompleted, token));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, onNext, nullOnError, onCompleted, token));
      ReactiveAssert.Throws<ArgumentNullException>(() => ObservableExtensions.Subscribe<int>(source, onNext, onError, nullOnCompleted, token));
  }
  // Subscribing with CancellationToken.None: the observer is expected to see
  // the complete hot sequence, and the subscription is recorded as never
  // disposed.
  [Fact]
  public void Subscribe_CT_None()
  {
      var scheduler = new TestScheduler();

      // The observer should record exactly what the source emits, so the same
      // timeline serves as both the input and the expectation.
      var timeline = new[]
      {
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(240)
      };

      var xs = scheduler.CreateHotObservable(timeline);
      var observer = scheduler.CreateObserver<int>();

      scheduler.ScheduleAbsolute(200, () => { xs.Subscribe(observer, CancellationToken.None); });
      scheduler.Start();

      observer.Messages.AssertEqual(timeline);

      xs.Subscriptions.AssertEqual(
          Subscribe(200, Subscription.Infinite /* no auto-dispose when using CreateHotObservable */)
      );
  }
  // The token is cancelled at virtual time 150, before the subscribe action
  // runs at 200: no messages and no subscription are expected to be recorded.
  [Fact]
  public void Subscribe_CT_CancelBeforeBegin()
  {
      var scheduler = new TestScheduler();
      var source = new CancellationTokenSource();

      var xs = scheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(240)
      );
      var observer = scheduler.CreateObserver<int>();

      // Registered out of time order on purpose, as in the original test.
      scheduler.ScheduleAbsolute(200, () => { xs.Subscribe(observer, source.Token); });
      scheduler.ScheduleAbsolute(150, source.Cancel);
      scheduler.Start();

      // Both recordings are expected to be empty.
      observer.Messages.AssertEqual();
      xs.Subscriptions.AssertEqual();
  }
  // The token is cancelled at virtual time 225, between the second and third
  // elements: only the first two elements are expected, and the subscription
  // is expected to end at 225.
  [Fact]
  public void Subscribe_CT_CancelMiddle()
  {
      var scheduler = new TestScheduler();
      var source = new CancellationTokenSource();

      var xs = scheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(240)
      );
      var observer = scheduler.CreateObserver<int>();

      scheduler.ScheduleAbsolute(200, () => { xs.Subscribe(observer, source.Token); });
      scheduler.ScheduleAbsolute(225, source.Cancel);
      scheduler.Start();

      var expectedMessages = new[]
      {
          OnNext(210, 1),
          OnNext(220, 2)
      };
      observer.Messages.AssertEqual(expectedMessages);

      xs.Subscriptions.AssertEqual(Subscribe(200, 225));
  }
  // The token is cancelled at virtual time 250, after the source has already
  // failed at 240: the observer is expected to see every element plus the
  // error, and the subscription is recorded as never disposed.
  [Fact]
  public void Subscribe_CT_CancelAfterEnd()
  {
      var scheduler = new TestScheduler();
      var error = new Exception();
      var source = new CancellationTokenSource();

      var xs = scheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnError<int>(240, error)
      );
      var observer = scheduler.CreateObserver<int>();

      scheduler.ScheduleAbsolute(200, () => { xs.Subscribe(observer, source.Token); });
      scheduler.ScheduleAbsolute(250, source.Cancel);
      scheduler.Start();

      var expectedMessages = new[]
      {
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnError<int>(240, error)
      };
      observer.Messages.AssertEqual(expectedMessages);

      xs.Subscriptions.AssertEqual(
          Subscribe(200, Subscription.Infinite /* no auto-dispose when using CreateHotObservable */)
      );
  }
  // A real, cancellable token is supplied but never cancelled: the observer
  // is expected to see the complete sequence, and the subscription is
  // recorded as never disposed.
  [Fact]
  public void Subscribe_CT_NeverCancel()
  {
      var scheduler = new TestScheduler();
      var source = new CancellationTokenSource();

      // The observer should record exactly what the source emits, so the same
      // timeline serves as both the input and the expectation.
      var timeline = new[]
      {
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(240)
      };

      var xs = scheduler.CreateHotObservable(timeline);
      var observer = scheduler.CreateObserver<int>();

      scheduler.ScheduleAbsolute(200, () => { xs.Subscribe(observer, source.Token); });
      scheduler.Start();

      observer.Messages.AssertEqual(timeline);

      xs.Subscriptions.AssertEqual(
          Subscribe(200, Subscription.Infinite /* no auto-dispose when using CreateHotObservable */)
      );
  }
  // Calls every CancellationToken overload with a token that is already
  // cancelled. The deferred factory fails the test if it is ever invoked,
  // i.e. if any overload actually subscribes to the source.
  [Fact]
  public void Subscribe_CT_Overloads_AlreadyCancelled()
  {
      var xs = Observable.Defer<int>(() =>
      {
          Assert.True(false);
          return Observable.Return(42, Scheduler.Immediate);
      });

      var source = new CancellationTokenSource();
      source.Cancel();

      // No-op callbacks, typed so each call binds to the intended overload.
      Action<int> onNext = _ => { };
      Action<Exception> onError = _ => { };
      Action onCompleted = () => { };

      xs.Subscribe(source.Token);
      xs.Subscribe(onNext, source.Token);
      xs.Subscribe(onNext, onError, source.Token);
      xs.Subscribe(onNext, onCompleted, source.Token);
      xs.Subscribe(onNext, onError, onCompleted, source.Token);
      xs.Subscribe(Observer.Create<int>(onNext, onError, onCompleted), source.Token);
  }
  // Calls every CancellationToken overload with CancellationToken.None and
  // counts how often the source factory and each kind of callback ran.
  [Fact]
  public void Subscribe_CT_Overloads_None()
  {
      var subscriptions = 0;
      var nexts = 0;
      var errors = 0;
      var completions = 0;

      var xs = Observable.Defer<int>(() =>
      {
          subscriptions++;
          return Observable.Return(42, Scheduler.Immediate);
      });

      // Counting callbacks, typed so each call binds to the intended overload.
      Action<int> onNext = _ => { nexts++; };
      Action<Exception> onError = _ => { errors++; };
      Action onCompleted = () => { completions++; };

      var none = CancellationToken.None;

      xs.Subscribe(none);
      xs.Subscribe(onNext, none);
      xs.Subscribe(onNext, onError, none);
      xs.Subscribe(onNext, onCompleted, none);
      xs.Subscribe(onNext, onError, onCompleted, none);
      xs.Subscribe(Observer.Create<int>(onNext, onError, onCompleted), none);

      Assert.Equal(6, subscriptions); // one factory call per Subscribe call
      Assert.Equal(5, nexts);         // every call except the handler-less one
      Assert.Equal(0, errors);
      Assert.Equal(3, completions);   // only three calls pass an onCompleted
  }
  // The onNext handler cancels the token itself while handling the value 2.
  // Exactly two elements are expected to be delivered, and the subscription
  // is expected to end at 220, the time that value is emitted.
  [Fact]
  public void Subscribe_CT_CancelDuringCallback()
  {
      var scheduler = new TestScheduler();
      var source = new CancellationTokenSource();

      var xs = scheduler.CreateHotObservable(
          OnNext(210, 1),
          OnNext(220, 2),
          OnNext(230, 3),
          OnCompleted<int>(240)
      );

      var delivered = 0;
      Action<int> onNext = value =>
      {
          delivered++;

          if (value == 2)
          {
              source.Cancel();
          }
      };

      scheduler.ScheduleAbsolute(200, () => { xs.Subscribe(onNext, source.Token); });
      scheduler.Start();

      Assert.Equal(2, delivered);

      xs.Subscriptions.AssertEqual(Subscribe(200, 220));
  }
  299. #endif
  300. #endregion
  301. }
  302. }