ObservableAwaiterTest.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if HAS_AWAIT
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Reactive;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using System.Threading;
  10. using Microsoft.Reactive.Testing;
  11. using Xunit;
  12. using ReactiveTests.Dummies;
  13. using System.Reactive.Disposables;
  14. namespace ReactiveTests.Tests
  15. {
  16. public class ObservableAwaiterTest : ReactiveTest
  17. {
  18. [Fact]
  19. public void Await_ArgumentChecking()
  20. {
  21. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter<int>(default(IObservable<int>)));
  22. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter<int>(default(IConnectableObservable<int>)));
  23. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter(Observable.Empty<int>()).OnCompleted(null));
  24. }
  25. [Fact]
  26. public void Await()
  27. {
  28. SynchronizationContext.SetSynchronizationContext(null);
  29. var scheduler = new TestScheduler();
  30. var xs = scheduler.CreateHotObservable(
  31. OnNext(20, -1),
  32. OnNext(150, 0),
  33. OnNext(220, 1),
  34. OnNext(290, 2),
  35. OnNext(340, 3),
  36. OnCompleted<int>(410)
  37. );
  38. var awaiter = default(AsyncSubject<int>);
  39. var result = default(int);
  40. var t = long.MaxValue;
  41. scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
  42. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));
  43. scheduler.Start();
  44. Assert.Equal(410, t);
  45. Assert.Equal(3, result);
  46. xs.Subscriptions.AssertEqual(
  47. Subscribe(100)
  48. );
  49. }
  50. [Fact]
  51. public void Await_Connectable()
  52. {
  53. SynchronizationContext.SetSynchronizationContext(null);
  54. var scheduler = new TestScheduler();
  55. var s = default(long);
  56. var xs = Observable.Create<int>(observer =>
  57. {
  58. s = scheduler.Clock;
  59. return StableCompositeDisposable.Create(
  60. scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
  61. scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); })
  62. );
  63. });
  64. var ys = xs.Publish();
  65. var awaiter = default(AsyncSubject<int>);
  66. var result = default(int);
  67. var t = long.MaxValue;
  68. scheduler.ScheduleAbsolute(100, () => awaiter = ys.GetAwaiter());
  69. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));
  70. scheduler.Start();
  71. Assert.Equal(100, s);
  72. Assert.Equal(260, t);
  73. Assert.Equal(42, result);
  74. }
  75. [Fact]
  76. public void Await_Error()
  77. {
  78. SynchronizationContext.SetSynchronizationContext(null);
  79. var scheduler = new TestScheduler();
  80. var ex = new Exception();
  81. var xs = scheduler.CreateHotObservable(
  82. OnNext(20, -1),
  83. OnNext(150, 0),
  84. OnNext(220, 1),
  85. OnNext(290, 2),
  86. OnNext(340, 3),
  87. OnError<int>(410, ex)
  88. );
  89. var awaiter = default(AsyncSubject<int>);
  90. var t = long.MaxValue;
  91. scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
  92. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; ReactiveAssert.Throws(ex, () => awaiter.GetResult()); }));
  93. scheduler.Start();
  94. Assert.Equal(410, t);
  95. xs.Subscriptions.AssertEqual(
  96. Subscribe(100)
  97. );
  98. }
  99. [Fact]
  100. public void Await_Never()
  101. {
  102. SynchronizationContext.SetSynchronizationContext(null);
  103. var scheduler = new TestScheduler();
  104. var xs = scheduler.CreateHotObservable(
  105. OnNext(20, -1),
  106. OnNext(150, 0),
  107. OnNext(220, 1),
  108. OnNext(290, 2),
  109. OnNext(340, 3)
  110. );
  111. var awaiter = default(AsyncSubject<int>);
  112. var hasValue = default(bool);
  113. var t = long.MaxValue;
  114. scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
  115. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; awaiter.GetResult(); hasValue = true; }));
  116. scheduler.Start();
  117. Assert.Equal(long.MaxValue, t);
  118. Assert.False(hasValue);
  119. xs.Subscriptions.AssertEqual(
  120. Subscribe(100)
  121. );
  122. }
  123. [Fact]
  124. public void Await_Empty()
  125. {
  126. SynchronizationContext.SetSynchronizationContext(null);
  127. var scheduler = new TestScheduler();
  128. var xs = scheduler.CreateHotObservable(
  129. OnCompleted<int>(300)
  130. );
  131. var awaiter = default(AsyncSubject<int>);
  132. var t = long.MaxValue;
  133. scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
  134. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; ReactiveAssert.Throws<InvalidOperationException>(() => awaiter.GetResult()); }));
  135. scheduler.Start();
  136. Assert.Equal(300, t);
  137. xs.Subscriptions.AssertEqual(
  138. Subscribe(100)
  139. );
  140. }
  141. [Fact]
  142. public void RunAsync_ArgumentChecking()
  143. {
  144. var ct = CancellationToken.None;
  145. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RunAsync<int>(default(IObservable<int>), ct));
  146. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RunAsync<int>(default(IConnectableObservable<int>), ct));
  147. }
  148. [Fact]
  149. public void RunAsync_Simple()
  150. {
  151. SynchronizationContext.SetSynchronizationContext(null);
  152. var scheduler = new TestScheduler();
  153. var xs = scheduler.CreateHotObservable(
  154. OnNext(220, 42),
  155. OnCompleted<int>(250)
  156. );
  157. var awaiter = default(AsyncSubject<int>);
  158. var result = default(int);
  159. var t = long.MaxValue;
  160. scheduler.ScheduleAbsolute(100, () => awaiter = xs.RunAsync(CancellationToken.None));
  161. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));
  162. scheduler.Start();
  163. Assert.Equal(250, t);
  164. Assert.Equal(42, result);
  165. xs.Subscriptions.AssertEqual(
  166. Subscribe(100)
  167. );
  168. }
  169. [Fact]
  170. public void RunAsync_Cancelled()
  171. {
  172. SynchronizationContext.SetSynchronizationContext(null);
  173. var cts = new CancellationTokenSource();
  174. cts.Cancel();
  175. var scheduler = new TestScheduler();
  176. var xs = scheduler.CreateHotObservable(
  177. OnNext(220, 42),
  178. OnCompleted<int>(250)
  179. );
  180. var awaiter = default(AsyncSubject<int>);
  181. var result = default(int);
  182. var t = long.MaxValue;
  183. scheduler.ScheduleAbsolute(100, () => awaiter = xs.RunAsync(cts.Token));
  184. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
  185. {
  186. t = scheduler.Clock;
  187. ReactiveAssert.Throws<OperationCanceledException>(() =>
  188. {
  189. result = awaiter.GetResult();
  190. });
  191. }));
  192. scheduler.Start();
  193. Assert.Equal(200, t);
  194. xs.Subscriptions.AssertEqual(
  195. );
  196. }
  197. [Fact]
  198. public void RunAsync_Cancel()
  199. {
  200. SynchronizationContext.SetSynchronizationContext(null);
  201. var cts = new CancellationTokenSource();
  202. var scheduler = new TestScheduler();
  203. var xs = scheduler.CreateHotObservable(
  204. OnNext(220, 42),
  205. OnCompleted<int>(250)
  206. );
  207. var awaiter = default(AsyncSubject<int>);
  208. var result = default(int);
  209. var t = long.MaxValue;
  210. scheduler.ScheduleAbsolute(100, () => awaiter = xs.RunAsync(cts.Token));
  211. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
  212. {
  213. t = scheduler.Clock;
  214. ReactiveAssert.Throws<OperationCanceledException>(() =>
  215. {
  216. result = awaiter.GetResult();
  217. });
  218. }));
  219. scheduler.ScheduleAbsolute(210, () => cts.Cancel());
  220. scheduler.Start();
  221. Assert.Equal(210, t);
  222. xs.Subscriptions.AssertEqual(
  223. Subscribe(100, 210)
  224. );
  225. }
  226. [Fact]
  227. public void RunAsync_Connectable()
  228. {
  229. SynchronizationContext.SetSynchronizationContext(null);
  230. var scheduler = new TestScheduler();
  231. var s = default(long);
  232. var xs = Observable.Create<int>(observer =>
  233. {
  234. s = scheduler.Clock;
  235. return StableCompositeDisposable.Create(
  236. scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
  237. scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); })
  238. );
  239. });
  240. var ys = xs.Publish();
  241. var awaiter = default(AsyncSubject<int>);
  242. var result = default(int);
  243. var t = long.MaxValue;
  244. scheduler.ScheduleAbsolute(100, () => awaiter = ys.RunAsync(CancellationToken.None));
  245. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));
  246. scheduler.Start();
  247. Assert.Equal(100, s);
  248. Assert.Equal(260, t);
  249. Assert.Equal(42, result);
  250. }
  251. [Fact]
  252. public void RunAsync_Connectable_Cancelled()
  253. {
  254. SynchronizationContext.SetSynchronizationContext(null);
  255. var cts = new CancellationTokenSource();
  256. cts.Cancel();
  257. var scheduler = new TestScheduler();
  258. var s = default(long?);
  259. var xs = Observable.Create<int>(observer =>
  260. {
  261. s = scheduler.Clock;
  262. return StableCompositeDisposable.Create(
  263. scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
  264. scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); })
  265. );
  266. });
  267. var ys = xs.Publish();
  268. var awaiter = default(AsyncSubject<int>);
  269. var result = default(int);
  270. var t = long.MaxValue;
  271. scheduler.ScheduleAbsolute(100, () => awaiter = ys.RunAsync(cts.Token));
  272. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
  273. {
  274. t = scheduler.Clock;
  275. ReactiveAssert.Throws<OperationCanceledException>(() =>
  276. {
  277. result = awaiter.GetResult();
  278. });
  279. }));
  280. scheduler.Start();
  281. Assert.False(s.HasValue);
  282. Assert.Equal(200, t);
  283. }
  284. [Fact]
  285. public void RunAsync_Connectable_Cancel()
  286. {
  287. SynchronizationContext.SetSynchronizationContext(null);
  288. var cts = new CancellationTokenSource();
  289. var scheduler = new TestScheduler();
  290. var s = default(long);
  291. var d = default(long);
  292. var xs = Observable.Create<int>(observer =>
  293. {
  294. s = scheduler.Clock;
  295. return StableCompositeDisposable.Create(
  296. scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
  297. scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); }),
  298. Disposable.Create(() => { d = scheduler.Clock; })
  299. );
  300. });
  301. var ys = xs.Publish();
  302. var awaiter = default(AsyncSubject<int>);
  303. var result = default(int);
  304. var t = long.MaxValue;
  305. scheduler.ScheduleAbsolute(100, () => awaiter = ys.RunAsync(cts.Token));
  306. scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
  307. {
  308. t = scheduler.Clock;
  309. ReactiveAssert.Throws<OperationCanceledException>(() =>
  310. {
  311. result = awaiter.GetResult();
  312. });
  313. }));
  314. scheduler.ScheduleAbsolute(210, () => cts.Cancel());
  315. scheduler.Start();
  316. Assert.Equal(100, s);
  317. Assert.Equal(210, d);
  318. Assert.Equal(210, t);
  319. }
  320. }
  321. }
  322. #endif