ObservableAwaiterTest.cs 13 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if HAS_AWAIT
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Reactive;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Linq;
  8. using System.Reactive.Subjects;
  9. using System.Threading;
  10. using Microsoft.Reactive.Testing;
  11. using Xunit;
  12. using ReactiveTests.Dummies;
  13. using System.Reactive.Disposables;
  14. namespace ReactiveTests.Tests
  15. {
  /// <summary>
  /// Tests for awaiting observable sequences through <c>GetAwaiter</c> and <c>RunAsync</c>.
  /// All timing expectations are expressed in the virtual time of a <see cref="TestScheduler"/>.
  /// </summary>
  /// <remarks>
  /// Most tests share one script: the awaiter is obtained at virtual time 100, a continuation is
  /// registered on it at time 200, and the continuation stores the scheduler clock in <c>t</c>
  /// so the test can assert when it ran. <c>t</c> starts at <see cref="long.MaxValue"/>, which
  /// therefore means "the continuation never ran".
  /// NOTE(review): each timed test clears the current <see cref="SynchronizationContext"/> first,
  /// presumably so continuations are invoked inline rather than posted to a context - confirm
  /// against the awaiter implementation.
  /// </remarks>
  public class ObservableAwaiterTest : ReactiveTest
  {
      /// <summary>
      /// Null sources passed to <c>GetAwaiter</c>, and a null continuation passed to
      /// <c>OnCompleted</c>, are expected to raise <see cref="ArgumentNullException"/>.
      /// </summary>
      [Fact]
      public void Await_ArgumentChecking()
      {
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter<int>(default(IObservable<int>)));
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter<int>(default(IConnectableObservable<int>)));

          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.GetAwaiter(Observable.Empty<int>()).OnCompleted(null));
      }

      /// <summary>
      /// Awaiting a hot sequence that completes at 410: the continuation is expected to run at 410,
      /// the result to be the last element (3), and the source to be subscribed once at 100.
      /// </summary>
      [Fact]
      public void Await()
      {
          SynchronizationContext.SetSynchronizationContext(null);

          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(20, -1),
              OnNext(150, 0),
              OnNext(220, 1),
              OnNext(290, 2),
              OnNext(340, 3),
              OnCompleted<int>(410)
          );

          var awaiter = default(AsyncSubject<int>);
          var result = default(int);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));

          scheduler.Start();

          Assert.Equal(410, t);
          Assert.Equal(3, result);

          // No unsubscribe time is given: the subscription made at 100 is never disposed.
          xs.Subscriptions.AssertEqual(
              Subscribe(100)
          );
      }

      /// <summary>
      /// Awaiting a published (connectable) sequence: the underlying subscribe function is expected
      /// to run at 100 (when the awaiter is obtained), and the continuation at 260 with result 42.
      /// </summary>
      [Fact]
      public void Await_Connectable()
      {
          SynchronizationContext.SetSynchronizationContext(null);

          var scheduler = new TestScheduler();

          // Records the virtual time at which the underlying source gets subscribed.
          var s = default(long);

          var xs = Observable.Create<int>(observer =>
          {
              s = scheduler.Clock;

              return StableCompositeDisposable.Create(
                  scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
                  scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); })
              );
          });

          var ys = xs.Publish();

          var awaiter = default(AsyncSubject<int>);
          var result = default(int);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = ys.GetAwaiter());
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));

          scheduler.Start();

          Assert.Equal(100, s);
          Assert.Equal(260, t);
          Assert.Equal(42, result);
      }

      /// <summary>
      /// Awaiting a hot sequence that fails at 410: the continuation is expected to run at 410 and
      /// <c>GetResult</c> to throw the very exception instance the source produced.
      /// </summary>
      [Fact]
      public void Await_Error()
      {
          SynchronizationContext.SetSynchronizationContext(null);

          var scheduler = new TestScheduler();

          var ex = new Exception();

          var xs = scheduler.CreateHotObservable(
              OnNext(20, -1),
              OnNext(150, 0),
              OnNext(220, 1),
              OnNext(290, 2),
              OnNext(340, 3),
              OnError<int>(410, ex)
          );

          var awaiter = default(AsyncSubject<int>);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; ReactiveAssert.Throws(ex, () => awaiter.GetResult()); }));

          scheduler.Start();

          Assert.Equal(410, t);

          xs.Subscriptions.AssertEqual(
              Subscribe(100)
          );
      }

      /// <summary>
      /// Awaiting a hot sequence that never terminates: the continuation must never run, so
      /// <c>t</c> keeps its sentinel value and <c>hasValue</c> stays false.
      /// </summary>
      [Fact]
      public void Await_Never()
      {
          SynchronizationContext.SetSynchronizationContext(null);

          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(20, -1),
              OnNext(150, 0),
              OnNext(220, 1),
              OnNext(290, 2),
              OnNext(340, 3)
          );

          var awaiter = default(AsyncSubject<int>);
          var hasValue = default(bool);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; awaiter.GetResult(); hasValue = true; }));

          scheduler.Start();

          Assert.Equal(long.MaxValue, t);
          Assert.False(hasValue);

          xs.Subscriptions.AssertEqual(
              Subscribe(100)
          );
      }

      /// <summary>
      /// Awaiting a hot sequence that completes at 300 without elements: the continuation is
      /// expected to run at 300 and <c>GetResult</c> to throw <see cref="InvalidOperationException"/>.
      /// </summary>
      [Fact]
      public void Await_Empty()
      {
          SynchronizationContext.SetSynchronizationContext(null);

          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnCompleted<int>(300)
          );

          var awaiter = default(AsyncSubject<int>);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = xs.GetAwaiter());
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; ReactiveAssert.Throws<InvalidOperationException>(() => awaiter.GetResult()); }));

          scheduler.Start();

          Assert.Equal(300, t);

          xs.Subscriptions.AssertEqual(
              Subscribe(100)
          );
      }

      /// <summary>
      /// Null sources passed to <c>RunAsync</c> are expected to raise <see cref="ArgumentNullException"/>.
      /// </summary>
      [Fact]
      public void RunAsync_ArgumentChecking()
      {
          var ct = CancellationToken.None;

          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RunAsync<int>(default(IObservable<int>), ct));
          ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RunAsync<int>(default(IConnectableObservable<int>), ct));
      }

      /// <summary>
      /// <c>RunAsync</c> with <see cref="CancellationToken.None"/> on a hot sequence completing at 250:
      /// the continuation is expected to run at 250 with result 42, with one subscription made at 100.
      /// </summary>
      [Fact]
      public void RunAsync_Simple()
      {
          SynchronizationContext.SetSynchronizationContext(null);

          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(220, 42),
              OnCompleted<int>(250)
          );

          var awaiter = default(AsyncSubject<int>);
          var result = default(int);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = xs.RunAsync(CancellationToken.None));
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));

          scheduler.Start();

          Assert.Equal(250, t);
          Assert.Equal(42, result);

          xs.Subscriptions.AssertEqual(
              Subscribe(100)
          );
      }

      /// <summary>
      /// <c>RunAsync</c> with a token that is already cancelled: the continuation is expected to run
      /// as soon as it is registered (200), <c>GetResult</c> to throw
      /// <see cref="OperationCanceledException"/>, and the source never to be subscribed.
      /// </summary>
      [Fact]
      public void RunAsync_Cancelled()
      {
          SynchronizationContext.SetSynchronizationContext(null);

          // Cancel before RunAsync is ever called.
          var cts = new CancellationTokenSource();
          cts.Cancel();

          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(220, 42),
              OnCompleted<int>(250)
          );

          var awaiter = default(AsyncSubject<int>);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = xs.RunAsync(cts.Token));
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
          {
              t = scheduler.Clock;

              ReactiveAssert.Throws<OperationCanceledException>(() => awaiter.GetResult());
          }));

          scheduler.Start();

          Assert.Equal(200, t);

          // Empty expectation list: no subscription to the source was recorded.
          xs.Subscriptions.AssertEqual();
      }

      /// <summary>
      /// <c>RunAsync</c> cancelled at 210, before the source produces anything: the continuation is
      /// expected to run at 210, <c>GetResult</c> to throw <see cref="OperationCanceledException"/>,
      /// and the subscription made at 100 to be disposed at 210.
      /// </summary>
      [Fact]
      public void RunAsync_Cancel()
      {
          SynchronizationContext.SetSynchronizationContext(null);

          var cts = new CancellationTokenSource();

          var scheduler = new TestScheduler();

          var xs = scheduler.CreateHotObservable(
              OnNext(220, 42),
              OnCompleted<int>(250)
          );

          var awaiter = default(AsyncSubject<int>);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = xs.RunAsync(cts.Token));
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
          {
              t = scheduler.Clock;

              ReactiveAssert.Throws<OperationCanceledException>(() => awaiter.GetResult());
          }));
          scheduler.ScheduleAbsolute(210, () => cts.Cancel());

          scheduler.Start();

          Assert.Equal(210, t);

          xs.Subscriptions.AssertEqual(
              Subscribe(100, 210)
          );
      }

      /// <summary>
      /// <c>RunAsync</c> with <see cref="CancellationToken.None"/> on a published (connectable)
      /// sequence: the underlying subscribe function is expected to run at 100, and the continuation
      /// at 260 with result 42.
      /// </summary>
      [Fact]
      public void RunAsync_Connectable()
      {
          SynchronizationContext.SetSynchronizationContext(null);

          var scheduler = new TestScheduler();

          // Records the virtual time at which the underlying source gets subscribed.
          var s = default(long);

          var xs = Observable.Create<int>(observer =>
          {
              s = scheduler.Clock;

              return StableCompositeDisposable.Create(
                  scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
                  scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); })
              );
          });

          var ys = xs.Publish();

          var awaiter = default(AsyncSubject<int>);
          var result = default(int);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = ys.RunAsync(CancellationToken.None));
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() => { t = scheduler.Clock; result = awaiter.GetResult(); }));

          scheduler.Start();

          Assert.Equal(100, s);
          Assert.Equal(260, t);
          Assert.Equal(42, result);
      }

      /// <summary>
      /// <c>RunAsync</c> on a published (connectable) sequence with a token that is already
      /// cancelled: the underlying subscribe function must never run, the continuation is expected
      /// to run when registered (200), and <c>GetResult</c> to throw
      /// <see cref="OperationCanceledException"/>.
      /// </summary>
      [Fact]
      public void RunAsync_Connectable_Cancelled()
      {
          SynchronizationContext.SetSynchronizationContext(null);

          // Cancel before RunAsync is ever called.
          var cts = new CancellationTokenSource();
          cts.Cancel();

          var scheduler = new TestScheduler();

          // Nullable so that "never subscribed" is distinguishable from "subscribed at time 0".
          var s = default(long?);

          var xs = Observable.Create<int>(observer =>
          {
              s = scheduler.Clock;

              return StableCompositeDisposable.Create(
                  scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
                  scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); })
              );
          });

          var ys = xs.Publish();

          var awaiter = default(AsyncSubject<int>);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = ys.RunAsync(cts.Token));
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
          {
              t = scheduler.Clock;

              ReactiveAssert.Throws<OperationCanceledException>(() => awaiter.GetResult());
          }));

          scheduler.Start();

          Assert.False(s.HasValue);
          Assert.Equal(200, t);
      }

      /// <summary>
      /// <c>RunAsync</c> on a published (connectable) sequence cancelled at 210: the underlying
      /// subscribe function is expected to run at 100, its disposable to be disposed at 210, and the
      /// continuation to run at 210 with <c>GetResult</c> throwing
      /// <see cref="OperationCanceledException"/>.
      /// </summary>
      [Fact]
      public void RunAsync_Connectable_Cancel()
      {
          SynchronizationContext.SetSynchronizationContext(null);

          var cts = new CancellationTokenSource();

          var scheduler = new TestScheduler();

          // s: virtual time of subscription to the underlying source; d: virtual time of its disposal.
          var s = default(long);
          var d = default(long);

          var xs = Observable.Create<int>(observer =>
          {
              s = scheduler.Clock;

              return StableCompositeDisposable.Create(
                  scheduler.ScheduleAbsolute(250, () => { observer.OnNext(42); }),
                  scheduler.ScheduleAbsolute(260, () => { observer.OnCompleted(); }),
                  Disposable.Create(() => { d = scheduler.Clock; })
              );
          });

          var ys = xs.Publish();

          var awaiter = default(AsyncSubject<int>);
          var t = long.MaxValue;

          scheduler.ScheduleAbsolute(100, () => awaiter = ys.RunAsync(cts.Token));
          scheduler.ScheduleAbsolute(200, () => awaiter.OnCompleted(() =>
          {
              t = scheduler.Clock;

              ReactiveAssert.Throws<OperationCanceledException>(() => awaiter.GetResult());
          }));
          scheduler.ScheduleAbsolute(210, () => cts.Cancel());

          scheduler.Start();

          Assert.Equal(100, s);
          Assert.Equal(210, d);
          Assert.Equal(210, t);
      }
  }
  321. }
  322. #endif