CreateAsyncTest.cs 20 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Reflection;
  16. using System.Threading;
  17. using System.Reactive.Disposables;
  18. namespace ReactiveTests.Tests
  19. {
  20. public class CreateAsyncTest : ReactiveTest
  21. {
  /// <summary>
  /// Checks that each task-returning <c>Observable.Create</c> overload throws
  /// <see cref="ArgumentNullException"/> when handed a null subscribe function.
  /// </summary>
  [Fact]
  public void CreateAsync_ArgumentChecking()
  {
      // One typed null per overload so that overload resolution stays unambiguous.
      Func<IObserver<int>, Task> plainTask = null;
      Func<IObserver<int>, CancellationToken, Task> plainTaskWithToken = null;
      Func<IObserver<int>, Task<IDisposable>> disposableTask = null;
      Func<IObserver<int>, CancellationToken, Task<IDisposable>> disposableTaskWithToken = null;
      Func<IObserver<int>, Task<Action>> actionTask = null;
      Func<IObserver<int>, CancellationToken, Task<Action>> actionTaskWithToken = null;

      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(plainTask));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(plainTaskWithToken));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(disposableTask));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(disposableTaskWithToken));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(actionTask));
      ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Create<int>(actionTaskWithToken));
  }
  /// <summary>
  /// Subscribes to a sequence whose subscribe task yields a null <see cref="Action"/>,
  /// disposes the subscription, and checks that the single pushed value (42) was received.
  /// </summary>
  [Fact]
  public void CreateAsync_NullCoalescingAction1()
  {
      var source = Observable.Create<int>(observer =>
      {
          observer.OnNext(42);

          // The task result is deliberately a null unsubscribe action.
          return Task.Factory.StartNew<Action>(() => null);
      });

      var received = new List<int>();
      var subscription = source.Subscribe(value => received.Add(value));
      subscription.Dispose();

      var expected = new[] { 42 };
      Assert.True(received.SequenceEqual(expected));
  }
  /// <summary>
  /// Same scenario as the token-less variant, but through the overload that also passes a
  /// <see cref="CancellationToken"/>: the subscribe task yields a null <see cref="Action"/>,
  /// the subscription is disposed, and the pushed value (42) must have been received.
  /// </summary>
  [Fact]
  public void CreateAsync_NullCoalescingAction2()
  {
      var source = Observable.Create<int>((observer, cancellation) =>
      {
          observer.OnNext(42);

          // The task result is deliberately a null unsubscribe action; the token is unused.
          return Task.Factory.StartNew<Action>(() => null);
      });

      var received = new List<int>();
      var subscription = source.Subscribe(value => received.Add(value));
      subscription.Dispose();

      var expected = new[] { 42 };
      Assert.True(received.SequenceEqual(expected));
  }
  /// <summary>
  /// Subscribes to a sequence whose subscribe task yields a null <see cref="IDisposable"/>,
  /// disposes the subscription, and checks that the single pushed value (42) was received.
  /// </summary>
  [Fact]
  public void CreateAsync_NullCoalescingDisposable1()
  {
      var source = Observable.Create<int>(observer =>
      {
          observer.OnNext(42);

          // The task result is deliberately a null disposable.
          return Task.Factory.StartNew<IDisposable>(() => null);
      });

      var received = new List<int>();
      var subscription = source.Subscribe(value => received.Add(value));
      subscription.Dispose();

      var expected = new[] { 42 };
      Assert.True(received.SequenceEqual(expected));
  }
  /// <summary>
  /// Same scenario as the token-less variant, but through the overload that also passes a
  /// <see cref="CancellationToken"/>: the subscribe task yields a null <see cref="IDisposable"/>,
  /// the subscription is disposed, and the pushed value (42) must have been received.
  /// </summary>
  [Fact]
  public void CreateAsync_NullCoalescingDisposable2()
  {
      var source = Observable.Create<int>((observer, cancellation) =>
      {
          observer.OnNext(42);

          // The task result is deliberately a null disposable; the token is unused.
          return Task.Factory.StartNew<IDisposable>(() => null);
      });

      var received = new List<int>();
      var subscription = source.Subscribe(value => received.Add(value));
      subscription.Dispose();

      var expected = new[] { 42 };
      Assert.True(received.SequenceEqual(expected));
  }
  /// <summary>
  /// Pushes 1, 2, 3, ... to <paramref name="results"/>, one value every 100 ticks of
  /// <paramref name="scheduler"/>, and returns a task that is never completed.
  /// </summary>
  /// <param name="results">Observer that receives the counter values.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled work.</param>
  /// <param name="scheduler">Scheduler the recurring work is scheduled on.</param>
  /// <returns>A task that this method never completes, faults or cancels.</returns>
  Task Producer1(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  {
      var period = TimeSpan.FromTicks(100);
      var neverCompleted = new TaskCompletionSource<object>();
      var counter = 0;

      var recurring = scheduler.Schedule(period, reschedule =>
      {
          counter++;
          results.OnNext(counter);
          reschedule(period);
      });

      token.Register(() => recurring.Dispose());
      return neverCompleted.Task;
  }
  /// <summary>
  /// Runs a producer whose task never completes under a <c>TestScheduler</c> and expects
  /// only OnNext notifications (values 1 through 7 at ticks 300 through 900).
  /// </summary>
  [Fact]
  public void CreateAsync_Never()
  {
      RunSynchronously(() =>
      {
          var scheduler = new TestScheduler();

          Func<IObservable<int>> create = () =>
              Observable.Create<int>((observer, token) => Producer1(observer, token, scheduler));
          var res = scheduler.Start(create);

          // No terminal notification is expected because the producer's task never finishes.
          var expected = new[]
          {
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
              OnNext(700, 5),
              OnNext(800, 6),
              OnNext(900, 7),
          };
          res.Messages.AssertEqual(expected);
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and
  /// completes the returned task successfully on the tick where four values have already
  /// been pushed. It keeps pushing and rescheduling afterwards.
  /// </summary>
  /// <param name="results">Observer that receives the counter values.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled work.</param>
  /// <param name="scheduler">Scheduler the recurring work is scheduled on.</param>
  /// <returns>A task that is completed on the fifth tick.</returns>
  Task Producer2(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  {
      var period = TimeSpan.FromTicks(100);
      var completion = new TaskCompletionSource<object>();
      var counter = 0;

      var recurring = scheduler.Schedule(period, reschedule =>
      {
          // Complete the task first; the OnNext below is still issued on the same tick.
          if (counter == 4)
          {
              completion.SetResult(null);
          }

          counter++;
          results.OnNext(counter);
          reschedule(period);
      });

      token.Register(() => recurring.Dispose());
      return completion.Task;
  }
  /// <summary>
  /// Expects four values followed by OnCompleted at tick 700 when the producer's task
  /// completes successfully (see <c>Producer2</c>).
  /// </summary>
  [Fact]
  public void CreateAsync_Completed1()
  {
      RunSynchronously(() =>
      {
          var scheduler = new TestScheduler();

          Func<IObservable<int>> create = () =>
              Observable.Create<int>((observer, token) => Producer2(observer, token, scheduler));
          var res = scheduler.Start(create);

          var expected = new[]
          {
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
              OnCompleted<int>(700),
          };
          res.Messages.AssertEqual(expected);
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and calls
  /// <c>OnCompleted</c> on the observer on the tick where four values have already been
  /// pushed. It keeps pushing and rescheduling afterwards; the returned task is never completed.
  /// </summary>
  /// <param name="results">Observer that receives the values and the completion signal.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled work.</param>
  /// <param name="scheduler">Scheduler the recurring work is scheduled on.</param>
  /// <returns>A task that this method never completes, faults or cancels.</returns>
  Task Producer3(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  {
      var period = TimeSpan.FromTicks(100);
      var neverCompleted = new TaskCompletionSource<object>();
      var counter = 0;

      var recurring = scheduler.Schedule(period, reschedule =>
      {
          // Signal completion directly on the observer; OnNext is still called afterwards.
          if (counter == 4)
          {
              results.OnCompleted();
          }

          counter++;
          results.OnNext(counter);
          reschedule(period);
      });

      token.Register(() => recurring.Dispose());
      return neverCompleted.Task;
  }
  /// <summary>
  /// Expects four values followed by OnCompleted at tick 700 when the producer calls
  /// <c>OnCompleted</c> on the observer itself (see <c>Producer3</c>).
  /// </summary>
  [Fact]
  public void CreateAsync_Completed2()
  {
      RunSynchronously(() =>
      {
          var scheduler = new TestScheduler();

          Func<IObservable<int>> create = () =>
              Observable.Create<int>((observer, token) => Producer3(observer, token, scheduler));
          var res = scheduler.Start(create);

          var expected = new[]
          {
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
              OnCompleted<int>(700),
          };
          res.Messages.AssertEqual(expected);
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and calls
  /// <c>OnError</c> with <paramref name="exception"/> on the tick where four values have
  /// already been pushed. It keeps pushing and rescheduling afterwards; the returned task is
  /// never completed.
  /// </summary>
  /// <param name="results">Observer that receives the values and the error.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled work.</param>
  /// <param name="scheduler">Scheduler the recurring work is scheduled on.</param>
  /// <param name="exception">Exception passed to <c>OnError</c>.</param>
  /// <returns>A task that this method never completes, faults or cancels.</returns>
  Task Producer4(IObserver<int> results, CancellationToken token, IScheduler scheduler, Exception exception)
  {
      var period = TimeSpan.FromTicks(100);
      var neverCompleted = new TaskCompletionSource<object>();
      var counter = 0;

      var recurring = scheduler.Schedule(period, reschedule =>
      {
          // Report the error directly on the observer; OnNext is still called afterwards.
          if (counter == 4)
          {
              results.OnError(exception);
          }

          counter++;
          results.OnNext(counter);
          reschedule(period);
      });

      token.Register(() => recurring.Dispose());
      return neverCompleted.Task;
  }
  /// <summary>
  /// Expects four values followed by OnError at tick 700, carrying the same exception
  /// instance, when the producer calls <c>OnError</c> on the observer (see <c>Producer4</c>).
  /// </summary>
  [Fact]
  public void CreateAsync_Error1()
  {
      RunSynchronously(() =>
      {
          var scheduler = new TestScheduler();
          var exception = new Exception();

          Func<IObservable<int>> create = () =>
              Observable.Create<int>((observer, token) => Producer4(observer, token, scheduler, exception));
          var res = scheduler.Start(create);

          var expected = new[]
          {
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
              OnError<int>(700, exception),
          };
          res.Messages.AssertEqual(expected);
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and faults
  /// the returned task with <paramref name="exception"/> on the tick where four values have
  /// already been pushed. It keeps pushing and rescheduling afterwards.
  /// </summary>
  /// <param name="results">Observer that receives the counter values.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled work.</param>
  /// <param name="scheduler">Scheduler the recurring work is scheduled on.</param>
  /// <param name="exception">Exception the returned task is faulted with.</param>
  /// <returns>A task that is faulted on the fifth tick.</returns>
  Task Producer5(IObserver<int> results, CancellationToken token, IScheduler scheduler, Exception exception)
  {
      var period = TimeSpan.FromTicks(100);
      var completion = new TaskCompletionSource<object>();
      var counter = 0;

      var recurring = scheduler.Schedule(period, reschedule =>
      {
          // Fault the task first; the OnNext below is still issued on the same tick.
          if (counter == 4)
          {
              completion.SetException(exception);
          }

          counter++;
          results.OnNext(counter);
          reschedule(period);
      });

      token.Register(() => recurring.Dispose());
      return completion.Task;
  }
  /// <summary>
  /// Expects four values followed by OnError at tick 700, carrying the same exception
  /// instance, when the producer's task is faulted (see <c>Producer5</c>).
  /// </summary>
  [Fact]
  public void CreateAsync_Error2()
  {
      RunSynchronously(() =>
      {
          var scheduler = new TestScheduler();
          var exception = new Exception();

          Func<IObservable<int>> create = () =>
              Observable.Create<int>((observer, token) => Producer5(observer, token, scheduler, exception));
          var res = scheduler.Start(create);

          var expected = new[]
          {
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
              OnError<int>(700, exception),
          };
          res.Messages.AssertEqual(expected);
      });
  }
  /// <summary>
  /// Producer that fails synchronously: it throws <paramref name="exception"/> instead of
  /// returning a task. The observer and the token are not used.
  /// </summary>
  /// <param name="results">Unused.</param>
  /// <param name="token">Unused.</param>
  /// <param name="exception">Exception to throw.</param>
  /// <returns>Never returns normally.</returns>
  Task Producer6(IObserver<int> results, CancellationToken token, Exception exception)
  {
      // Thrown before any task exists, so the caller sees a synchronous failure.
      throw exception;
  }
  /// <summary>
  /// Expects a single OnError at tick 200, carrying the same exception instance, when the
  /// subscribe function throws synchronously (see <c>Producer6</c>).
  /// </summary>
  [Fact]
  public void CreateAsync_Error3()
  {
      RunSynchronously(() =>
      {
          var scheduler = new TestScheduler();
          var exception = new InvalidOperationException();

          Func<IObservable<int>> create = () =>
              Observable.Create<int>((observer, token) => Producer6(observer, token, exception));
          var res = scheduler.Start(create);

          var expected = new[]
          {
              OnError<int>(200, exception),
          };
          res.Messages.AssertEqual(expected);
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and
  /// completes the returned task successfully on the tick where four values have already
  /// been pushed. It keeps pushing and rescheduling afterwards.
  /// </summary>
  /// <param name="results">Observer that receives the counter values.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled work.</param>
  /// <param name="scheduler">Scheduler the recurring work is scheduled on.</param>
  /// <returns>A task that is completed on the fifth tick, if that tick is reached.</returns>
  Task Producer7(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  {
      var period = TimeSpan.FromTicks(100);
      var completion = new TaskCompletionSource<object>();
      var counter = 0;

      var recurring = scheduler.Schedule(period, reschedule =>
      {
          // Complete the task first; the OnNext below is still issued on the same tick.
          if (counter == 4)
          {
              completion.SetResult(null);
          }

          counter++;
          results.OnNext(counter);
          reschedule(period);
      });

      token.Register(() => recurring.Dispose());
      return completion.Task;
  }
  /// <summary>
  /// Starts the sequence with 650 as the second argument to <c>Start</c> (the disposal
  /// time) and expects only the four values produced before then, with no terminal
  /// notification, even though <c>Producer7</c> would complete its task at tick 700.
  /// </summary>
  [Fact]
  public void CreateAsync_Cancel1()
  {
      RunSynchronously(() =>
      {
          var scheduler = new TestScheduler();

          Func<IObservable<int>> create = () =>
              Observable.Create<int>((observer, token) => Producer7(observer, token, scheduler));
          var res = scheduler.Start(create, 650);

          var expected = new[]
          {
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
          };
          res.Messages.AssertEqual(expected);
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and calls
  /// <c>OnCompleted</c> on the observer on the tick where four values have already been
  /// pushed. It keeps pushing and rescheduling afterwards; the returned task is never completed.
  /// </summary>
  /// <param name="results">Observer that receives the values and the completion signal.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled work.</param>
  /// <param name="scheduler">Scheduler the recurring work is scheduled on.</param>
  /// <returns>A task that this method never completes, faults or cancels.</returns>
  Task Producer8(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  {
      var period = TimeSpan.FromTicks(100);
      var neverCompleted = new TaskCompletionSource<object>();
      var counter = 0;

      var recurring = scheduler.Schedule(period, reschedule =>
      {
          // Signal completion directly on the observer; OnNext is still called afterwards.
          if (counter == 4)
          {
              results.OnCompleted();
          }

          counter++;
          results.OnNext(counter);
          reschedule(period);
      });

      token.Register(() => recurring.Dispose());
      return neverCompleted.Task;
  }
  /// <summary>
  /// Starts the sequence with 650 as the second argument to <c>Start</c> (the disposal
  /// time) and expects only the four values produced before then, with no terminal
  /// notification, even though <c>Producer8</c> would call <c>OnCompleted</c> at tick 700.
  /// </summary>
  [Fact]
  public void CreateAsync_Cancel2()
  {
      RunSynchronously(() =>
      {
          var scheduler = new TestScheduler();

          Func<IObservable<int>> create = () =>
              Observable.Create<int>((observer, token) => Producer8(observer, token, scheduler));
          var res = scheduler.Start(create, 650);

          var expected = new[]
          {
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
          };
          res.Messages.AssertEqual(expected);
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and calls
  /// <c>OnCompleted</c> on the observer on the tick where four values have already been
  /// pushed. It keeps pushing and rescheduling afterwards; the returned task is never completed.
  /// </summary>
  /// <param name="results">Observer that receives the values and the completion signal.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled work.</param>
  /// <param name="scheduler">Scheduler the recurring work is scheduled on.</param>
  /// <returns>A task that this method never completes, faults or cancels.</returns>
  Task Producer9(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  {
      var period = TimeSpan.FromTicks(100);
      var neverCompleted = new TaskCompletionSource<object>();
      var counter = 0;

      var recurring = scheduler.Schedule(period, reschedule =>
      {
          // Signal completion directly on the observer; OnNext is still called afterwards.
          if (counter == 4)
          {
              results.OnCompleted();
          }

          counter++;
          results.OnNext(counter);
          reschedule(period);
      });

      token.Register(() => recurring.Dispose());
      return neverCompleted.Task;
  }
  /// <summary>
  /// Starts the sequence with 750 as the second argument to <c>Start</c> (the disposal
  /// time), i.e. after <c>Producer9</c> calls <c>OnCompleted</c> at tick 700, and expects
  /// the four values followed by that completion.
  /// </summary>
  [Fact]
  public void CreateAsync_Cancel3()
  {
      RunSynchronously(() =>
      {
          var scheduler = new TestScheduler();

          Func<IObservable<int>> create = () =>
              Observable.Create<int>((observer, token) => Producer9(observer, token, scheduler));
          var res = scheduler.Start(create, 750);

          var expected = new[]
          {
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
              OnCompleted<int>(700),
          };
          res.Messages.AssertEqual(expected);
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and moves
  /// the returned task to the canceled state on the tick where four values have already
  /// been pushed. It keeps pushing and rescheduling afterwards.
  /// </summary>
  /// <param name="results">Observer that receives the counter values.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled work.</param>
  /// <param name="scheduler">Scheduler the recurring work is scheduled on.</param>
  /// <returns>A task that is canceled on the fifth tick.</returns>
  Task Producer10(IObserver<int> results, CancellationToken token, IScheduler scheduler)
  {
      var period = TimeSpan.FromTicks(100);
      var completion = new TaskCompletionSource<object>();
      var counter = 0;

      var recurring = scheduler.Schedule(period, reschedule =>
      {
          // Cancel the task first; the OnNext below is still issued on the same tick.
          if (counter == 4)
          {
              completion.SetCanceled();
          }

          counter++;
          results.OnNext(counter);
          reschedule(period);
      });

      token.Register(() => recurring.Dispose());
      return completion.Task;
  }
  /// <summary>
  /// Expects four values followed by a fifth message at tick 700 that is an OnError whose
  /// exception is an <see cref="OperationCanceledException"/>, when the producer's task is
  /// canceled (see <c>Producer10</c>).
  /// </summary>
  [Fact]
  public void CreateAsync_Cancel4()
  {
      RunSynchronously(() =>
      {
          var scheduler = new TestScheduler();

          Func<IObservable<int>> create = () =>
              Observable.Create<int>((observer, token) => Producer10(observer, token, scheduler));
          var res = scheduler.Start(create);

          var expectedValues = new[]
          {
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
          };
          res.Messages.Take(4).AssertEqual(expectedValues);

          // The exception instance is not known up front, so the terminal message is
          // checked field by field rather than compared with a recorded OnError.
          Assert.Equal(5, res.Messages.Count);

          var terminal = res.Messages[4];
          Assert.Equal(700, terminal.Time);
          Assert.Equal(NotificationKind.OnError, terminal.Value.Kind);
          Assert.True(terminal.Value.Exception is OperationCanceledException);
      });
  }
  /// <summary>
  /// Wraps <paramref name="action"/> in a task, runs it on the calling thread using a
  /// <c>SynchronousScheduler</c>, and then waits on that task.
  /// </summary>
  /// <param name="action">The test body to execute.</param>
  void RunSynchronously(Action action)
  {
      var work = new Task(action);
      var inlineScheduler = new SynchronousScheduler();

      work.RunSynchronously(inlineScheduler);

      // Wait() rethrows any failure of the action as an AggregateException.
      work.Wait();
  }
  /// <summary>
  /// Task scheduler that executes every task immediately on the thread that queues it,
  /// so nothing is ever held in a queue.
  /// </summary>
  class SynchronousScheduler : TaskScheduler
  {
      /// <summary>
      /// Returns the tasks currently waiting to run. This scheduler runs tasks inline and
      /// never stores them, so the sequence is always empty. Returning an empty sequence
      /// (instead of throwing) keeps callers of this hook, such as debugger views, working.
      /// </summary>
      protected override IEnumerable<Task> GetScheduledTasks()
      {
          return Enumerable.Empty<Task>();
      }

      /// <summary>Executes <paramref name="task"/> right away instead of queuing it.</summary>
      protected override void QueueTask(Task task)
      {
          TryExecuteTask(task);
      }

      /// <summary>
      /// Executes <paramref name="task"/> inline. <paramref name="taskWasPreviouslyQueued"/>
      /// is ignored because <see cref="QueueTask"/> never leaves a task pending.
      /// </summary>
      protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
      {
          return TryExecuteTask(task);
      }
  }
  /// <summary>
  /// Builds a sequence from a subscribe function that returns a plain <see cref="Task"/>
  /// which pushes 42 and completes, then checks that enumerating it yields exactly 42.
  /// </summary>
  [Fact]
  public void CreateAsync_Task_Simple()
  {
      var source = Observable.Create<int>(observer =>
          Task.Factory.StartNew(() =>
          {
              observer.OnNext(42);
              observer.OnCompleted();
          }));

      var received = new List<int>();
      source.ForEach(value => received.Add(value));

      var expected = new[] { 42 };
      Assert.True(expected.SequenceEqual(received));
  }
  /// <summary>
  /// Builds a sequence whose subscribe task pushes 42 in a loop until its
  /// <see cref="CancellationToken"/> is cancelled. The test waits until at least ten values
  /// have been pushed, disposes the subscription, and checks the first ten received values.
  /// </summary>
  [Fact]
  public void CreateAsync_Task_Token()
  {
      var e = new ManualResetEvent(false);

      var xs = Observable.Create<int>((observer, ct) =>
      {
          return Task.Factory.StartNew(() =>
          {
              var i = 0;

              while (!ct.IsCancellationRequested)
              {
                  // Signalled on the 11th iteration, i.e. after ten OnNext calls have returned.
                  if (i++ == 10)
                  {
                      e.Set();
                  }

                  observer.OnNext(42);
              }
          });
      });

      var lst = new List<int>();

      // Values arrive on the producer task's thread while this thread reads the list below;
      // List<int> is not safe for that, so both sides take the same lock.
      var d = xs.Subscribe(x => { lock (lst) { lst.Add(x); } });

      e.WaitOne();
      d.Dispose();

      // The producer loop only checks the token between iterations, so an OnNext may still
      // be in flight here; the lock keeps the enumeration from racing with it.
      lock (lst)
      {
          Assert.True(lst.Take(10).SequenceEqual(Enumerable.Repeat(42, 10)));
      }
  }
  /// <summary>
  /// Builds a sequence whose subscribe task pushes 42, completes, and yields an
  /// <see cref="IDisposable"/>. The test enumerates the sequence, waits for that disposable
  /// to be disposed, and checks that exactly 42 was received.
  /// </summary>
  [Fact]
  public void CreateAsync_IDisposable_Simple()
  {
      var stopped = new ManualResetEvent(false);
      var resource = Disposable.Create(() => stopped.Set());

      var source = Observable.Create<int>(observer =>
          Task.Factory.StartNew(() =>
          {
              observer.OnNext(42);
              observer.OnCompleted();
              return resource;
          }));

      var received = new List<int>();
      source.ForEach(value => received.Add(value));

      // Blocks until the disposable handed back by the task has been disposed.
      stopped.WaitOne();

      var expected = new[] { 42 };
      Assert.True(expected.SequenceEqual(received));
  }
  /// <summary>
  /// Builds a sequence whose subscribe task pushes 42 in a loop until its
  /// <see cref="CancellationToken"/> is cancelled and then yields an <see cref="IDisposable"/>.
  /// The test waits until at least ten values have been pushed, disposes the subscription,
  /// waits for the yielded disposable to be disposed, and checks the first ten values.
  /// </summary>
  [Fact]
  public void CreateAsync_IDisposable_Token()
  {
      var stopped = new ManualResetEvent(false);
      var s = Disposable.Create(() => stopped.Set());

      var e = new ManualResetEvent(false);

      var xs = Observable.Create<int>((observer, ct) =>
      {
          return Task.Factory.StartNew(() =>
          {
              var i = 0;

              while (!ct.IsCancellationRequested)
              {
                  // Signalled on the 11th iteration, i.e. after ten OnNext calls have returned.
                  if (i++ == 10)
                  {
                      e.Set();
                  }

                  observer.OnNext(42);
              }

              return s;
          });
      });

      var lst = new List<int>();

      // Values arrive on the producer task's thread while this thread reads the list below;
      // List<int> is not safe for that, so both sides take the same lock.
      var d = xs.Subscribe(x => { lock (lst) { lst.Add(x); } });

      e.WaitOne();
      d.Dispose();

      // Blocks until the disposable returned by the task has been disposed.
      stopped.WaitOne();

      lock (lst)
      {
          Assert.True(lst.Take(10).SequenceEqual(Enumerable.Repeat(42, 10)));
      }
  }
  /// <summary>
  /// Builds a sequence whose subscribe task pushes 42, completes, and yields an
  /// <see cref="Action"/>. The test enumerates the sequence, waits for that action to be
  /// invoked, and checks that exactly 42 was received.
  /// </summary>
  [Fact]
  public void CreateAsync_Action_Simple()
  {
      var stopped = new ManualResetEvent(false);
      Action onStop = () => stopped.Set();

      var source = Observable.Create<int>(observer =>
          Task.Factory.StartNew(() =>
          {
              observer.OnNext(42);
              observer.OnCompleted();
              return onStop;
          }));

      var received = new List<int>();
      source.ForEach(value => received.Add(value));

      // Blocks until the action handed back by the task has been invoked.
      stopped.WaitOne();

      var expected = new[] { 42 };
      Assert.True(expected.SequenceEqual(received));
  }
  /// <summary>
  /// Builds a sequence whose subscribe task pushes 42 in a loop until its
  /// <see cref="CancellationToken"/> is cancelled and then yields an <see cref="Action"/>.
  /// The test waits until at least ten values have been pushed, disposes the subscription,
  /// waits for the yielded action to be invoked, and checks the first ten values.
  /// </summary>
  [Fact]
  public void CreateAsync_Action_Token()
  {
      var stopped = new ManualResetEvent(false);
      var s = new Action(() => stopped.Set());

      var e = new ManualResetEvent(false);

      var xs = Observable.Create<int>((observer, ct) =>
      {
          return Task.Factory.StartNew(() =>
          {
              var i = 0;

              while (!ct.IsCancellationRequested)
              {
                  // Signalled on the 11th iteration, i.e. after ten OnNext calls have returned.
                  if (i++ == 10)
                  {
                      e.Set();
                  }

                  observer.OnNext(42);
              }

              return s;
          });
      });

      var lst = new List<int>();

      // Values arrive on the producer task's thread while this thread reads the list below;
      // List<int> is not safe for that, so both sides take the same lock.
      var d = xs.Subscribe(x => { lock (lst) { lst.Add(x); } });

      e.WaitOne();
      d.Dispose();

      // Blocks until the action returned by the task has been invoked.
      stopped.WaitOne();

      lock (lst)
      {
          Assert.True(lst.Take(10).SequenceEqual(Enumerable.Repeat(42, 10)));
      }
  }
  564. }
  565. }