// File: AsyncTests.Conversions.cs (16 KB)
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System;
  5. using System.Collections.Generic;
  6. using System.Collections.ObjectModel;
  7. using System.Linq;
  8. using System.Text;
  9. using System.Threading.Tasks;
  10. using Xunit;
  11. using System.Threading;
  12. namespace Tests
  13. {
  14. public partial class AsyncTests
  15. {
  /// <summary>
  /// Checks that <c>AsyncEnumerable.ToAsyncEnumerable</c> throws
  /// <see cref="ArgumentNullException"/> for a null enumerable, a null
  /// observable and a null task source.
  /// </summary>
  [Fact]
  public void ToAsyncEnumerable_Null()
  {
      // Typed null locals select the same overloads as default(T) would.
      IEnumerable<int> nullEnumerable = null;
      IObservable<int> nullObservable = null;
      Task<int> nullTask = null;

      AssertThrows<ArgumentNullException>(() => AsyncEnumerable.ToAsyncEnumerable<int>(nullEnumerable));
      AssertThrows<ArgumentNullException>(() => AsyncEnumerable.ToAsyncEnumerable<int>(nullObservable));
      AssertThrows<ArgumentNullException>(() => AsyncEnumerable.ToAsyncEnumerable<int>(nullTask));
  }
  /// <summary>
  /// Converts an int array to an async sequence and checks, through the
  /// HasNext/NoNext helpers, that 1, 2, 3, 4 are produced in order and that
  /// nothing follows.
  /// </summary>
  [Fact]
  public void ToAsyncEnumerable1()
  {
      var source = new[] { 1, 2, 3, 4 };
      var enumerator = source.ToAsyncEnumerable().GetEnumerator();

      // Arrays enumerate in index order, so this issues HasNext for 1, 2, 3, 4.
      foreach (var expected in source)
      {
          HasNext(enumerator, expected);
      }

      NoNext(enumerator);
  }
  /// <summary>
  /// Converts an enumerable that yields 42 and then throws, and checks that the
  /// second MoveNext task faults with exactly the exception the source threw.
  /// </summary>
  [Fact]
  public void ToAsyncEnumerable2()
  {
      var failure = new Exception("Bang");
      var enumerator = ToAsyncEnumerable_Sequence(failure).ToAsyncEnumerable().GetEnumerator();

      HasNext(enumerator, 42);

      // Task.Wait wraps the fault in an AggregateException; the single inner
      // exception must be the very instance passed to the source.
      AssertThrows<Exception>(
          () => enumerator.MoveNext().Wait(WaitTimeoutMs),
          error => ((AggregateException)error).InnerExceptions.Single() == failure);
  }
  /// <summary>
  /// Iterator that yields the single value 42 and then throws <paramref name="e"/>.
  /// </summary>
  /// <param name="e">Exception thrown when the iterator is advanced past its first element.</param>
  /// <returns>A lazily evaluated sequence: one element, then a failure.</returns>
  private IEnumerable<int> ToAsyncEnumerable_Sequence(Exception e)
  {
      const int onlyValue = 42;

      yield return onlyValue;

      // Reached only when the consumer asks for a second element.
      throw e;
  }
  /// <summary>
  /// Converts an observable that pushes 42 and completes. Checks that the
  /// subscribe delegate has not run after ToAsyncEnumerable, has run after
  /// GetEnumerator, and that the value and the end of the sequence are then
  /// observed through HasNext/NoNext.
  /// </summary>
  [Fact]
  public void ToAsyncEnumerable3()
  {
      var wasSubscribed = false;

      var observable = new MyObservable<int>(observer =>
      {
          wasSubscribed = true;
          observer.OnNext(42);
          observer.OnCompleted();
          return new MyDisposable(() => { });
      });

      var sequence = observable.ToAsyncEnumerable();
      Assert.False(wasSubscribed);

      var enumerator = sequence.GetEnumerator();
      Assert.True(wasSubscribed);

      HasNext(enumerator, 42);
      NoNext(enumerator);
  }
  /// <summary>
  /// Converts an observable that signals OnError on subscription. Checks that
  /// the subscribe delegate runs only once GetEnumerator is called, and that
  /// MoveNext then faults with the exact exception given to OnError.
  /// </summary>
  [Fact]
  public void ToAsyncEnumerable4()
  {
      var failure = new Exception("Bang!");
      var wasSubscribed = false;

      var observable = new MyObservable<int>(observer =>
      {
          wasSubscribed = true;
          observer.OnError(failure);
          return new MyDisposable(() => { });
      });

      var sequence = observable.ToAsyncEnumerable();
      Assert.False(wasSubscribed);

      var enumerator = sequence.GetEnumerator();
      Assert.True(wasSubscribed);

      // Task.Wait surfaces the error wrapped in an AggregateException.
      AssertThrows<Exception>(
          () => enumerator.MoveNext().Wait(WaitTimeoutMs),
          error => ((AggregateException)error).InnerExceptions.Single() == failure);
  }
  /// <summary>
  /// Converts a <see cref="HashSet{T}"/> to an async sequence and checks that
  /// the async enumerator produces the same elements, in the same order, as
  /// enumerating the set directly, and then ends.
  /// </summary>
  [Fact]
  public void ToAsyncEnumerable5()
  {
      var set = new HashSet<int>(new[] { 1, 2, 3, 4 });
      var xs = set.ToAsyncEnumerable();
      var e = xs.GetEnumerator();

      // HashSet<T> does not document an enumeration order, so the expected
      // order is taken from the set itself instead of being hard-coded as
      // 1, 2, 3, 4.
      foreach (var expected in set)
      {
          HasNext(e, expected);
      }

      NoNext(e);
  }
  /// <summary>
  /// Converts a HashSet to an async sequence, materializes it with ToArray and
  /// checks that the array holds exactly the set's elements (order-insensitive).
  /// </summary>
  [Fact]
  public async Task ToAsyncEnumerable6()
  {
      var source = new HashSet<int>(new[] { 1, 2, 3, 4, 5, 6, 7, 8 });

      var materialized = await source.ToAsyncEnumerable().ToArray();

      Assert.True(source.SetEquals(materialized));
  }
  /// <summary>
  /// Converts a HashSet to an async sequence, materializes it with ToList and
  /// checks that the list holds exactly the set's elements (order-insensitive).
  /// </summary>
  [Fact]
  public async Task ToAsyncEnumerable7()
  {
      var source = new HashSet<int>(new[] { 1, 2, 3, 4 });

      var materialized = await source.ToAsyncEnumerable().ToList();

      Assert.True(source.SetEquals(materialized));
  }
  /// <summary>
  /// Converts a HashSet to an async sequence and checks that the async Count
  /// operator reports the same number of elements as the set's Count property.
  /// </summary>
  [Fact]
  public async Task ToAsyncEnumerable8()
  {
      var source = new HashSet<int>(new[] { 1, 2, 3, 4 });

      var counted = await source.ToAsyncEnumerable().Count();

      Assert.Equal(source.Count, counted);
  }
  /// <summary>
  /// Runs the SequenceIdentity helper (defined elsewhere in this partial class)
  /// against the async sequence produced from a HashSet.
  /// </summary>
  [Fact]
  public async Task ToAsyncEnumerable9()
  {
      var source = new HashSet<int>(new[] { 1, 2, 3, 4 });
      var sequence = source.ToAsyncEnumerable();

      await SequenceIdentity(sequence);
  }
  /// <summary>
  /// Runs the SequenceIdentity helper (defined elsewhere in this partial class)
  /// against the async sequence produced from an int array.
  /// </summary>
  [Fact]
  public async Task ToAsyncEnumerable10()
  {
      var source = new[] { 1, 2, 3, 4 };
      var sequence = source.ToAsyncEnumerable();

      await SequenceIdentity(sequence);
  }
  /// <summary>
  /// Checks that the async sequence created from a HashSet can be cast to
  /// <see cref="ICollection{T}"/> and that IsReadOnly, Add, Contains, Remove,
  /// CopyTo, Clear and Count behave as a mutable collection would.
  /// </summary>
  // NOTE(review): the method name is missing an "e" ("ToAsyncEnumerabl11");
  // left unchanged here so existing test filters keep working.
  [Fact]
  public void ToAsyncEnumerabl11()
  {
      var backing = new HashSet<int>(new[] { 1, 2, 3, 4 });
      var collection = backing.ToAsyncEnumerable() as ICollection<int>;

      Assert.NotNull(collection);
      Assert.False(collection.IsReadOnly);

      // Add then remove 5, leaving the original four elements.
      collection.Add(5);
      Assert.True(collection.Contains(5));
      Assert.True(collection.Remove(5));

      var snapshot = new int[4];
      collection.CopyTo(snapshot, 0);
      Assert.True(snapshot.SequenceEqual(collection));

      collection.Clear();
      Assert.Equal(0, collection.Count);
  }
  /// <summary>
  /// Checks that the async sequence created from a List can be cast to
  /// <see cref="IList{T}"/> and that IsReadOnly, Add, Contains, Remove, Insert,
  /// IndexOf, RemoveAt, the indexer, CopyTo, Clear and Count behave as a
  /// mutable list would.
  /// </summary>
  // NOTE(review): the method name is missing an "e" ("ToAsyncEnumerabl12");
  // left unchanged here so existing test filters keep working.
  [Fact]
  public void ToAsyncEnumerabl12()
  {
      var backing = new List<int> { 1, 2, 3, 4 };
      var list = backing.ToAsyncEnumerable() as IList<int>;

      Assert.NotNull(list);
      Assert.False(list.IsReadOnly);

      // ICollection<T> surface: add then remove 5.
      list.Add(5);
      Assert.True(list.Contains(5));
      Assert.True(list.Remove(5));

      // Positional surface: insert 10 at index 2, locate it, remove it again.
      list.Insert(2, 10);
      Assert.Equal(2, list.IndexOf(10));
      list.RemoveAt(2);

      // Indexer set and get.
      list[0] = 7;
      Assert.Equal(7, list[0]);

      var snapshot = new int[4];
      list.CopyTo(snapshot, 0);
      Assert.True(snapshot.SequenceEqual(list));

      list.Clear();
      Assert.Equal(0, list.Count);
  }
  /// <summary>
  /// Converts an already-completed task to an async sequence and checks that
  /// it produces the task's result (36) as its only element.
  /// </summary>
  [Fact]
  public void ToAsyncEnumerable_With_Completed_Task()
  {
      // Task.FromResult returns a task that is already completed, which is the
      // scenario this test names. Task.Factory.StartNew only schedules the
      // work, so the task could still be running when it is converted.
      var task = Task.FromResult(36);
      var xs = task.ToAsyncEnumerable();
      var e = xs.GetEnumerator();

      Assert.True(e.MoveNext().Result);
      Assert.Equal(36, e.Current);
      Assert.False(e.MoveNext().Result);
  }
  /// <summary>
  /// Converts a faulted task to an async sequence and checks that MoveNext
  /// faults with the exact exception the task was faulted with.
  /// </summary>
  [Fact]
  public void ToAsyncEnumerable_With_Faulted_Task()
  {
      var failure = new InvalidOperationException();
      var completion = new TaskCompletionSource<int>();
      completion.SetException(failure);

      var enumerator = completion.Task.ToAsyncEnumerable().GetEnumerator();

      // Task.Wait wraps the fault in an AggregateException.
      AssertThrows<Exception>(
          () => enumerator.MoveNext().Wait(WaitTimeoutMs),
          error => ((AggregateException)error).InnerExceptions.Single() == failure);
  }
  /// <summary>
  /// Converts a canceled task to an async sequence and checks that waiting on
  /// MoveNext throws an AggregateException whose single inner exception is a
  /// <see cref="TaskCanceledException"/>.
  /// </summary>
  [Fact]
  public void ToAsyncEnumerable_With_Canceled_Task()
  {
      var completion = new TaskCompletionSource<int>();
      completion.SetCanceled();

      var enumerator = completion.Task.ToAsyncEnumerable().GetEnumerator();

      AssertThrows<Exception>(
          () => enumerator.MoveNext().Wait(WaitTimeoutMs),
          error => ((AggregateException)error).InnerExceptions.Single() is TaskCanceledException);
  }
  /// <summary>
  /// Minimal <see cref="IObservable{T}"/> whose Subscribe behavior is supplied
  /// as a delegate; used by the tests to script what an observer receives.
  /// </summary>
  /// <typeparam name="T">Element type pushed to observers.</typeparam>
  class MyObservable<T> : IObservable<T>
  {
      // Assigned once in the constructor and never replaced.
      private readonly Func<IObserver<T>, IDisposable> _subscribe;

      /// <summary>Creates an observable that forwards Subscribe calls to <paramref name="subscribe"/>.</summary>
      /// <param name="subscribe">Delegate invoked for every subscription; its result is returned to the subscriber.</param>
      /// <exception cref="ArgumentNullException"><paramref name="subscribe"/> is null.</exception>
      public MyObservable(Func<IObserver<T>, IDisposable> subscribe)
      {
          // Fail at construction rather than with a NullReferenceException on
          // the first Subscribe call.
          if (subscribe == null)
          {
              throw new ArgumentNullException("subscribe");
          }

          _subscribe = subscribe;
      }

      /// <summary>Invokes the subscribe delegate with <paramref name="observer"/>.</summary>
      /// <param name="observer">Observer handed to the delegate unchanged.</param>
      /// <returns>Whatever the delegate returns.</returns>
      public IDisposable Subscribe(IObserver<T> observer)
      {
          return _subscribe(observer);
      }
  }
  /// <summary>
  /// Minimal <see cref="IDisposable"/> that runs a supplied action each time
  /// Dispose is called.
  /// </summary>
  class MyDisposable : IDisposable
  {
      // Assigned once in the constructor and never replaced.
      private readonly Action _dispose;

      /// <summary>Creates a disposable that runs <paramref name="dispose"/> on Dispose.</summary>
      /// <param name="dispose">Action invoked by every Dispose call.</param>
      /// <exception cref="ArgumentNullException"><paramref name="dispose"/> is null.</exception>
      public MyDisposable(Action dispose)
      {
          // Fail at construction rather than with a NullReferenceException
          // inside Dispose.
          if (dispose == null)
          {
              throw new ArgumentNullException("dispose");
          }

          _dispose = dispose;
      }

      /// <summary>Runs the action; there is no guard against repeated calls.</summary>
      public void Dispose()
      {
          _dispose();
      }
  }
  /// <summary>
  /// Checks that <c>AsyncEnumerable.ToEnumerable</c> throws
  /// <see cref="ArgumentNullException"/> when given a null source.
  /// </summary>
  [Fact]
  public void ToEnumerable_Null()
  {
      AssertThrows<ArgumentNullException>(
          () => AsyncEnumerable.ToEnumerable<int>(null));
  }
  /// <summary>
  /// Converts a single-element async sequence to a synchronous enumerable and
  /// checks that it contains exactly the value 42.
  /// </summary>
  [Fact]
  public void ToEnumerable1()
  {
      var expected = new[] { 42 };

      var actual = AsyncEnumerable.Return(42).ToEnumerable();

      Assert.True(actual.SequenceEqual(expected));
  }
  /// <summary>
  /// Converts an empty async sequence to a synchronous enumerable and checks
  /// that it is equal to an empty int sequence.
  /// </summary>
  [Fact]
  public void ToEnumerable2()
  {
      var actual = AsyncEnumerable.Empty<int>().ToEnumerable();

      Assert.True(actual.SequenceEqual(Enumerable.Empty<int>()));
  }
  /// <summary>
  /// Converts a failing async sequence to a synchronous enumerable and checks
  /// that the first MoveNext throws an AggregateException whose single inner
  /// exception is the original failure.
  /// </summary>
  [Fact]
  public void ToEnumerable3()
  {
      var failure = new Exception("Bang");
      var sequence = AsyncEnumerable.Throw<int>(failure).ToEnumerable();

      AssertThrows<Exception>(
          () => sequence.GetEnumerator().MoveNext(),
          error => ((AggregateException)error).InnerExceptions.Single() == failure);
  }
  /// <summary>
  /// Checks that <c>AsyncEnumerable.ToObservable</c> throws
  /// <see cref="ArgumentNullException"/> when given a null source.
  /// </summary>
  [Fact]
  public void ToObservable_Null()
  {
      AssertThrows<ArgumentNullException>(
          () => AsyncEnumerable.ToObservable<int>(null));
  }
  /// <summary>
  /// Subscribes to the observable built from an empty async sequence and
  /// checks that it completes without delivering a value or an error.
  /// </summary>
  [Fact]
  public void ToObservable1()
  {
      var fail = false;
      var evt = new ManualResetEvent(false);

      var xs = AsyncEnumerable.Empty<int>().ToObservable();

      xs.Subscribe(new MyObserver<int>(
          x =>
          {
              // An empty sequence must not produce values.
              fail = true;
          },
          ex =>
          {
              fail = true;
              evt.Set();
          },
          () =>
          {
              evt.Set();
          }
      ));

      // Bounded wait: fail the test instead of hanging the run if neither
      // OnError nor OnCompleted is ever signalled.
      Assert.True(evt.WaitOne(WaitTimeoutMs), "Timed out waiting for the observable to terminate.");
      Assert.False(fail);
  }
  /// <summary>
  /// Subscribes to the observable built from a single-element async sequence
  /// and checks that exactly the value 42 is delivered before completion.
  /// </summary>
  [Fact]
  public void ToObservable2()
  {
      var lst = new List<int>();
      var fail = false;
      var evt = new ManualResetEvent(false);

      var xs = AsyncEnumerable.Return(42).ToObservable();

      xs.Subscribe(new MyObserver<int>(
          x =>
          {
              lst.Add(x);
          },
          ex =>
          {
              fail = true;
              evt.Set();
          },
          () =>
          {
              evt.Set();
          }
      ));

      // Bounded wait: fail the test instead of hanging the run if neither
      // OnError nor OnCompleted is ever signalled.
      Assert.True(evt.WaitOne(WaitTimeoutMs), "Timed out waiting for the observable to terminate.");
      Assert.False(fail);
      Assert.True(lst.SequenceEqual(new[] { 42 }));
  }
  /// <summary>
  /// Subscribes to the observable built from <c>AsyncEnumerable.Range(0, 10)</c>
  /// and checks that the values 0 through 9 are delivered in order before
  /// completion.
  /// </summary>
  [Fact]
  public void ToObservable3()
  {
      var lst = new List<int>();
      var fail = false;
      var evt = new ManualResetEvent(false);

      var xs = AsyncEnumerable.Range(0, 10).ToObservable();

      xs.Subscribe(new MyObserver<int>(
          x =>
          {
              lst.Add(x);
          },
          ex =>
          {
              fail = true;
              evt.Set();
          },
          () =>
          {
              evt.Set();
          }
      ));

      // Bounded wait: fail the test instead of hanging the run if neither
      // OnError nor OnCompleted is ever signalled.
      Assert.True(evt.WaitOne(WaitTimeoutMs), "Timed out waiting for the observable to terminate.");
      Assert.False(fail);
      Assert.True(lst.SequenceEqual(Enumerable.Range(0, 10)));
  }
  /// <summary>
  /// Subscribes to the observable built from a failing async sequence and
  /// checks that OnError receives an AggregateException whose single inner
  /// exception is the original failure, with no value and no completion.
  /// </summary>
  [Fact]
  public void ToObservable4()
  {
      var ex1 = new Exception("Bang!");
      var ex_ = default(Exception);
      var fail = false;
      var evt = new ManualResetEvent(false);

      var xs = AsyncEnumerable.Throw<int>(ex1).ToObservable();

      xs.Subscribe(new MyObserver<int>(
          x =>
          {
              fail = true;
          },
          ex =>
          {
              // Capture the error for inspection on the test thread.
              ex_ = ex;
              evt.Set();
          },
          () =>
          {
              fail = true;
              evt.Set();
          }
      ));

      // Bounded wait: fail the test instead of hanging the run if neither
      // OnError nor OnCompleted is ever signalled.
      Assert.True(evt.WaitOne(WaitTimeoutMs), "Timed out waiting for the observable to terminate.");
      Assert.False(fail);
      Assert.Equal(ex1, ((AggregateException)ex_).InnerExceptions.Single());
  }
  /// <summary>
  /// Builds an async enumerable whose MoveNext immediately reports the end of
  /// the sequence, subscribes through ToObservable, and waits for the third
  /// delegate passed to CreateEnumerator to run (presumably the dispose
  /// callback, going by the test name; confirm against CreateEnumerator).
  /// No value or error may be delivered.
  /// </summary>
  [Fact]
  public void ToObservable_disposes_enumerator_on_completion()
  {
      var fail = false;
      var evt = new ManualResetEvent(false);

      var ae = AsyncEnumerable.CreateEnumerable(
          () => AsyncEnumerable.CreateEnumerator<int>(
              ct => Task.FromResult(false),
              () => { throw new InvalidOperationException(); },
              () => { evt.Set(); }));

      ae
          .ToObservable()
          .Subscribe(new MyObserver<int>(
              x =>
              {
                  fail = true;
              },
              ex =>
              {
                  fail = true;
              },
              () =>
              {
              }
          ));

      // Bounded wait: fail the test instead of hanging the run if the
      // callback that sets the event is never invoked.
      Assert.True(evt.WaitOne(WaitTimeoutMs), "Timed out waiting for the enumerator callback to signal.");
      Assert.False(fail);
  }
  /// <summary>
  /// Builds a never-ending async enumerable, disposes the subscription from
  /// inside OnNext, and waits for the third delegate passed to CreateEnumerator
  /// to run (presumably the dispose callback, going by the test name; confirm
  /// against CreateEnumerator). Neither OnError nor OnCompleted may be called.
  /// </summary>
  [Fact]
  public void ToObservable_disposes_enumerator_when_subscription_is_disposed()
  {
      var fail = false;
      var evt = new ManualResetEvent(false);
      var subscription = default(IDisposable);

      // Gate that keeps MoveNext from completing until `subscription` has
      // been assigned, so OnNext never sees a null subscription.
      var subscriptionAssignedTcs = new TaskCompletionSource<object>();

      var ae = AsyncEnumerable.CreateEnumerable(
          () => AsyncEnumerable.CreateEnumerator(
              async ct =>
              {
                  await subscriptionAssignedTcs.Task;
                  return true;
              },
              () => 1,
              () => { evt.Set(); }));

      subscription = ae
          .ToObservable()
          .Subscribe(new MyObserver<int>(
              x =>
              {
                  subscription.Dispose();
              },
              ex =>
              {
                  fail = true;
              },
              () =>
              {
                  fail = true;
              }
          ));

      subscriptionAssignedTcs.SetResult(null);

      // Bounded wait: fail the test instead of hanging the run if the
      // callback that sets the event is never invoked.
      Assert.True(evt.WaitOne(WaitTimeoutMs), "Timed out waiting for the enumerator callback to signal.");
      Assert.False(fail);
  }
  /// <summary>
  /// Builds a never-ending async enumerable that counts its MoveNext calls,
  /// disposes the subscription from inside the first OnNext, and checks that
  /// MoveNext ran exactly once by the time the third CreateEnumerator delegate
  /// (presumably the dispose callback; confirm against CreateEnumerator) has
  /// signalled. Neither OnError nor OnCompleted may be called.
  /// </summary>
  [Fact]
  public void ToObservable_does_not_call_MoveNext_again_when_subscription_is_disposed()
  {
      var fail = false;
      var moveNextCount = 0;
      var evt = new ManualResetEvent(false);
      var subscription = default(IDisposable);

      // Gate that keeps MoveNext from completing until `subscription` has
      // been assigned, so OnNext never sees a null subscription.
      var subscriptionAssignedTcs = new TaskCompletionSource<object>();

      var ae = AsyncEnumerable.CreateEnumerable(
          () => AsyncEnumerable.CreateEnumerator(
              async ct =>
              {
                  await subscriptionAssignedTcs.Task;
                  moveNextCount++;
                  return true;
              },
              () => 1,
              () => { evt.Set(); }));

      subscription = ae
          .ToObservable()
          .Subscribe(new MyObserver<int>(
              x =>
              {
                  subscription.Dispose();
              },
              ex =>
              {
                  fail = true;
              },
              () =>
              {
                  fail = true;
              }
          ));

      subscriptionAssignedTcs.SetResult(null);

      // Bounded wait: fail the test instead of hanging the run if the
      // callback that sets the event is never invoked.
      Assert.True(evt.WaitOne(WaitTimeoutMs), "Timed out waiting for the enumerator callback to signal.");
      Assert.Equal(1, moveNextCount);
      Assert.False(fail);
  }
  /// <summary>
  /// Minimal <see cref="IObserver{T}"/> that forwards each notification to a
  /// delegate supplied at construction.
  /// </summary>
  /// <typeparam name="T">Element type received by the observer.</typeparam>
  class MyObserver<T> : IObserver<T>
  {
      // Assigned once in the constructor and never replaced.
      private readonly Action<T> _onNext;
      private readonly Action<Exception> _onError;
      private readonly Action _onCompleted;

      /// <summary>Creates an observer backed by the three given callbacks.</summary>
      /// <param name="onNext">Invoked with each value passed to <see cref="OnNext"/>.</param>
      /// <param name="onError">Invoked with the exception passed to <see cref="OnError"/>.</param>
      /// <param name="onCompleted">Invoked when <see cref="OnCompleted"/> is called.</param>
      /// <exception cref="ArgumentNullException">Any callback is null.</exception>
      public MyObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
      {
          // Fail at construction rather than with a NullReferenceException
          // when a notification later arrives, possibly on another thread.
          if (onNext == null)
          {
              throw new ArgumentNullException("onNext");
          }

          if (onError == null)
          {
              throw new ArgumentNullException("onError");
          }

          if (onCompleted == null)
          {
              throw new ArgumentNullException("onCompleted");
          }

          _onNext = onNext;
          _onError = onError;
          _onCompleted = onCompleted;
      }

      /// <summary>Forwards the completion notification to the completion callback.</summary>
      public void OnCompleted()
      {
          _onCompleted();
      }

      /// <summary>Forwards <paramref name="error"/> to the error callback.</summary>
      /// <param name="error">Exception reported by the source.</param>
      public void OnError(Exception error)
      {
          _onError(error);
      }

      /// <summary>Forwards <paramref name="value"/> to the value callback.</summary>
      /// <param name="value">Element reported by the source.</param>
      public void OnNext(T value)
      {
          _onNext(value);
      }
  }
  482. }
  483. }