FromAsyncTest.cs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Linq;
  7. using System.Text;
  8. using System.Threading.Tasks;
  9. using System.Reactive;
  10. using System.Reactive.Concurrency;
  11. using System.Reactive.Linq;
  12. using Microsoft.Reactive.Testing;
  13. using Xunit;
  14. using ReactiveTests.Dummies;
  15. using System.Threading;
  16. namespace ReactiveTests.Tests
  17. {
  18. public class FromAsyncTest : ReactiveTest
  19. {
  20. private Task<int> doneTask;
  21. public FromAsyncTest()
  22. {
  23. var tcs = new TaskCompletionSource<int>();
  24. tcs.SetResult(42);
  25. doneTask = tcs.Task;
  26. }
  27. #region Func
  28. [Fact]
  29. public void FromAsync_Func_ArgumentChecking()
  30. {
  31. var s = Scheduler.Immediate;
  32. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync<int>(default(Func<Task<int>>)));
  33. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync<int>(default(Func<CancellationToken, Task<int>>)));
  34. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync<int>(default(Func<Task<int>>), s));
  35. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync<int>(() => doneTask, default(IScheduler)));
  36. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync<int>(default(Func<CancellationToken, Task<int>>), s));
  37. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync<int>(ct => doneTask, default(IScheduler)));
  38. }
  39. [Fact]
  40. public void FromAsync_Func_Success()
  41. {
  42. var n = 42;
  43. var i = 0;
  44. var xs = Observable.FromAsync(() =>
  45. {
  46. i++;
  47. return Task.Factory.StartNew(() => n);
  48. });
  49. Assert.Equal(n, xs.Single());
  50. Assert.Equal(1, i);
  51. Assert.Equal(n, xs.Single());
  52. Assert.Equal(2, i);
  53. }
  54. [Fact]
  55. public void FromAsync_Func_Throw_Synchronous()
  56. {
  57. var ex = new Exception();
  58. var xs = Observable.FromAsync<int>(() =>
  59. {
  60. throw ex;
  61. });
  62. ReactiveAssert.Throws(ex, () => xs.Single());
  63. }
  64. [Fact]
  65. public void FromAsync_Func_Throw_Asynchronous()
  66. {
  67. var ex = new Exception();
  68. var xs = Observable.FromAsync<int>(() =>
  69. Task.Factory.StartNew<int>(() => { throw ex; })
  70. );
  71. ReactiveAssert.Throws(ex, () => xs.Single());
  72. }
  73. [Fact]
  74. public void FromAsync_FuncWithCancel_Success()
  75. {
  76. var n = 42;
  77. var i = 0;
  78. var xs = Observable.FromAsync(ct =>
  79. {
  80. i++;
  81. return Task.Factory.StartNew(() => n);
  82. });
  83. Assert.Equal(n, xs.Single());
  84. Assert.Equal(1, i);
  85. Assert.Equal(n, xs.Single());
  86. Assert.Equal(2, i);
  87. }
  88. [Fact]
  89. public void FromAsync_FuncWithCancel_Throw_Synchronous()
  90. {
  91. var ex = new Exception();
  92. var xs = Observable.FromAsync<int>(ct =>
  93. {
  94. throw ex;
  95. });
  96. ReactiveAssert.Throws(ex, () => xs.Single());
  97. }
  98. [Fact]
  99. public void FromAsync_FuncWithCancel_Throw_Asynchronous()
  100. {
  101. var ex = new Exception();
  102. var xs = Observable.FromAsync<int>(ct =>
  103. Task.Factory.StartNew<int>(() => { throw ex; })
  104. );
  105. ReactiveAssert.Throws(ex, () => xs.Single());
  106. }
  107. [Fact]
  108. public void FromAsync_FuncWithCancel_Cancel()
  109. {
  110. var e = new ManualResetEvent(false);
  111. var f = new ManualResetEvent(false);
  112. var t = default(Task<int>);
  113. var xs = Observable.FromAsync(ct =>
  114. t = Task.Factory.StartNew<int>(() =>
  115. {
  116. try
  117. {
  118. e.Set();
  119. while (true)
  120. ct.ThrowIfCancellationRequested();
  121. }
  122. finally
  123. {
  124. f.Set();
  125. }
  126. })
  127. );
  128. var d = xs.Subscribe(_ => { });
  129. e.WaitOne();
  130. d.Dispose();
  131. f.WaitOne();
  132. while (!t.IsCompleted)
  133. ;
  134. }
  135. #if DESKTOPCLR
  136. [Fact]
  137. public void FromAsync_Func_Scheduler1()
  138. {
  139. var e = new ManualResetEvent(false);
  140. var x = default(int);
  141. var t = default(int);
  142. var tcs = new TaskCompletionSource<int>();
  143. var xs = Observable.FromAsync(() => tcs.Task, Scheduler.Immediate);
  144. xs.Subscribe(res =>
  145. {
  146. x = res;
  147. t = Thread.CurrentThread.ManagedThreadId;
  148. e.Set();
  149. });
  150. tcs.SetResult(42);
  151. e.WaitOne();
  152. Assert.Equal(42, x);
  153. Assert.Equal(Thread.CurrentThread.ManagedThreadId, t);
  154. }
  155. [Fact]
  156. public void FromAsync_Func_Scheduler2()
  157. {
  158. var e = new ManualResetEvent(false);
  159. var x = default(int);
  160. var t = default(int);
  161. var tcs = new TaskCompletionSource<int>();
  162. var xs = Observable.FromAsync(ct => tcs.Task, Scheduler.Immediate);
  163. xs.Subscribe(res =>
  164. {
  165. x = res;
  166. t = Thread.CurrentThread.ManagedThreadId;
  167. e.Set();
  168. });
  169. tcs.SetResult(42);
  170. e.WaitOne();
  171. Assert.Equal(42, x);
  172. Assert.Equal(Thread.CurrentThread.ManagedThreadId, t);
  173. }
  174. #endif
  175. #endregion
  176. #region Action
  177. [Fact]
  178. public void FromAsync_Action_ArgumentChecking()
  179. {
  180. var s = Scheduler.Immediate;
  181. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<Task>)));
  182. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<CancellationToken, Task>)));
  183. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<Task>), s));
  184. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(() => (Task)doneTask, default(IScheduler)));
  185. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(default(Func<CancellationToken, Task>), s));
  186. ReactiveAssert.Throws<ArgumentNullException>(() => Observable.FromAsync(ct => (Task)doneTask, default(IScheduler)));
  187. }
  188. [Fact]
  189. public void FromAsync_Action_Success()
  190. {
  191. var i = 0;
  192. var xs = Observable.FromAsync(() =>
  193. {
  194. i++;
  195. return Task.Factory.StartNew(() => { });
  196. });
  197. Assert.Equal(Unit.Default, xs.Single());
  198. Assert.Equal(1, i);
  199. Assert.Equal(Unit.Default, xs.Single());
  200. Assert.Equal(2, i);
  201. }
  202. [Fact]
  203. public void FromAsync_Action_Throw_Synchronous()
  204. {
  205. var ex = new Exception();
  206. var xs = Observable.FromAsync(() =>
  207. {
  208. throw ex;
  209. });
  210. ReactiveAssert.Throws(ex, () => xs.Single());
  211. }
  212. [Fact]
  213. public void FromAsync_Action_Throw_Asynchronous()
  214. {
  215. var ex = new Exception();
  216. var xs = Observable.FromAsync(() =>
  217. Task.Factory.StartNew(() => { throw ex; })
  218. );
  219. ReactiveAssert.Throws(ex, () => xs.Single());
  220. }
  221. [Fact]
  222. public void FromAsync_ActionWithCancel_Success()
  223. {
  224. var i = 0;
  225. var xs = Observable.FromAsync(ct =>
  226. {
  227. i++;
  228. return Task.Factory.StartNew(() => { });
  229. });
  230. Assert.Equal(Unit.Default, xs.Single());
  231. Assert.Equal(1, i);
  232. Assert.Equal(Unit.Default, xs.Single());
  233. Assert.Equal(2, i);
  234. }
  235. [Fact]
  236. public void FromAsync_ActionWithCancel_Throw_Synchronous()
  237. {
  238. var ex = new Exception();
  239. var xs = Observable.FromAsync(ct =>
  240. {
  241. throw ex;
  242. });
  243. ReactiveAssert.Throws(ex, () => xs.Single());
  244. }
  245. [Fact]
  246. public void FromAsync_ActionWithCancel_Throw_Asynchronous()
  247. {
  248. var ex = new Exception();
  249. var xs = Observable.FromAsync(ct =>
  250. Task.Factory.StartNew(() => { throw ex; })
  251. );
  252. ReactiveAssert.Throws(ex, () => xs.Single());
  253. }
  254. [Fact]
  255. public void FromAsync_ActionWithCancel_Cancel()
  256. {
  257. var e = new ManualResetEvent(false);
  258. var f = new ManualResetEvent(false);
  259. var t = default(Task);
  260. var xs = Observable.FromAsync(ct =>
  261. t = Task.Factory.StartNew(() =>
  262. {
  263. try
  264. {
  265. e.Set();
  266. while (true)
  267. ct.ThrowIfCancellationRequested();
  268. }
  269. finally
  270. {
  271. f.Set();
  272. }
  273. })
  274. );
  275. var d = xs.Subscribe(_ => { });
  276. e.WaitOne();
  277. d.Dispose();
  278. f.WaitOne();
  279. while (!t.IsCompleted)
  280. ;
  281. }
  282. #if DESKTOPCLR
  283. [Fact]
  284. public void FromAsync_Action_Scheduler1()
  285. {
  286. var e = new ManualResetEvent(false);
  287. var t = default(int);
  288. var tcs = new TaskCompletionSource<int>();
  289. var xs = Observable.FromAsync(() => (Task)tcs.Task, Scheduler.Immediate);
  290. xs.Subscribe(res =>
  291. {
  292. t = Thread.CurrentThread.ManagedThreadId;
  293. e.Set();
  294. });
  295. tcs.SetResult(42);
  296. e.WaitOne();
  297. Assert.Equal(Thread.CurrentThread.ManagedThreadId, t);
  298. }
  299. [Fact]
  300. public void FromAsync_Action_Scheduler2()
  301. {
  302. var e = new ManualResetEvent(false);
  303. var t = default(int);
  304. var tcs = new TaskCompletionSource<int>();
  305. var xs = Observable.FromAsync(ct => (Task)tcs.Task, Scheduler.Immediate);
  306. xs.Subscribe(res =>
  307. {
  308. t = Thread.CurrentThread.ManagedThreadId;
  309. e.Set();
  310. });
  311. tcs.SetResult(42);
  312. e.WaitOne();
  313. Assert.Equal(Thread.CurrentThread.ManagedThreadId, t);
  314. }
  315. #endif
  316. #endregion
  317. }
  318. }