AsyncEnumerable.Multiple.cs 29 KB


  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. using System;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Threading.Tasks;
  6. using System.Threading;
  7. namespace System.Linq
  8. {
  9. public static partial class AsyncEnumerable
  10. {
  11. public static IAsyncEnumerable<TSource> Concat<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
  12. {
  13. if (first == null)
  14. throw new ArgumentNullException("first");
  15. if (second == null)
  16. throw new ArgumentNullException("second");
  17. return Create(() =>
  18. {
  19. var switched = false;
  20. var e = first.GetEnumerator();
  21. var cts = new CancellationTokenDisposable();
  22. var a = new AssignableDisposable { Disposable = e };
  23. var d = new CompositeDisposable(cts, a);
  24. var f = default(Action<TaskCompletionSource<bool>, CancellationToken>);
  25. f = (tcs, ct) => e.MoveNext(ct).ContinueWith(t =>
  26. {
  27. t.Handle(tcs, res =>
  28. {
  29. if (res)
  30. {
  31. tcs.TrySetResult(true);
  32. }
  33. else
  34. {
  35. if (switched)
  36. {
  37. tcs.TrySetResult(false);
  38. }
  39. else
  40. {
  41. switched = true;
  42. e = second.GetEnumerator();
  43. a.Disposable = e;
  44. f(tcs, ct);
  45. }
  46. }
  47. });
  48. });
  49. return Create(
  50. (ct, tcs) =>
  51. {
  52. f(tcs, cts.Token);
  53. return tcs.Task.UsingEnumerator(a);
  54. },
  55. () => e.Current,
  56. d.Dispose
  57. );
  58. });
  59. }
  60. public static IAsyncEnumerable<TResult> Zip<TFirst, TSecond, TResult>(this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> selector)
  61. {
  62. if (first == null)
  63. throw new ArgumentNullException("first");
  64. if (second == null)
  65. throw new ArgumentNullException("second");
  66. if (selector == null)
  67. throw new ArgumentNullException("selector");
  68. return Create(() =>
  69. {
  70. var e1 = first.GetEnumerator();
  71. var e2 = second.GetEnumerator();
  72. var current = default(TResult);
  73. var cts = new CancellationTokenDisposable();
  74. var d = new CompositeDisposable(cts, e1, e2);
  75. return Create(
  76. (ct, tcs) =>
  77. {
  78. e1.MoveNext(cts.Token).Zip(e2.MoveNext(cts.Token), (f, s) =>
  79. {
  80. var result = f && s;
  81. if (result)
  82. current = selector(e1.Current, e2.Current);
  83. return result;
  84. }).ContinueWith(t =>
  85. {
  86. t.Handle(tcs, x => tcs.TrySetResult(x));
  87. });
  88. return tcs.Task.UsingEnumerator(e1).UsingEnumerator(e2);
  89. },
  90. () => current,
  91. d.Dispose
  92. );
  93. });
  94. }
  95. public static IAsyncEnumerable<TSource> Except<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
  96. {
  97. if (first == null)
  98. throw new ArgumentNullException("first");
  99. if (second == null)
  100. throw new ArgumentNullException("second");
  101. if (comparer == null)
  102. throw new ArgumentNullException("comparer");
  103. return Create(() =>
  104. {
  105. var e = first.GetEnumerator();
  106. var cts = new CancellationTokenDisposable();
  107. var d = new CompositeDisposable(cts, e);
  108. var mapTask = default(Task<Dictionary<TSource, TSource>>);
  109. var getMapTask = new Func<CancellationToken, Task<Dictionary<TSource, TSource>>>(ct =>
  110. {
  111. if (mapTask == null)
  112. mapTask = second.ToDictionary(x => x, comparer, ct);
  113. return mapTask;
  114. });
  115. var f = default(Action<TaskCompletionSource<bool>, CancellationToken>);
  116. f = (tcs, ct) =>
  117. {
  118. e.MoveNext(ct).Zip(getMapTask(ct), (b, _) => b).ContinueWith(t =>
  119. {
  120. t.Handle(tcs, res =>
  121. {
  122. if (res)
  123. {
  124. if (!mapTask.Result.ContainsKey(e.Current))
  125. tcs.TrySetResult(true);
  126. else
  127. f(tcs, ct);
  128. }
  129. else
  130. tcs.TrySetResult(false);
  131. });
  132. });
  133. };
  134. return Create(
  135. (ct, tcs) =>
  136. {
  137. f(tcs, cts.Token);
  138. return tcs.Task.UsingEnumerator(e);
  139. },
  140. () => e.Current,
  141. d.Dispose
  142. );
  143. });
  144. }
  145. public static IAsyncEnumerable<TSource> Except<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
  146. {
  147. if (first == null)
  148. throw new ArgumentNullException("first");
  149. if (second == null)
  150. throw new ArgumentNullException("second");
  151. return first.Except(second, EqualityComparer<TSource>.Default);
  152. }
  153. public static IAsyncEnumerable<TSource> Intersect<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
  154. {
  155. if (first == null)
  156. throw new ArgumentNullException("first");
  157. if (second == null)
  158. throw new ArgumentNullException("second");
  159. if (comparer == null)
  160. throw new ArgumentNullException("comparer");
  161. return Create(() =>
  162. {
  163. var e = first.GetEnumerator();
  164. var cts = new CancellationTokenDisposable();
  165. var d = new CompositeDisposable(cts, e);
  166. var mapTask = default(Task<Dictionary<TSource, TSource>>);
  167. var getMapTask = new Func<CancellationToken, Task<Dictionary<TSource, TSource>>>(ct =>
  168. {
  169. if (mapTask == null)
  170. mapTask = second.ToDictionary(x => x, comparer, ct);
  171. return mapTask;
  172. });
  173. var f = default(Action<TaskCompletionSource<bool>, CancellationToken>);
  174. f = (tcs, ct) =>
  175. {
  176. e.MoveNext(ct).Zip(getMapTask(ct), (b, _) => b).ContinueWith(t =>
  177. {
  178. t.Handle(tcs, res =>
  179. {
  180. if (res)
  181. {
  182. if (mapTask.Result.ContainsKey(e.Current))
  183. tcs.TrySetResult(true);
  184. else
  185. f(tcs, ct);
  186. }
  187. else
  188. tcs.TrySetResult(false);
  189. });
  190. });
  191. };
  192. return Create(
  193. (ct, tcs) =>
  194. {
  195. f(tcs, cts.Token);
  196. return tcs.Task.UsingEnumerator(e);
  197. },
  198. () => e.Current,
  199. d.Dispose
  200. );
  201. });
  202. }
  203. public static IAsyncEnumerable<TSource> Intersect<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
  204. {
  205. if (first == null)
  206. throw new ArgumentNullException("first");
  207. if (second == null)
  208. throw new ArgumentNullException("second");
  209. return first.Intersect(second, EqualityComparer<TSource>.Default);
  210. }
  211. public static IAsyncEnumerable<TSource> Union<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
  212. {
  213. if (first == null)
  214. throw new ArgumentNullException("first");
  215. if (second == null)
  216. throw new ArgumentNullException("second");
  217. if (comparer == null)
  218. throw new ArgumentNullException("comparer");
  219. return first.Concat(second).Distinct(comparer);
  220. }
  221. public static IAsyncEnumerable<TSource> Union<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
  222. {
  223. if (first == null)
  224. throw new ArgumentNullException("first");
  225. if (second == null)
  226. throw new ArgumentNullException("second");
  227. return first.Union(second, EqualityComparer<TSource>.Default);
  228. }
  229. public static Task<bool> SequenceEqual<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, IEqualityComparer<TSource> comparer, CancellationToken cancellationToken)
  230. {
  231. if (first == null)
  232. throw new ArgumentNullException("first");
  233. if (second == null)
  234. throw new ArgumentNullException("second");
  235. if (comparer == null)
  236. throw new ArgumentNullException("comparer");
  237. var tcs = new TaskCompletionSource<bool>();
  238. var e1 = first.GetEnumerator();
  239. var e2 = second.GetEnumerator();
  240. var run = default(Action<CancellationToken>);
  241. run = ct =>
  242. {
  243. e1.MoveNext(ct).Zip(e2.MoveNext(ct), (f, s) =>
  244. {
  245. if (f ^ s)
  246. {
  247. tcs.TrySetResult(false);
  248. return false;
  249. }
  250. if (f && s)
  251. {
  252. var eq = default(bool);
  253. try
  254. {
  255. eq = comparer.Equals(e1.Current, e2.Current);
  256. }
  257. catch (Exception ex)
  258. {
  259. tcs.TrySetException(ex);
  260. return false;
  261. }
  262. if (!eq)
  263. {
  264. tcs.TrySetResult(false);
  265. return false;
  266. }
  267. else
  268. return true;
  269. }
  270. else
  271. {
  272. tcs.TrySetResult(true);
  273. return false;
  274. }
  275. }).ContinueWith(t =>
  276. {
  277. t.Handle(tcs, res =>
  278. {
  279. if (res)
  280. run(ct);
  281. });
  282. });
  283. };
  284. run(cancellationToken);
  285. return tcs.Task.Finally(() =>
  286. {
  287. e1.Dispose();
  288. e2.Dispose();
  289. });
  290. }
  291. public static Task<bool> SequenceEqual<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, CancellationToken cancellationToken)
  292. {
  293. if (first == null)
  294. throw new ArgumentNullException("first");
  295. if (second == null)
  296. throw new ArgumentNullException("second");
  297. return first.SequenceEqual(second, EqualityComparer<TSource>.Default, cancellationToken);
  298. }
  299. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector, IEqualityComparer<TKey> comparer)
  300. {
  301. if (outer == null)
  302. throw new ArgumentNullException("outer");
  303. if (inner == null)
  304. throw new ArgumentNullException("inner");
  305. if (outerKeySelector == null)
  306. throw new ArgumentNullException("outerKeySelector");
  307. if (innerKeySelector == null)
  308. throw new ArgumentNullException("innerKeySelector");
  309. if (resultSelector == null)
  310. throw new ArgumentNullException("resultSelector");
  311. if (comparer == null)
  312. throw new ArgumentNullException("comparer");
  313. return Create(() =>
  314. {
  315. var innerMap = default(Task<ILookup<TKey, TInner>>);
  316. var getInnerMap = new Func<CancellationToken, Task<ILookup<TKey, TInner>>>(ct =>
  317. {
  318. if (innerMap == null)
  319. innerMap = inner.ToLookup(innerKeySelector, comparer, ct);
  320. return innerMap;
  321. });
  322. var outerE = outer.GetEnumerator();
  323. var current = default(TResult);
  324. var cts = new CancellationTokenDisposable();
  325. var d = new CompositeDisposable(cts, outerE);
  326. var f = default(Action<TaskCompletionSource<bool>, CancellationToken>);
  327. f = (tcs, ct) =>
  328. {
  329. getInnerMap(ct).ContinueWith(ti =>
  330. {
  331. ti.Handle(tcs, map =>
  332. {
  333. outerE.MoveNext(ct).ContinueWith(to =>
  334. {
  335. to.Handle(tcs, res =>
  336. {
  337. if (res)
  338. {
  339. var element = outerE.Current;
  340. var key = default(TKey);
  341. try
  342. {
  343. key = outerKeySelector(element);
  344. }
  345. catch (Exception ex)
  346. {
  347. tcs.TrySetException(ex);
  348. return;
  349. }
  350. var innerE = default(IAsyncEnumerable<TInner>);
  351. if (!map.Contains(key))
  352. innerE = AsyncEnumerable.Empty<TInner>();
  353. else
  354. innerE = map[key].ToAsyncEnumerable();
  355. try
  356. {
  357. current = resultSelector(element, innerE);
  358. }
  359. catch (Exception ex)
  360. {
  361. tcs.TrySetException(ex);
  362. return;
  363. }
  364. tcs.TrySetResult(true);
  365. }
  366. else
  367. {
  368. tcs.TrySetResult(false);
  369. }
  370. });
  371. });
  372. });
  373. });
  374. };
  375. return Create(
  376. (ct, tcs) =>
  377. {
  378. f(tcs, cts.Token);
  379. return tcs.Task.UsingEnumerator(outerE);
  380. },
  381. () => current,
  382. d.Dispose
  383. );
  384. });
  385. }
  386. public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector)
  387. {
  388. if (outer == null)
  389. throw new ArgumentNullException("outer");
  390. if (inner == null)
  391. throw new ArgumentNullException("inner");
  392. if (outerKeySelector == null)
  393. throw new ArgumentNullException("outerKeySelector");
  394. if (innerKeySelector == null)
  395. throw new ArgumentNullException("innerKeySelector");
  396. if (resultSelector == null)
  397. throw new ArgumentNullException("resultSelector");
  398. return outer.GroupJoin(inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer<TKey>.Default);
  399. }
  400. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, TInner, TResult> resultSelector, IEqualityComparer<TKey> comparer)
  401. {
  402. if (outer == null)
  403. throw new ArgumentNullException("outer");
  404. if (inner == null)
  405. throw new ArgumentNullException("inner");
  406. if (outerKeySelector == null)
  407. throw new ArgumentNullException("outerKeySelector");
  408. if (innerKeySelector == null)
  409. throw new ArgumentNullException("innerKeySelector");
  410. if (resultSelector == null)
  411. throw new ArgumentNullException("resultSelector");
  412. if (comparer == null)
  413. throw new ArgumentNullException("comparer");
  414. return Create(() =>
  415. {
  416. var oe = outer.GetEnumerator();
  417. var ie = inner.GetEnumerator();
  418. var cts = new CancellationTokenDisposable();
  419. var d = new CompositeDisposable(cts, oe, ie);
  420. var current = default(TResult);
  421. var useOuter = true;
  422. var outerMap = new Dictionary<TKey, List<TOuter>>(comparer);
  423. var innerMap = new Dictionary<TKey, List<TInner>>(comparer);
  424. var q = new Queue<TResult>();
  425. var gate = new object();
  426. var f = default(Action<TaskCompletionSource<bool>, CancellationToken>);
  427. f = (tcs, ct) =>
  428. {
  429. if (q.Count > 0)
  430. {
  431. current = q.Dequeue();
  432. tcs.TrySetResult(true);
  433. return;
  434. }
  435. var b = useOuter;
  436. if (ie == null && oe == null)
  437. {
  438. tcs.TrySetResult(false);
  439. return;
  440. }
  441. else if (ie == null)
  442. b = true;
  443. else if (oe == null)
  444. b = false;
  445. useOuter = !useOuter;
  446. var enqueue = new Func<TOuter, TInner, bool>((o, i) =>
  447. {
  448. var result = default(TResult);
  449. try
  450. {
  451. result = resultSelector(o, i);
  452. }
  453. catch (Exception exception)
  454. {
  455. tcs.TrySetException(exception);
  456. return false;
  457. }
  458. q.Enqueue(result);
  459. return true;
  460. });
  461. if (b)
  462. oe.MoveNext(ct).ContinueWith(t =>
  463. {
  464. t.Handle(tcs, res =>
  465. {
  466. if (res)
  467. {
  468. var element = oe.Current;
  469. var key = default(TKey);
  470. try
  471. {
  472. key = outerKeySelector(element);
  473. }
  474. catch (Exception exception)
  475. {
  476. tcs.TrySetException(exception);
  477. return;
  478. }
  479. var outerList = default(List<TOuter>);
  480. if (!outerMap.TryGetValue(key, out outerList))
  481. {
  482. outerList = new List<TOuter>();
  483. outerMap.Add(key, outerList);
  484. }
  485. outerList.Add(element);
  486. var innerList = default(List<TInner>);
  487. if (!innerMap.TryGetValue(key, out innerList))
  488. {
  489. innerList = new List<TInner>();
  490. innerMap.Add(key, innerList);
  491. }
  492. foreach (var v in innerList)
  493. {
  494. if (!enqueue(element, v))
  495. return;
  496. }
  497. f(tcs, ct);
  498. }
  499. else
  500. {
  501. oe.Dispose();
  502. oe = null;
  503. f(tcs, ct);
  504. }
  505. });
  506. });
  507. else
  508. ie.MoveNext(ct).ContinueWith(t =>
  509. {
  510. t.Handle(tcs, res =>
  511. {
  512. if (res)
  513. {
  514. var element = ie.Current;
  515. var key = default(TKey);
  516. try
  517. {
  518. key = innerKeySelector(element);
  519. }
  520. catch (Exception exception)
  521. {
  522. tcs.TrySetException(exception);
  523. return;
  524. }
  525. var innerList = default(List<TInner>);
  526. if (!innerMap.TryGetValue(key, out innerList))
  527. {
  528. innerList = new List<TInner>();
  529. innerMap.Add(key, innerList);
  530. }
  531. innerList.Add(element);
  532. var outerList = default(List<TOuter>);
  533. if (!outerMap.TryGetValue(key, out outerList))
  534. {
  535. outerList = new List<TOuter>();
  536. outerMap.Add(key, outerList);
  537. }
  538. foreach (var v in outerList)
  539. {
  540. if (!enqueue(v, element))
  541. return;
  542. }
  543. f(tcs, ct);
  544. }
  545. else
  546. {
  547. ie.Dispose();
  548. ie = null;
  549. f(tcs, ct);
  550. }
  551. });
  552. });
  553. };
  554. return Create(
  555. (ct, tcs) =>
  556. {
  557. f(tcs, cts.Token);
  558. return tcs.Task.UsingEnumerator(oe).UsingEnumerator(ie);
  559. },
  560. () => current,
  561. d.Dispose
  562. );
  563. });
  564. }
  565. public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, TInner, TResult> resultSelector)
  566. {
  567. if (outer == null)
  568. throw new ArgumentNullException("outer");
  569. if (inner == null)
  570. throw new ArgumentNullException("inner");
  571. if (outerKeySelector == null)
  572. throw new ArgumentNullException("outerKeySelector");
  573. if (innerKeySelector == null)
  574. throw new ArgumentNullException("innerKeySelector");
  575. if (resultSelector == null)
  576. throw new ArgumentNullException("resultSelector");
  577. return outer.Join(inner, outerKeySelector, innerKeySelector, resultSelector, EqualityComparer<TKey>.Default);
  578. }
  579. public static IAsyncEnumerable<TSource> Concat<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
  580. {
  581. if (sources == null)
  582. throw new ArgumentNullException("sources");
  583. return sources.Concat_();
  584. }
  585. public static IAsyncEnumerable<TSource> Concat<TSource>(params IAsyncEnumerable<TSource>[] sources)
  586. {
  587. if (sources == null)
  588. throw new ArgumentNullException("sources");
  589. return sources.Concat_();
  590. }
  591. private static IAsyncEnumerable<TSource> Concat_<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
  592. {
  593. return Create(() =>
  594. {
  595. var se = sources.GetEnumerator();
  596. var e = default(IAsyncEnumerator<TSource>);
  597. var cts = new CancellationTokenDisposable();
  598. var a = new AssignableDisposable();
  599. var d = new CompositeDisposable(cts, se, a);
  600. var f = default(Action<TaskCompletionSource<bool>, CancellationToken>);
  601. f = (tcs, ct) =>
  602. {
  603. if (e == null)
  604. {
  605. var b = false;
  606. try
  607. {
  608. b = se.MoveNext();
  609. if (b)
  610. e = se.Current.GetEnumerator();
  611. }
  612. catch (Exception ex)
  613. {
  614. tcs.TrySetException(ex);
  615. return;
  616. }
  617. if (!b)
  618. {
  619. tcs.TrySetResult(false);
  620. return;
  621. }
  622. a.Disposable = e;
  623. }
  624. e.MoveNext(ct).ContinueWith(t =>
  625. {
  626. t.Handle(tcs, res =>
  627. {
  628. if (res)
  629. {
  630. tcs.TrySetResult(true);
  631. }
  632. else
  633. {
  634. e.Dispose();
  635. e = null;
  636. f(tcs, ct);
  637. }
  638. });
  639. });
  640. };
  641. return Create(
  642. (ct, tcs) =>
  643. {
  644. f(tcs, cts.Token);
  645. return tcs.Task.UsingEnumerator(a);
  646. },
  647. () => e.Current,
  648. d.Dispose
  649. );
  650. });
  651. }
  652. public static IAsyncEnumerable<TOther> SelectMany<TSource, TOther>(this IAsyncEnumerable<TSource> source, IAsyncEnumerable<TOther> other)
  653. {
  654. if (source == null)
  655. throw new ArgumentNullException("source");
  656. if (other == null)
  657. throw new ArgumentNullException("other");
  658. return source.SelectMany(_ => other);
  659. }
  660. }
  661. }