ObservableAwaiterTest.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if HAS_AWAIT
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Reactive;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using System.Threading;
  10. using Microsoft.Reactive.Testing;
  11. using Xunit;
  12. using ReactiveTests.Dummies;
  13. using System.Reactive.Disposables;
  14. namespace ReactiveTests.Tests
  15. {
  16. public class ObservableAwaiterTest : ReactiveTest
  17. {
  /// <summary>
  /// Asserts that <c>Observable.GetAwaiter</c> throws <see cref="ArgumentNullException"/> for a null
  /// plain source and a null connectable source, and that the awaiter's <c>OnCompleted</c> throws
  /// <see cref="ArgumentNullException"/> for a null continuation.
  /// </summary>
  [Fact]
  public void Await_ArgumentChecking()
  {
      // Typed null locals keep overload resolution identical to the typed default(...) expressions.
      IObservable<int> nullSource = null;
      IConnectableObservable<int> nullConnectable = null;

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter<int>(nullSource));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter<int>(nullConnectable));
      ReactiveAssert.Throws<ArgumentNullException>(() =>
      {
          Observable.GetAwaiter(Observable.Empty<int>()).OnCompleted(null);
      });
  }
  /// <summary>
  /// Awaits a hot observable that emits several values and completes at tick 410. Asserts that the
  /// continuation runs at tick 410, that <c>GetResult</c> yields the final value (3), and that the
  /// source saw one subscription starting at tick 100.
  /// </summary>
  [Fact]
  public void Await()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(20, -1),
          OnNext(150, 0),
          OnNext(220, 1),
          OnNext(290, 2),
          OnNext(340, 3),
          OnCompleted<int>(410)
      );

      AsyncSubject<int> awaiter = null;
      int lastValue = 0;
      long continuationTime = long.MaxValue;

      // The awaiter is obtained at 100; the continuation is attached later, at 200.
      scheduler.ScheduleAbsolute(100, () => { awaiter = xs.GetAwaiter(); });
      scheduler.ScheduleAbsolute(200, () =>
      {
          awaiter.OnCompleted(() =>
          {
              continuationTime = scheduler.Clock;
              lastValue = awaiter.GetResult();
          });
      });

      scheduler.Start();

      Assert.Equal(410, continuationTime);
      Assert.Equal(3, lastValue);

      xs.Subscriptions.AssertEqual(Subscribe(100));
  }
  /// <summary>
  /// Awaits a published (connectable) sequence whose source emits 42 at tick 250 and completes at
  /// tick 260. Asserts that the source's subscribe function ran at tick 100 (when the awaiter was
  /// obtained), that the continuation ran at tick 260, and that the result is 42.
  /// </summary>
  [Fact]
  public void Await_Connectable()
  {
      var scheduler = new TestScheduler();

      long subscribedAt = 0;

      var xs = Observable.Create<int>(observer =>
      {
          // Record when the underlying source is actually subscribed to.
          subscribedAt = scheduler.Clock;

          var emit = scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); });
          var complete = scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); });

          return StableCompositeDisposable.Create(emit, complete);
      });

      var ys = xs.Publish();

      AsyncSubject<int> awaiter = null;
      int lastValue = 0;
      long continuationTime = long.MaxValue;

      scheduler.ScheduleAbsolute(100, () => { awaiter = ys.GetAwaiter(); });
      scheduler.ScheduleAbsolute(200, () =>
      {
          awaiter.OnCompleted(() =>
          {
              continuationTime = scheduler.Clock;
              lastValue = awaiter.GetResult();
          });
      });

      scheduler.Start();

      Assert.Equal(100, subscribedAt);
      Assert.Equal(260, continuationTime);
      Assert.Equal(42, lastValue);
  }
  /// <summary>
  /// Awaits a hot observable that terminates with an error at tick 410. Asserts that the
  /// continuation runs at tick 410, that <c>GetResult</c> throws that same exception instance,
  /// and that the source saw one subscription starting at tick 100.
  /// </summary>
  [Fact]
  public void Await_Error()
  {
      var scheduler = new TestScheduler();

      var failure = new Exception();

      var xs = scheduler.CreateHotObservable(
          OnNext(20, -1),
          OnNext(150, 0),
          OnNext(220, 1),
          OnNext(290, 2),
          OnNext(340, 3),
          OnError<int>(410, failure)
      );

      AsyncSubject<int> awaiter = null;
      long continuationTime = long.MaxValue;

      scheduler.ScheduleAbsolute(100, () => { awaiter = xs.GetAwaiter(); });
      scheduler.ScheduleAbsolute(200, () =>
      {
          awaiter.OnCompleted(() =>
          {
              continuationTime = scheduler.Clock;
              ReactiveAssert.Throws(failure, () => awaiter.GetResult());
          });
      });

      scheduler.Start();

      Assert.Equal(410, continuationTime);

      xs.Subscriptions.AssertEqual(Subscribe(100));
  }
  /// <summary>
  /// Awaits a hot observable that emits values but has no terminal notification. Asserts that
  /// the continuation is never invoked (the recorded time stays at <c>long.MaxValue</c> and the
  /// flag stays false) and that the source saw one subscription starting at tick 100.
  /// </summary>
  [Fact]
  public void Await_Never()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(20, -1),
          OnNext(150, 0),
          OnNext(220, 1),
          OnNext(290, 2),
          OnNext(340, 3)
      );

      AsyncSubject<int> awaiter = null;
      bool continuationRan = false;
      long continuationTime = long.MaxValue;

      scheduler.ScheduleAbsolute(100, () => { awaiter = xs.GetAwaiter(); });
      scheduler.ScheduleAbsolute(200, () =>
      {
          awaiter.OnCompleted(() =>
          {
              continuationTime = scheduler.Clock;
              awaiter.GetResult();
              continuationRan = true;
          });
      });

      scheduler.Start();

      Assert.Equal(long.MaxValue, continuationTime);
      Assert.False(continuationRan);

      xs.Subscriptions.AssertEqual(Subscribe(100));
  }
  /// <summary>
  /// Awaits a hot observable that completes at tick 300 without emitting any value. Asserts that
  /// the continuation runs at tick 300, that <c>GetResult</c> throws
  /// <see cref="InvalidOperationException"/>, and that the source saw one subscription starting
  /// at tick 100.
  /// </summary>
  [Fact]
  public void Await_Empty()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(OnCompleted<int>(300));

      AsyncSubject<int> awaiter = null;
      long continuationTime = long.MaxValue;

      scheduler.ScheduleAbsolute(100, () => { awaiter = xs.GetAwaiter(); });
      scheduler.ScheduleAbsolute(200, () =>
      {
          awaiter.OnCompleted(() =>
          {
              continuationTime = scheduler.Clock;
              ReactiveAssert.Throws<InvalidOperationException>(() => awaiter.GetResult());
          });
      });

      scheduler.Start();

      Assert.Equal(300, continuationTime);

      xs.Subscriptions.AssertEqual(Subscribe(100));
  }
  /// <summary>
  /// Asserts that <c>Observable.RunAsync</c> throws <see cref="ArgumentNullException"/> when given
  /// a null plain source or a null connectable source.
  /// </summary>
  [Fact]
  public void RunAsync_ArgumentChecking()
  {
      // Typed null locals keep overload resolution identical to the typed default(...) expressions.
      IObservable<int> nullSource = null;
      IConnectableObservable<int> nullConnectable = null;
      CancellationToken token = CancellationToken.None;

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RunAsync<int>(nullSource, token));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RunAsync<int>(nullConnectable, token));
  }
  /// <summary>
  /// Runs a hot observable (42 at tick 220, completion at tick 250) via <c>RunAsync</c> with
  /// <c>CancellationToken.None</c>. Asserts that the continuation runs at tick 250, that the
  /// result is 42, and that the source saw one subscription starting at tick 100.
  /// </summary>
  [Fact]
  public void RunAsync_Simple()
  {
      var scheduler = new TestScheduler();

      var xs = scheduler.CreateHotObservable(
          OnNext(220, 42),
          OnCompleted<int>(250)
      );

      AsyncSubject<int> awaiter = null;
      int lastValue = 0;
      long continuationTime = long.MaxValue;

      scheduler.ScheduleAbsolute(100, () => { awaiter = xs.RunAsync(CancellationToken.None); });
      scheduler.ScheduleAbsolute(200, () =>
      {
          awaiter.OnCompleted(() =>
          {
              continuationTime = scheduler.Clock;
              lastValue = awaiter.GetResult();
          });
      });

      scheduler.Start();

      Assert.Equal(250, continuationTime);
      Assert.Equal(42, lastValue);

      xs.Subscriptions.AssertEqual(Subscribe(100));
  }
  /// <summary>
  /// Calls <c>RunAsync</c> with a token that is already cancelled before the call. Asserts that
  /// the continuation attached at tick 200 runs immediately (at tick 200), that <c>GetResult</c>
  /// throws <see cref="OperationCanceledException"/>, and that the source was never subscribed to.
  /// </summary>
  [Fact]
  public void RunAsync_Cancelled()
  {
      // CancellationTokenSource is IDisposable; dispose it deterministically when the test ends.
      using (var cts = new CancellationTokenSource())
      {
          cts.Cancel();

          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(220, 42),
              OnCompleted<int>(250)
          );

          var awaiter = default(AsyncSubject<int>);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = xs.RunAsync(cts.Token));
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
          {
              t = scheduler.Clock;

              // The result value is irrelevant here; only the thrown exception is checked.
              ReactiveAssert.Throws<OperationCanceledException>(() => awaiter.GetResult());
          }));

          scheduler.Start();

          Assert.Equal(200, t);

          // An empty expectation list: no subscription to the source is expected at all.
          xs.Subscriptions.AssertEqual(
          );
      }
  }
  /// <summary>
  /// Calls <c>RunAsync</c> with a live token and cancels it at tick 210, before the source emits.
  /// Asserts that the continuation runs at tick 210, that <c>GetResult</c> throws
  /// <see cref="OperationCanceledException"/>, and that the source subscription spans ticks 100
  /// to 210.
  /// </summary>
  [Fact]
  public void RunAsync_Cancel()
  {
      // CancellationTokenSource is IDisposable; dispose it deterministically when the test ends.
      using (var cts = new CancellationTokenSource())
      {
          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(220, 42),
              OnCompleted<int>(250)
          );

          var awaiter = default(AsyncSubject<int>);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = xs.RunAsync(cts.Token));
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
          {
              t = scheduler.Clock;

              // The result value is irrelevant here; only the thrown exception is checked.
              ReactiveAssert.Throws<OperationCanceledException>(() => awaiter.GetResult());
          }));
          scheduler.ScheduleAbsolute(210, () => cts.Cancel());

          scheduler.Start();

          Assert.Equal(210, t);

          xs.Subscriptions.AssertEqual(
              Subscribe(100, 210)
          );
      }
  }
  /// <summary>
  /// Runs a published (connectable) sequence via <c>RunAsync</c> with <c>CancellationToken.None</c>;
  /// the source emits 42 at tick 250 and completes at tick 260. Asserts that the source's subscribe
  /// function ran at tick 100, that the continuation ran at tick 260, and that the result is 42.
  /// </summary>
  [Fact]
  public void RunAsync_Connectable()
  {
      var scheduler = new TestScheduler();

      long subscribedAt = 0;

      var xs = Observable.Create<int>(observer =>
      {
          // Record when the underlying source is actually subscribed to.
          subscribedAt = scheduler.Clock;

          var emit = scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); });
          var complete = scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); });

          return StableCompositeDisposable.Create(emit, complete);
      });

      var ys = xs.Publish();

      AsyncSubject<int> awaiter = null;
      int lastValue = 0;
      long continuationTime = long.MaxValue;

      scheduler.ScheduleAbsolute(100, () => { awaiter = ys.RunAsync(CancellationToken.None); });
      scheduler.ScheduleAbsolute(200, () =>
      {
          awaiter.OnCompleted(() =>
          {
              continuationTime = scheduler.Clock;
              lastValue = awaiter.GetResult();
          });
      });

      scheduler.Start();

      Assert.Equal(100, subscribedAt);
      Assert.Equal(260, continuationTime);
      Assert.Equal(42, lastValue);
  }
  /// <summary>
  /// Calls <c>RunAsync</c> on a published (connectable) sequence with a token that is already
  /// cancelled. Asserts that the source's subscribe function never ran, that the continuation
  /// attached at tick 200 runs at tick 200, and that <c>GetResult</c> throws
  /// <see cref="OperationCanceledException"/>.
  /// </summary>
  [Fact]
  public void RunAsync_Connectable_Cancelled()
  {
      // CancellationTokenSource is IDisposable; dispose it deterministically when the test ends.
      using (var cts = new CancellationTokenSource())
      {
          cts.Cancel();

          var scheduler = new TestScheduler();

          // Nullable so that "never subscribed" can be told apart from "subscribed at tick 0".
          var s = default(long?);

          var xs = Observable.Create<int>(observer =>
          {
              s = scheduler.Clock;

              return StableCompositeDisposable.Create(
                  scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
                  scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); })
              );
          });

          var ys = xs.Publish();

          var awaiter = default(AsyncSubject<int>);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = ys.RunAsync(cts.Token));
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
          {
              t = scheduler.Clock;

              // The result value is irrelevant here; only the thrown exception is checked.
              ReactiveAssert.Throws<OperationCanceledException>(() => awaiter.GetResult());
          }));

          scheduler.Start();

          Assert.False(s.HasValue);
          Assert.Equal(200, t);
      }
  }
  /// <summary>
  /// Calls <c>RunAsync</c> on a published (connectable) sequence with a live token and cancels it
  /// at tick 210, before the source emits. Asserts that the source's subscribe function ran at
  /// tick 100, that the source subscription was disposed at tick 210, that the continuation ran
  /// at tick 210, and that <c>GetResult</c> throws <see cref="OperationCanceledException"/>.
  /// </summary>
  [Fact]
  public void RunAsync_Connectable_Cancel()
  {
      // CancellationTokenSource is IDisposable; dispose it deterministically when the test ends.
      using (var cts = new CancellationTokenSource())
      {
          var scheduler = new TestScheduler();

          var s = default(long);
          var d = default(long);

          var xs = Observable.Create<int>(observer =>
          {
              s = scheduler.Clock;

              return StableCompositeDisposable.Create(
                  scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
                  scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); }),
                  // Records the virtual time at which the source subscription is disposed.
                  Disposable.Create(() => { d = scheduler.Clock; })
              );
          });

          var ys = xs.Publish();

          var awaiter = default(AsyncSubject<int>);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = ys.RunAsync(cts.Token));
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
          {
              t = scheduler.Clock;

              // The result value is irrelevant here; only the thrown exception is checked.
              ReactiveAssert.Throws<OperationCanceledException>(() => awaiter.GetResult());
          }));
          scheduler.ScheduleAbsolute(210, () => cts.Cancel());

          scheduler.Start();

          Assert.Equal(100, s);
          Assert.Equal(210, d);
          Assert.Equal(210, t);
      }
  }
  309. }
  310. }
  311. #endif