1
0

CreateAsyncTest.cs 22 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Reactive;
  8. using System.Reactive.Concurrency;
  9. using System.Reactive.Disposables;
  10. using System.Reactive.Linq;
  11. using System.Threading;
  12. using System.Threading.Tasks;
  13. using Microsoft.Reactive.Testing;
  14. using Microsoft.VisualStudio.TestTools.UnitTesting;
  15. using Assert = Xunit.Assert;
  16. namespace ReactiveTests.Tests
  17. {
  18. [TestClass]
  19. public class CreateAsyncTest : ReactiveTest
  20. {
  /// <summary>
  /// Checks that each Task-returning <c>Observable.Create</c> overload throws
  /// <see cref="ArgumentNullException"/> when handed a null subscribe delegate.
  /// </summary>
  [TestMethod]
  public void CreateAsync_ArgumentChecking()
  {
      // default(...) pins the delegate type so each entry targets exactly one overload.
      var nullDelegateCalls = new Action[]
      {
          () => Observable.Create(default(Func<IObserver<int>, Task>)),
          () => Observable.Create(default(Func<IObserver<int>, CancellationToken, Task>)),
          () => Observable.Create(default(Func<IObserver<int>, Task<IDisposable>>)),
          () => Observable.Create(default(Func<IObserver<int>, CancellationToken, Task<IDisposable>>)),
          () => Observable.Create(default(Func<IObserver<int>, Task<Action>>)),
          () => Observable.Create(default(Func<IObserver<int>, CancellationToken, Task<Action>>)),
      };

      foreach (var call in nullDelegateCalls)
      {
          ReactiveAssert.Throws<ArgumentNullException>(call);
      }
  }
  /// <summary>
  /// Subscribes to a source whose subscribe task yields a null <see cref="Action"/>,
  /// disposes the subscription, and checks that the single value 42 was received.
  /// </summary>
  [TestMethod]
  public void CreateAsync_NullCoalescingAction1()
  {
      var received = new List<int>();

      var source = Observable.Create<int>(observer =>
      {
          observer.OnNext(42);

          // The task result is a null Action.
          return Task.Factory.StartNew(() => default(Action));
      });

      var subscription = source.Subscribe(value => received.Add(value));
      subscription.Dispose();

      Assert.True(received.SequenceEqual([42]));
  }
  /// <summary>
  /// Same as the first null-Action test, but through the overload whose subscribe
  /// delegate also receives a <see cref="CancellationToken"/>.
  /// </summary>
  [TestMethod]
  public void CreateAsync_NullCoalescingAction2()
  {
      var received = new List<int>();

      var source = Observable.Create<int>((observer, cancellation) =>
      {
          observer.OnNext(42);

          // The task result is a null Action.
          return Task.Factory.StartNew(() => default(Action));
      });

      var subscription = source.Subscribe(value => received.Add(value));
      subscription.Dispose();

      Assert.True(received.SequenceEqual([42]));
  }
  /// <summary>
  /// Subscribes to a source whose subscribe task yields a null <see cref="IDisposable"/>,
  /// disposes the subscription, and checks that the single value 42 was received.
  /// </summary>
  [TestMethod]
  public void CreateAsync_NullCoalescingDisposable1()
  {
      var received = new List<int>();

      var source = Observable.Create<int>(observer =>
      {
          observer.OnNext(42);

          // The task result is a null IDisposable.
          return Task.Factory.StartNew(() => default(IDisposable));
      });

      var subscription = source.Subscribe(value => received.Add(value));
      subscription.Dispose();

      Assert.True(received.SequenceEqual([42]));
  }
  /// <summary>
  /// Same as the first null-IDisposable test, but through the overload whose subscribe
  /// delegate also receives a <see cref="CancellationToken"/>.
  /// </summary>
  [TestMethod]
  public void CreateAsync_NullCoalescingDisposable2()
  {
      var received = new List<int>();

      var source = Observable.Create<int>((observer, cancellation) =>
      {
          observer.OnNext(42);

          // The task result is a null IDisposable.
          return Task.Factory.StartNew(() => default(IDisposable));
      });

      var subscription = source.Subscribe(value => received.Add(value));
      subscription.Dispose();

      Assert.True(received.SequenceEqual([42]));
  }
  /// <summary>
  /// Schedules a recurring action on <paramref name="scheduler"/> that pushes 1, 2, 3, ...
  /// to <paramref name="results"/> every 100 ticks. The returned task is never completed
  /// by this method.
  /// </summary>
  /// <param name="results">Observer receiving the counter values.</param>
  /// <param name="scheduler">Scheduler on which the recurring action runs.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled action.</param>
  /// <returns>The task of a completion source that is never signalled here.</returns>
  private Task Producer1(IObserver<int> results, IScheduler scheduler, CancellationToken token)
  {
      var completion = new TaskCompletionSource<object>();
      var counter = 0;

      var scheduled = scheduler.Schedule(TimeSpan.FromTicks(100), reschedule =>
      {
          counter++;
          results.OnNext(counter);
          reschedule(TimeSpan.FromTicks(100));
      });

      // Cancelling the token stops the recurring work.
      token.Register(() => scheduled.Dispose());

      return completion.Task;
  }
  /// <summary>
  /// Runs <c>Producer1</c> (whose task never completes) under a <c>TestScheduler</c> and
  /// expects seven values at ticks 300 through 900 with no terminal notification.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Never()
  {
      RunSynchronously(() =>
      {
          var testScheduler = new TestScheduler();

          IObservable<int> CreateSource() =>
              Observable.Create<int>((observer, token) => Producer1(observer, testScheduler, token));

          var recorded = testScheduler.Start(() => CreateSource());

          recorded.Messages.AssertEqual(
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
              OnNext(700, 5),
              OnNext(800, 6),
              OnNext(900, 7)
          );
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and
  /// completes the returned task (with a null result) on the invocation where four
  /// values have already been pushed.
  /// </summary>
  /// <param name="results">Observer receiving the counter values.</param>
  /// <param name="scheduler">Scheduler on which the recurring action runs.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled action.</param>
  /// <returns>The task controlled by the internal completion source.</returns>
  private Task Producer2(IObserver<int> results, IScheduler scheduler, CancellationToken token)
  {
      var completion = new TaskCompletionSource<object>();
      var counter = 0;

      var scheduled = scheduler.Schedule(TimeSpan.FromTicks(100), reschedule =>
      {
          // The task is completed first; the OnNext below is still issued afterwards.
          if (counter == 4)
          {
              completion.SetResult(null);
          }

          counter++;
          results.OnNext(counter);
          reschedule(TimeSpan.FromTicks(100));
      });

      token.Register(() => scheduled.Dispose());

      return completion.Task;
  }
  /// <summary>
  /// Runs <c>Producer2</c> under a <c>TestScheduler</c> and expects four values followed
  /// by OnCompleted at tick 700, i.e. when the producer's task completes.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Completed1()
  {
      RunSynchronously(() =>
      {
          var testScheduler = new TestScheduler();

          IObservable<int> CreateSource() =>
              Observable.Create<int>((observer, token) => Producer2(observer, testScheduler, token));

          var recorded = testScheduler.Start(() => CreateSource());

          recorded.Messages.AssertEqual(
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
              OnCompleted<int>(700)
          );
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and
  /// calls <c>OnCompleted</c> on the observer on the invocation where four values have
  /// already been pushed. The returned task is never completed by this method.
  /// </summary>
  /// <param name="results">Observer receiving the counter values and the completion.</param>
  /// <param name="scheduler">Scheduler on which the recurring action runs.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled action.</param>
  /// <returns>The task of a completion source that is never signalled here.</returns>
  private Task Producer3(IObserver<int> results, IScheduler scheduler, CancellationToken token)
  {
      var completion = new TaskCompletionSource<object>();
      var counter = 0;

      var scheduled = scheduler.Schedule(TimeSpan.FromTicks(100), reschedule =>
      {
          // Completion is signalled through the observer, not through the task.
          if (counter == 4)
          {
              results.OnCompleted();
          }

          counter++;
          results.OnNext(counter);
          reschedule(TimeSpan.FromTicks(100));
      });

      token.Register(() => scheduled.Dispose());

      return completion.Task;
  }
  /// <summary>
  /// Runs <c>Producer3</c> under a <c>TestScheduler</c> and expects four values followed
  /// by OnCompleted at tick 700, i.e. when the producer calls OnCompleted itself.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Completed2()
  {
      RunSynchronously(() =>
      {
          var testScheduler = new TestScheduler();

          IObservable<int> CreateSource() =>
              Observable.Create<int>((observer, token) => Producer3(observer, testScheduler, token));

          var recorded = testScheduler.Start(() => CreateSource());

          recorded.Messages.AssertEqual(
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
              OnCompleted<int>(700)
          );
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and
  /// calls <c>OnError</c> with <paramref name="exception"/> on the invocation where four
  /// values have already been pushed. The returned task is never completed by this method.
  /// </summary>
  /// <param name="results">Observer receiving the counter values and the error.</param>
  /// <param name="scheduler">Scheduler on which the recurring action runs.</param>
  /// <param name="exception">Exception passed to <c>OnError</c>.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled action.</param>
  /// <returns>The task of a completion source that is never signalled here.</returns>
  private Task Producer4(IObserver<int> results, IScheduler scheduler, Exception exception, CancellationToken token)
  {
      var completion = new TaskCompletionSource<object>();
      var counter = 0;

      var scheduled = scheduler.Schedule(TimeSpan.FromTicks(100), reschedule =>
      {
          // The failure is signalled through the observer, not through the task.
          if (counter == 4)
          {
              results.OnError(exception);
          }

          counter++;
          results.OnNext(counter);
          reschedule(TimeSpan.FromTicks(100));
      });

      token.Register(() => scheduled.Dispose());

      return completion.Task;
  }
  /// <summary>
  /// Runs <c>Producer4</c> under a <c>TestScheduler</c> and expects four values followed
  /// by OnError at tick 700 carrying the exception the producer passed to the observer.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Error1()
  {
      RunSynchronously(() =>
      {
          var testScheduler = new TestScheduler();
          var failure = new Exception();

          IObservable<int> CreateSource() =>
              Observable.Create<int>((observer, token) => Producer4(observer, testScheduler, failure, token));

          var recorded = testScheduler.Start(() => CreateSource());

          recorded.Messages.AssertEqual(
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
              OnError<int>(700, failure)
          );
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and
  /// faults the returned task with <paramref name="exception"/> on the invocation where
  /// four values have already been pushed.
  /// </summary>
  /// <param name="results">Observer receiving the counter values.</param>
  /// <param name="scheduler">Scheduler on which the recurring action runs.</param>
  /// <param name="exception">Exception used to fault the returned task.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled action.</param>
  /// <returns>The task controlled by the internal completion source.</returns>
  private Task Producer5(IObserver<int> results, IScheduler scheduler, Exception exception, CancellationToken token)
  {
      var completion = new TaskCompletionSource<object>();
      var counter = 0;

      var scheduled = scheduler.Schedule(TimeSpan.FromTicks(100), reschedule =>
      {
          // The failure is signalled through the task, not through the observer.
          if (counter == 4)
          {
              completion.SetException(exception);
          }

          counter++;
          results.OnNext(counter);
          reschedule(TimeSpan.FromTicks(100));
      });

      token.Register(() => scheduled.Dispose());

      return completion.Task;
  }
  /// <summary>
  /// Runs <c>Producer5</c> under a <c>TestScheduler</c> and expects four values followed
  /// by OnError at tick 700 carrying the exception that faulted the producer's task.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Error2()
  {
      RunSynchronously(() =>
      {
          var testScheduler = new TestScheduler();
          var failure = new Exception();

          IObservable<int> CreateSource() =>
              Observable.Create<int>((observer, token) => Producer5(observer, testScheduler, failure, token));

          var recorded = testScheduler.Start(() => CreateSource());

          recorded.Messages.AssertEqual(
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
              OnError<int>(700, failure)
          );
      });
  }
  /// <summary>
  /// Throws <paramref name="exception"/> synchronously instead of returning a task.
  /// </summary>
  /// <param name="results">Unused.</param>
  /// <param name="exception">Exception to throw.</param>
  /// <param name="token">Unused.</param>
  /// <returns>Never returns.</returns>
  private Task Producer6(IObserver<int> results, Exception exception, CancellationToken token)
      => throw exception;
  /// <summary>
  /// Runs <c>Producer6</c>, which throws from the subscribe delegate, and expects a
  /// single OnError at tick 200 carrying that exception.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Error3()
  {
      RunSynchronously(() =>
      {
          var testScheduler = new TestScheduler();
          var failure = new InvalidOperationException();

          IObservable<int> CreateSource() =>
              Observable.Create<int>((observer, token) => Producer6(observer, failure, token));

          var recorded = testScheduler.Start(() => CreateSource());

          recorded.Messages.AssertEqual(
              OnError<int>(200, failure)
          );
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and
  /// completes the returned task (with a null result) on the invocation where four
  /// values have already been pushed.
  /// </summary>
  /// <param name="results">Observer receiving the counter values.</param>
  /// <param name="scheduler">Scheduler on which the recurring action runs.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled action.</param>
  /// <returns>The task controlled by the internal completion source.</returns>
  private Task Producer7(IObserver<int> results, IScheduler scheduler, CancellationToken token)
  {
      var completion = new TaskCompletionSource<object>();
      var counter = 0;

      var scheduled = scheduler.Schedule(TimeSpan.FromTicks(100), reschedule =>
      {
          if (counter == 4)
          {
              completion.SetResult(null);
          }

          counter++;
          results.OnNext(counter);
          reschedule(TimeSpan.FromTicks(100));
      });

      token.Register(() => scheduled.Dispose());

      return completion.Task;
  }
  /// <summary>
  /// Runs <c>Producer7</c> with the subscription disposed at tick 650 and expects only
  /// the four values produced before that point, with no terminal notification.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Cancel1()
  {
      RunSynchronously(() =>
      {
          var testScheduler = new TestScheduler();

          IObservable<int> CreateSource() =>
              Observable.Create<int>((observer, token) => Producer7(observer, testScheduler, token));

          // Second argument: the tick at which the subscription is disposed.
          var recorded = testScheduler.Start(() => CreateSource(), 650);

          recorded.Messages.AssertEqual(
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4)
          );
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and
  /// calls <c>OnCompleted</c> on the observer on the invocation where four values have
  /// already been pushed. The returned task is never completed by this method.
  /// </summary>
  /// <param name="results">Observer receiving the counter values and the completion.</param>
  /// <param name="scheduler">Scheduler on which the recurring action runs.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled action.</param>
  /// <returns>The task of a completion source that is never signalled here.</returns>
  private Task Producer8(IObserver<int> results, IScheduler scheduler, CancellationToken token)
  {
      var completion = new TaskCompletionSource<object>();
      var counter = 0;

      var scheduled = scheduler.Schedule(TimeSpan.FromTicks(100), reschedule =>
      {
          if (counter == 4)
          {
              results.OnCompleted();
          }

          counter++;
          results.OnNext(counter);
          reschedule(TimeSpan.FromTicks(100));
      });

      token.Register(() => scheduled.Dispose());

      return completion.Task;
  }
  /// <summary>
  /// Runs <c>Producer8</c> with the subscription disposed at tick 650 and expects only
  /// the four values produced before that point, with no terminal notification.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Cancel2()
  {
      RunSynchronously(() =>
      {
          var testScheduler = new TestScheduler();

          IObservable<int> CreateSource() =>
              Observable.Create<int>((observer, token) => Producer8(observer, testScheduler, token));

          // Second argument: the tick at which the subscription is disposed.
          var recorded = testScheduler.Start(() => CreateSource(), 650);

          recorded.Messages.AssertEqual(
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4)
          );
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and
  /// calls <c>OnCompleted</c> on the observer on the invocation where four values have
  /// already been pushed. The returned task is never completed by this method.
  /// </summary>
  /// <param name="results">Observer receiving the counter values and the completion.</param>
  /// <param name="scheduler">Scheduler on which the recurring action runs.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled action.</param>
  /// <returns>The task of a completion source that is never signalled here.</returns>
  private Task Producer9(IObserver<int> results, IScheduler scheduler, CancellationToken token)
  {
      var completion = new TaskCompletionSource<object>();
      var counter = 0;

      var scheduled = scheduler.Schedule(TimeSpan.FromTicks(100), reschedule =>
      {
          if (counter == 4)
          {
              results.OnCompleted();
          }

          counter++;
          results.OnNext(counter);
          reschedule(TimeSpan.FromTicks(100));
      });

      token.Register(() => scheduled.Dispose());

      return completion.Task;
  }
  /// <summary>
  /// Runs <c>Producer9</c> with the subscription disposed at tick 750, after the producer
  /// has already completed, and expects four values followed by OnCompleted at tick 700.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Cancel3()
  {
      RunSynchronously(() =>
      {
          var testScheduler = new TestScheduler();

          IObservable<int> CreateSource() =>
              Observable.Create<int>((observer, token) => Producer9(observer, testScheduler, token));

          // Second argument: the tick at which the subscription is disposed.
          var recorded = testScheduler.Start(() => CreateSource(), 750);

          recorded.Messages.AssertEqual(
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4),
              OnCompleted<int>(700)
          );
      });
  }
  /// <summary>
  /// Pushes an incrementing counter to <paramref name="results"/> every 100 ticks and
  /// moves the returned task to the canceled state on the invocation where four values
  /// have already been pushed.
  /// </summary>
  /// <param name="results">Observer receiving the counter values.</param>
  /// <param name="scheduler">Scheduler on which the recurring action runs.</param>
  /// <param name="token">Token whose cancellation disposes the scheduled action.</param>
  /// <returns>The task controlled by the internal completion source.</returns>
  private Task Producer10(IObserver<int> results, IScheduler scheduler, CancellationToken token)
  {
      var completion = new TaskCompletionSource<object>();
      var counter = 0;

      var scheduled = scheduler.Schedule(TimeSpan.FromTicks(100), reschedule =>
      {
          // The task itself is canceled here, independently of the token.
          if (counter == 4)
          {
              completion.SetCanceled();
          }

          counter++;
          results.OnNext(counter);
          reschedule(TimeSpan.FromTicks(100));
      });

      token.Register(() => scheduled.Dispose());

      return completion.Task;
  }
  /// <summary>
  /// Runs <c>Producer10</c>, whose task ends up canceled, and expects four values followed
  /// by an OnError at tick 700 whose exception is an <see cref="OperationCanceledException"/>.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Cancel4()
  {
      RunSynchronously(() =>
      {
          var testScheduler = new TestScheduler();

          IObservable<int> CreateSource() =>
              Observable.Create<int>((observer, token) => Producer10(observer, testScheduler, token));

          var recorded = testScheduler.Start(() => CreateSource());
          var messages = recorded.Messages;

          messages.Take(4).AssertEqual(
              OnNext(300, 1),
              OnNext(400, 2),
              OnNext(500, 3),
              OnNext(600, 4)
          );

          Assert.Equal(5, messages.Count);

          // The fifth message is checked field by field because the exception instance
          // is not created by the test, so only its type is checked.
          Assert.Equal(700, messages[4].Time);
          Assert.Equal(NotificationKind.OnError, messages[4].Value.Kind);
          Assert.True(messages[4].Value.Exception is OperationCanceledException);
      });
  }
  /// <summary>
  /// Wraps <paramref name="action"/> in a task, runs it synchronously on a
  /// <c>SynchronousScheduler</c>, and waits for it.
  /// </summary>
  /// <param name="action">The test body to execute.</param>
  private void RunSynchronously(Action action)
  {
      var inlineScheduler = new SynchronousScheduler();
      var work = new Task(action);

      work.RunSynchronously(inlineScheduler);
      work.Wait();
  }
  /// <summary>
  /// A <see cref="TaskScheduler"/> that executes every task immediately on the calling
  /// thread, both when it is queued and when inline execution is requested.
  /// </summary>
  private class SynchronousScheduler : TaskScheduler
  {
      /// <summary>Not supported; always throws <see cref="NotImplementedException"/>.</summary>
      protected override IEnumerable<Task> GetScheduledTasks() => throw new NotImplementedException();

      /// <summary>Runs <paramref name="task"/> right away instead of queueing it.</summary>
      protected override void QueueTask(Task task) => TryExecuteTask(task);

      /// <summary>Runs <paramref name="task"/> inline and reports whether it was executed.</summary>
      protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => TryExecuteTask(task);
  }
  /// <summary>
  /// Creates a source whose subscribe task pushes 42 and completes the observer, then
  /// checks that enumerating it with <c>ForEach</c> collects exactly that one value.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Task_Simple()
  {
      var source = Observable.Create<int>(observer =>
          Task.Factory.StartNew(() =>
          {
              observer.OnNext(42);
              observer.OnCompleted();
          }));

      var received = new List<int>();
      source.ForEach(value => received.Add(value));

      Assert.True(new[] { 42 }.SequenceEqual(received));
  }
  /// <summary>
  /// Creates a source whose subscribe task pushes 42 in a loop until the supplied token
  /// is cancelled, disposes the subscription once at least ten values have been pushed,
  /// and checks that the first ten received values are all 42.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Task_Token()
  {
      var tenPushed = new ManualResetEvent(false);

      var source = Observable.Create<int>((observer, ct) =>
          Task.Factory.StartNew(() =>
          {
              var iteration = 0;
              while (!ct.IsCancellationRequested)
              {
                  // Signalled on the eleventh pass, i.e. after ten values have been pushed.
                  if (iteration == 10)
                  {
                      tenPushed.Set();
                  }

                  iteration++;
                  observer.OnNext(42);
              }
          },
          CancellationToken.None));

      var received = new List<int>();
      var subscription = source.Subscribe(value =>
      {
          lock (received)
          {
              received.Add(value);
          }
      });

      tenPushed.WaitOne();
      subscription.Dispose();

      // Dispose sets the _isStopped gate in the AutoDetachObserver wrapping our observer,
      // but the producer thread may have passed that gate just before Dispose was called
      // and could still be inside List<int>.Add. Taking the same lock the callback uses
      // ensures any such call has finished before the list is inspected.
      lock (received)
      {
          Assert.True(received.Take(10).SequenceEqual(Enumerable.Repeat(42, 10)));
      }
  }
  /// <summary>
  /// Creates a source whose subscribe task pushes 42, completes the observer, and returns
  /// a disposable; waits for that disposable to be disposed and checks the collected value.
  /// </summary>
  [TestMethod]
  public void CreateAsync_IDisposable_Simple()
  {
      var disposed = new ManualResetEvent(false);
      var teardown = Disposable.Create(() => disposed.Set());

      var source = Observable.Create<int>(observer =>
          Task.Factory.StartNew(() =>
          {
              observer.OnNext(42);
              observer.OnCompleted();
              return teardown;
          }));

      var received = new List<int>();
      source.ForEach(value => received.Add(value));

      // Blocks until the disposable returned by the task has been disposed.
      disposed.WaitOne();

      Assert.True(new[] { 42 }.SequenceEqual(received));
  }
  /// <summary>
  /// Creates a source whose subscribe task pushes 42 until the token is cancelled and then
  /// returns a disposable; disposes the subscription after ten values, waits for the
  /// returned disposable to be disposed, and checks the first ten received values.
  /// </summary>
  [TestMethod]
  public void CreateAsync_IDisposable_Token()
  {
      var disposed = new ManualResetEvent(false);
      var teardown = Disposable.Create(() => disposed.Set());
      var tenPushed = new ManualResetEvent(false);

      var source = Observable.Create<int>((observer, ct) =>
          Task.Factory.StartNew(() =>
          {
              var iteration = 0;
              while (!ct.IsCancellationRequested)
              {
                  // Signalled on the eleventh pass, i.e. after ten values have been pushed.
                  if (iteration == 10)
                  {
                      tenPushed.Set();
                  }

                  iteration++;
                  observer.OnNext(42);
              }

              return teardown;
          }));

      var received = new List<int>();
      var subscription = source.Subscribe(value => received.Add(value));

      tenPushed.WaitOne();
      subscription.Dispose();

      // The disposable is only returned after the loop exits, so waiting for it to be
      // disposed means the producer has stopped adding to the list.
      disposed.WaitOne();

      Assert.True(received.Take(10).SequenceEqual(Enumerable.Repeat(42, 10)));
  }
  /// <summary>
  /// Creates a source whose subscribe task pushes 42, completes the observer, and returns
  /// an <see cref="Action"/>; waits for that action to be invoked and checks the collected value.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Action_Simple()
  {
      var invoked = new ManualResetEvent(false);
      var teardown = new Action(() => invoked.Set());

      var source = Observable.Create<int>(observer =>
          Task.Factory.StartNew(() =>
          {
              observer.OnNext(42);
              observer.OnCompleted();
              return teardown;
          }));

      var received = new List<int>();
      source.ForEach(value => received.Add(value));

      // Blocks until the action returned by the task has been invoked.
      invoked.WaitOne();

      Assert.True(new[] { 42 }.SequenceEqual(received));
  }
  /// <summary>
  /// Creates a source whose subscribe task pushes 42 until the token is cancelled and then
  /// returns an <see cref="Action"/>; disposes the subscription after ten values, waits for
  /// the returned action to be invoked, and checks the first ten received values.
  /// </summary>
  [TestMethod]
  public void CreateAsync_Action_Token()
  {
      var invoked = new ManualResetEvent(false);
      var teardown = new Action(() => invoked.Set());
      var tenPushed = new ManualResetEvent(false);

      var source = Observable.Create<int>((observer, ct) =>
          Task.Factory.StartNew(() =>
          {
              var iteration = 0;
              while (!ct.IsCancellationRequested)
              {
                  // Signalled on the eleventh pass, i.e. after ten values have been pushed.
                  if (iteration == 10)
                  {
                      tenPushed.Set();
                  }

                  iteration++;
                  observer.OnNext(42);
              }

              return teardown;
          }));

      var received = new List<int>();
      var subscription = source.Subscribe(value => received.Add(value));

      tenPushed.WaitOne();
      subscription.Dispose();

      // The action is only returned after the loop exits, so waiting for it to be
      // invoked means the producer has stopped adding to the list.
      invoked.WaitOne();

      Assert.True(received.Take(10).SequenceEqual(Enumerable.Repeat(42, 10)));
  }
  /// <summary>
  /// Creates a source from an async delegate that subscribes to <c>Observable.Range(1, 3)</c>,
  /// forwards its values, and returns the inner subscription as the <see cref="IDisposable"/>.
  /// Takes one element and checks that it is the first value of the range.
  /// </summary>
  [TestMethod]
  public void CreateWithTaskDisposable_NoPrematureTermination()
  {
      var obs = Observable.Create<int>(async o =>
      {
          // avoid warning on async o due to no await
          await Task.CompletedTask;

          var inner = Observable.Range(1, 3);

          return inner.Subscribe(x =>
          {
              o.OnNext(x);
          });
      });

      var result = obs.Take(1).Wait();

      // Previously the value was computed and discarded, so the test only verified that
      // Wait() returned; pin the element that Take(1) is expected to deliver.
      Assert.Equal(1, result);
  }
  /// <summary>
  /// Creates a source from an async delegate that subscribes to <c>Observable.Range(1, 3)</c>,
  /// forwards its values, and returns an <see cref="Action"/> that disposes the inner
  /// subscription. Takes one element and checks that it is the first value of the range.
  /// </summary>
  [TestMethod]
  public void CreateWithTaskAction_NoPrematureTermination()
  {
      var obs = Observable.Create<int>(async o =>
      {
          // avoid warning on async o due to no await
          await Task.CompletedTask;

          var inner = Observable.Range(1, 3);

          var d = inner.Subscribe(x =>
          {
              o.OnNext(x);
          });

#pragma warning disable IDE0039 // (Use local function.) We are testing for a returned Action, and want to be explicit about that.
          Action a = () => d.Dispose();
#pragma warning restore IDE0039
          return a;
      });

      var result = obs.Take(1).Wait();

      // Previously the value was computed and discarded, so the test only verified that
      // Wait() returned; pin the element that Take(1) is expected to deliver.
      Assert.Equal(1, result);
  }
  613. }
  614. }