12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755 |
- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Threading.Tasks;
- using System.Threading;
- namespace System.Linq
- {
- public static partial class AsyncEnumerable
- {
/// <summary>
/// Concatenates two asynchronous sequences: elements of <paramref name="first"/> are produced until
/// it reports no more elements, after which enumeration continues with <paramref name="second"/>.
/// </summary>
/// <typeparam name="TSource">Element type of both sequences.</typeparam>
/// <param name="first">Sequence enumerated first.</param>
/// <param name="second">Sequence enumerated once <paramref name="first"/> is exhausted.</param>
/// <returns>An asynchronous sequence over both sources, built through <c>Create</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static IAsyncEnumerable<TSource> Concat<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
{
    if (first == null)
    {
        throw new ArgumentNullException("first");
    }

    if (second == null)
    {
        throw new ArgumentNullException("second");
    }

    return Create(() =>
    {
        // Tracks whether we have already moved on to the second source.
        var onSecond = false;
        var active = first.GetEnumerator();
        var cancellation = new CancellationTokenDisposable();
        // Holds whichever enumerator is currently active so disposal always targets it.
        var activeSlot = new AssignableDisposable { Disposable = active };
        var resources = new CompositeDisposable(cancellation, activeSlot);

        var advance = default(Action<TaskCompletionSource<bool>, CancellationToken>);
        advance = (completion, token) =>
        {
            active.MoveNext(token).ContinueWith(moved =>
            {
                moved.Handle(completion, hasElement =>
                {
                    if (hasElement)
                    {
                        completion.TrySetResult(true);
                        return;
                    }

                    if (onSecond)
                    {
                        // The second source is exhausted too: the whole sequence is done.
                        completion.TrySetResult(false);
                        return;
                    }

                    // The first source is exhausted: swap in the second one and retry.
                    onSecond = true;
                    active = second.GetEnumerator();
                    activeSlot.Disposable = active;
                    advance(completion, token);
                });
            });
        };

        return Create(
            (ct, tcs) =>
            {
                advance(tcs, cancellation.Token);
                return tcs.Task.UsingEnumerator(activeSlot);
            },
            () => active.Current,
            resources.Dispose
        );
    });
}
/// <summary>
/// Pairs up the elements of two asynchronous sequences and projects each pair through
/// <paramref name="selector"/>. The result ends as soon as either source has no further element.
/// </summary>
/// <typeparam name="TFirst">Element type of <paramref name="first"/>.</typeparam>
/// <typeparam name="TSecond">Element type of <paramref name="second"/>.</typeparam>
/// <typeparam name="TResult">Type produced by <paramref name="selector"/>.</typeparam>
/// <param name="first">First source sequence.</param>
/// <param name="second">Second source sequence.</param>
/// <param name="selector">Combines one element from each source into a result element.</param>
/// <returns>An asynchronous sequence of the projected pairs.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static IAsyncEnumerable<TResult> Zip<TFirst, TSecond, TResult>(this IAsyncEnumerable<TFirst> first, IAsyncEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> selector)
{
    if (first == null)
    {
        throw new ArgumentNullException("first");
    }

    if (second == null)
    {
        throw new ArgumentNullException("second");
    }

    if (selector == null)
    {
        throw new ArgumentNullException("selector");
    }

    return Create(() =>
    {
        var left = first.GetEnumerator();
        var right = second.GetEnumerator();
        var latest = default(TResult);
        var cancellation = new CancellationTokenDisposable();
        var resources = new CompositeDisposable(cancellation, left, right);

        return Create(
            (ct, tcs) =>
            {
                // Advance both sources and combine the two "has element" answers.
                var leftMove = left.MoveNext(cancellation.Token);
                var rightMove = right.MoveNext(cancellation.Token);

                leftMove.Zip(rightMove, (hasLeft, hasRight) =>
                {
                    if (!(hasLeft && hasRight))
                    {
                        return false;
                    }

                    latest = selector(left.Current, right.Current);
                    return true;
                }).ContinueWith(zipped =>
                {
                    zipped.Handle(tcs, hasPair => tcs.TrySetResult(hasPair));
                });

                return tcs.Task.UsingEnumerator(left).UsingEnumerator(right);
            },
            () => latest,
            resources.Dispose
        );
    });
}
/// <summary>
/// Produces the elements of <paramref name="first"/> that are not present in
/// <paramref name="second"/>, using <paramref name="comparer"/> to compare elements.
/// </summary>
/// <remarks>
/// <paramref name="second"/> is materialized into a lookup the first time an element is requested and
/// that lookup is reused afterwards. Elements of <paramref name="first"/> are emitted in source order;
/// repeated elements of <paramref name="first"/> are not collapsed.
/// </remarks>
/// <typeparam name="TSource">Element type of both sequences.</typeparam>
/// <param name="first">Sequence whose elements are filtered.</param>
/// <param name="second">Sequence whose elements are excluded from the result.</param>
/// <param name="comparer">Equality comparer used for the membership test.</param>
/// <returns>An asynchronous sequence of the elements of <paramref name="first"/> not found in <paramref name="second"/>.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static IAsyncEnumerable<TSource> Except<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
{
    if (first == null)
        throw new ArgumentNullException("first");
    if (second == null)
        throw new ArgumentNullException("second");
    if (comparer == null)
        throw new ArgumentNullException("comparer");

    return Create(() =>
    {
        var e = first.GetEnumerator();
        var cts = new CancellationTokenDisposable();
        var d = new CompositeDisposable(cts, e);

        var mapTask = default(Task<Dictionary<TSource, TSource>>);
        var getMapTask = new Func<CancellationToken, Task<Dictionary<TSource, TSource>>>(ct =>
        {
            // De-duplicate before building the dictionary: each element is used as its own key, and a
            // key-per-element dictionary build is expected to fault on a repeated key (NOTE(review):
            // ToDictionary is defined elsewhere - confirm), which would break Except whenever
            // 'second' contains the same element twice.
            if (mapTask == null)
                mapTask = second.Distinct(comparer).ToDictionary(x => x, comparer, ct);
            return mapTask;
        });

        var f = default(Action<TaskCompletionSource<bool>, CancellationToken>);
        f = (tcs, ct) =>
        {
            // Wait for both the next element of 'first' and the (cached) lookup of 'second'.
            e.MoveNext(ct).Zip(getMapTask(ct), (b, _) => b).ContinueWith(t =>
            {
                t.Handle(tcs, res =>
                {
                    if (res)
                    {
                        // mapTask has completed at this point because the zipped task has.
                        if (!mapTask.Result.ContainsKey(e.Current))
                            tcs.TrySetResult(true);
                        else
                            f(tcs, ct); // excluded element: keep pulling from 'first'
                    }
                    else
                        tcs.TrySetResult(false);
                });
            });
        };

        return Create(
            (ct, tcs) =>
            {
                f(tcs, cts.Token);
                return tcs.Task.UsingEnumerator(e);
            },
            () => e.Current,
            d.Dispose
        );
    });
}
/// <summary>
/// Produces the elements of <paramref name="first"/> that are not present in
/// <paramref name="second"/>, comparing elements with the default equality comparer.
/// </summary>
/// <typeparam name="TSource">Element type of both sequences.</typeparam>
/// <param name="first">Sequence whose elements are filtered.</param>
/// <param name="second">Sequence whose elements are excluded from the result.</param>
/// <returns>The result of the comparer-taking overload called with <c>EqualityComparer&lt;TSource&gt;.Default</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static IAsyncEnumerable<TSource> Except<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
{
    if (first == null)
    {
        throw new ArgumentNullException("first");
    }

    if (second == null)
    {
        throw new ArgumentNullException("second");
    }

    var defaultComparer = EqualityComparer<TSource>.Default;
    return first.Except(second, defaultComparer);
}
/// <summary>
/// Produces the elements of <paramref name="first"/> that are also present in
/// <paramref name="second"/>, using <paramref name="comparer"/> to compare elements.
/// </summary>
/// <remarks>
/// <paramref name="second"/> is materialized into a lookup the first time an element is requested and
/// that lookup is reused afterwards. Elements of <paramref name="first"/> are emitted in source order;
/// repeated elements of <paramref name="first"/> are not collapsed.
/// </remarks>
/// <typeparam name="TSource">Element type of both sequences.</typeparam>
/// <param name="first">Sequence whose elements are filtered.</param>
/// <param name="second">Sequence whose elements must also contain an element for it to be kept.</param>
/// <param name="comparer">Equality comparer used for the membership test.</param>
/// <returns>An asynchronous sequence of the elements of <paramref name="first"/> found in <paramref name="second"/>.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static IAsyncEnumerable<TSource> Intersect<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
{
    if (first == null)
        throw new ArgumentNullException("first");
    if (second == null)
        throw new ArgumentNullException("second");
    if (comparer == null)
        throw new ArgumentNullException("comparer");

    return Create(() =>
    {
        var e = first.GetEnumerator();
        var cts = new CancellationTokenDisposable();
        var d = new CompositeDisposable(cts, e);

        var mapTask = default(Task<Dictionary<TSource, TSource>>);
        var getMapTask = new Func<CancellationToken, Task<Dictionary<TSource, TSource>>>(ct =>
        {
            // De-duplicate before building the dictionary: each element is used as its own key, and a
            // key-per-element dictionary build is expected to fault on a repeated key (NOTE(review):
            // ToDictionary is defined elsewhere - confirm), which would break Intersect whenever
            // 'second' contains the same element twice.
            if (mapTask == null)
                mapTask = second.Distinct(comparer).ToDictionary(x => x, comparer, ct);
            return mapTask;
        });

        var f = default(Action<TaskCompletionSource<bool>, CancellationToken>);
        f = (tcs, ct) =>
        {
            // Wait for both the next element of 'first' and the (cached) lookup of 'second'.
            e.MoveNext(ct).Zip(getMapTask(ct), (b, _) => b).ContinueWith(t =>
            {
                t.Handle(tcs, res =>
                {
                    if (res)
                    {
                        // mapTask has completed at this point because the zipped task has.
                        if (mapTask.Result.ContainsKey(e.Current))
                            tcs.TrySetResult(true);
                        else
                            f(tcs, ct); // not in 'second': keep pulling from 'first'
                    }
                    else
                        tcs.TrySetResult(false);
                });
            });
        };

        return Create(
            (ct, tcs) =>
            {
                f(tcs, cts.Token);
                return tcs.Task.UsingEnumerator(e);
            },
            () => e.Current,
            d.Dispose
        );
    });
}
/// <summary>
/// Produces the elements of <paramref name="first"/> that are also present in
/// <paramref name="second"/>, comparing elements with the default equality comparer.
/// </summary>
/// <typeparam name="TSource">Element type of both sequences.</typeparam>
/// <param name="first">Sequence whose elements are filtered.</param>
/// <param name="second">Sequence that must also contain an element for it to be kept.</param>
/// <returns>The result of the comparer-taking overload called with <c>EqualityComparer&lt;TSource&gt;.Default</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static IAsyncEnumerable<TSource> Intersect<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
{
    if (first == null)
    {
        throw new ArgumentNullException("first");
    }

    if (second == null)
    {
        throw new ArgumentNullException("second");
    }

    var defaultComparer = EqualityComparer<TSource>.Default;
    return first.Intersect(second, defaultComparer);
}
/// <summary>
/// Produces the union of two asynchronous sequences by concatenating them and then applying
/// <c>Distinct</c> with <paramref name="comparer"/>.
/// </summary>
/// <typeparam name="TSource">Element type of both sequences.</typeparam>
/// <param name="first">First source sequence.</param>
/// <param name="second">Second source sequence.</param>
/// <param name="comparer">Equality comparer handed to <c>Distinct</c>.</param>
/// <returns>The distinct elements of <paramref name="first"/> followed by <paramref name="second"/>.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static IAsyncEnumerable<TSource> Union<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, IEqualityComparer<TSource> comparer)
{
    if (first == null)
    {
        throw new ArgumentNullException("first");
    }

    if (second == null)
    {
        throw new ArgumentNullException("second");
    }

    if (comparer == null)
    {
        throw new ArgumentNullException("comparer");
    }

    var concatenated = first.Concat(second);
    return concatenated.Distinct(comparer);
}
/// <summary>
/// Produces the union of two asynchronous sequences, comparing elements with the default
/// equality comparer.
/// </summary>
/// <typeparam name="TSource">Element type of both sequences.</typeparam>
/// <param name="first">First source sequence.</param>
/// <param name="second">Second source sequence.</param>
/// <returns>The result of the comparer-taking overload called with <c>EqualityComparer&lt;TSource&gt;.Default</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static IAsyncEnumerable<TSource> Union<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second)
{
    if (first == null)
    {
        throw new ArgumentNullException("first");
    }

    if (second == null)
    {
        throw new ArgumentNullException("second");
    }

    var defaultComparer = EqualityComparer<TSource>.Default;
    return first.Union(second, defaultComparer);
}
/// <summary>
/// Determines whether two asynchronous sequences are equal by advancing both in lock-step and
/// comparing corresponding elements with <paramref name="comparer"/>.
/// </summary>
/// <typeparam name="TSource">Element type of both sequences.</typeparam>
/// <param name="first">First sequence to compare.</param>
/// <param name="second">Second sequence to compare.</param>
/// <param name="comparer">Equality comparer applied to each pair of elements.</param>
/// <param name="cancellationToken">Token passed to every <c>MoveNext</c> call on both enumerators.</param>
/// <returns>
/// A task that yields <c>true</c> when both sequences end together with all pairs equal, and
/// <c>false</c> when a pair differs or one sequence ends before the other. An exception thrown by
/// <paramref name="comparer"/> is set on the task.
/// </returns>
/// <exception cref="ArgumentNullException"><paramref name="first"/>, <paramref name="second"/> or <paramref name="comparer"/> is null.</exception>
public static Task<bool> SequenceEqual<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, IEqualityComparer<TSource> comparer, CancellationToken cancellationToken)
{
    if (first == null)
    {
        throw new ArgumentNullException("first");
    }

    if (second == null)
    {
        throw new ArgumentNullException("second");
    }

    if (comparer == null)
    {
        throw new ArgumentNullException("comparer");
    }

    var outcome = new TaskCompletionSource<bool>();
    var left = first.GetEnumerator();
    var right = second.GetEnumerator();

    var compareNext = default(Action<CancellationToken>);
    compareNext = token =>
    {
        // The zipped callback returns true only when the comparison should continue with the next pair.
        left.MoveNext(token).Zip(right.MoveNext(token), (hasLeft, hasRight) =>
        {
            if (hasLeft != hasRight)
            {
                // One sequence ended before the other.
                outcome.TrySetResult(false);
                return false;
            }

            if (!hasLeft)
            {
                // Both sequences ended together without a mismatch.
                outcome.TrySetResult(true);
                return false;
            }

            var same = default(bool);
            try
            {
                same = comparer.Equals(left.Current, right.Current);
            }
            catch (Exception error)
            {
                outcome.TrySetException(error);
                return false;
            }

            if (same)
            {
                return true;
            }

            outcome.TrySetResult(false);
            return false;
        }).ContinueWith(step =>
        {
            step.Handle(outcome, keepGoing =>
            {
                if (keepGoing)
                {
                    compareNext(token);
                }
            });
        });
    };

    compareNext(cancellationToken);

    return outcome.Task.Finally(() =>
    {
        left.Dispose();
        right.Dispose();
    });
}
/// <summary>
/// Determines whether two asynchronous sequences are equal, comparing elements with the default
/// equality comparer.
/// </summary>
/// <typeparam name="TSource">Element type of both sequences.</typeparam>
/// <param name="first">First sequence to compare.</param>
/// <param name="second">Second sequence to compare.</param>
/// <param name="cancellationToken">Token forwarded to the comparer-taking overload.</param>
/// <returns>The task returned by the comparer-taking overload called with <c>EqualityComparer&lt;TSource&gt;.Default</c>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="first"/> or <paramref name="second"/> is null.</exception>
public static Task<bool> SequenceEqual<TSource>(this IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, CancellationToken cancellationToken)
{
    if (first == null)
    {
        throw new ArgumentNullException("first");
    }

    if (second == null)
    {
        throw new ArgumentNullException("second");
    }

    var defaultComparer = EqualityComparer<TSource>.Default;
    return first.SequenceEqual(second, defaultComparer, cancellationToken);
}
/// <summary>
/// Correlates each element of <paramref name="outer"/> with the group of <paramref name="inner"/>
/// elements sharing its key, and projects the pair through <paramref name="resultSelector"/>.
/// </summary>
/// <remarks>
/// <paramref name="inner"/> is turned into a lookup the first time an element is requested and that
/// lookup is reused afterwards. Outer elements without a matching key receive an empty inner sequence.
/// Exceptions thrown by <paramref name="outerKeySelector"/> or <paramref name="resultSelector"/> are
/// set on the pending move-next task.
/// </remarks>
/// <typeparam name="TOuter">Element type of <paramref name="outer"/>.</typeparam>
/// <typeparam name="TInner">Element type of <paramref name="inner"/>.</typeparam>
/// <typeparam name="TKey">Type of the join key.</typeparam>
/// <typeparam name="TResult">Type produced by <paramref name="resultSelector"/>.</typeparam>
/// <param name="outer">Outer sequence; one result is produced per outer element.</param>
/// <param name="inner">Inner sequence that is grouped by key.</param>
/// <param name="outerKeySelector">Extracts the key of an outer element.</param>
/// <param name="innerKeySelector">Extracts the key of an inner element.</param>
/// <param name="resultSelector">Builds a result from an outer element and its matching inner elements.</param>
/// <param name="comparer">Equality comparer used for keys.</param>
/// <returns>An asynchronous sequence with one projected result per outer element.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector, IEqualityComparer<TKey> comparer)
{
    if (outer == null)
    {
        throw new ArgumentNullException("outer");
    }

    if (inner == null)
    {
        throw new ArgumentNullException("inner");
    }

    if (outerKeySelector == null)
    {
        throw new ArgumentNullException("outerKeySelector");
    }

    if (innerKeySelector == null)
    {
        throw new ArgumentNullException("innerKeySelector");
    }

    if (resultSelector == null)
    {
        throw new ArgumentNullException("resultSelector");
    }

    if (comparer == null)
    {
        throw new ArgumentNullException("comparer");
    }

    return Create(() =>
    {
        // Lazily started, then cached, lookup over the inner sequence.
        var lookupTask = default(Task<ILookup<TKey, TInner>>);
        var ensureLookup = new Func<CancellationToken, Task<ILookup<TKey, TInner>>>(token =>
        {
            if (lookupTask == null)
            {
                lookupTask = inner.ToLookup(innerKeySelector, comparer, token);
            }

            return lookupTask;
        });

        var outerEnumerator = outer.GetEnumerator();
        var latest = default(TResult);
        var cancellation = new CancellationTokenDisposable();
        var resources = new CompositeDisposable(cancellation, outerEnumerator);

        var step = default(Action<TaskCompletionSource<bool>, CancellationToken>);
        step = (completion, token) =>
        {
            ensureLookup(token).ContinueWith(lookupReady =>
            {
                lookupReady.Handle(completion, lookup =>
                {
                    outerEnumerator.MoveNext(token).ContinueWith(moved =>
                    {
                        moved.Handle(completion, hasElement =>
                        {
                            if (!hasElement)
                            {
                                completion.TrySetResult(false);
                                return;
                            }

                            var element = outerEnumerator.Current;

                            var key = default(TKey);
                            try
                            {
                                key = outerKeySelector(element);
                            }
                            catch (Exception error)
                            {
                                completion.TrySetException(error);
                                return;
                            }

                            // Matching inner elements, or an empty sequence when the key is unknown.
                            IAsyncEnumerable<TInner> matches;
                            if (lookup.Contains(key))
                            {
                                matches = lookup[key].ToAsyncEnumerable();
                            }
                            else
                            {
                                matches = AsyncEnumerable.Empty<TInner>();
                            }

                            try
                            {
                                latest = resultSelector(element, matches);
                            }
                            catch (Exception error)
                            {
                                completion.TrySetException(error);
                                return;
                            }

                            completion.TrySetResult(true);
                        });
                    });
                });
            });
        };

        return Create(
            (ct, tcs) =>
            {
                step(tcs, cancellation.Token);
                return tcs.Task.UsingEnumerator(outerEnumerator);
            },
            () => latest,
            resources.Dispose
        );
    });
}
/// <summary>
/// Correlates each element of <paramref name="outer"/> with the group of <paramref name="inner"/>
/// elements sharing its key, comparing keys with the default equality comparer.
/// </summary>
/// <typeparam name="TOuter">Element type of <paramref name="outer"/>.</typeparam>
/// <typeparam name="TInner">Element type of <paramref name="inner"/>.</typeparam>
/// <typeparam name="TKey">Type of the join key.</typeparam>
/// <typeparam name="TResult">Type produced by <paramref name="resultSelector"/>.</typeparam>
/// <param name="outer">Outer sequence.</param>
/// <param name="inner">Inner sequence.</param>
/// <param name="outerKeySelector">Extracts the key of an outer element.</param>
/// <param name="innerKeySelector">Extracts the key of an inner element.</param>
/// <param name="resultSelector">Builds a result from an outer element and its matching inner elements.</param>
/// <returns>The result of the comparer-taking overload called with <c>EqualityComparer&lt;TKey&gt;.Default</c>.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static IAsyncEnumerable<TResult> GroupJoin<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector)
{
    if (outer == null)
    {
        throw new ArgumentNullException("outer");
    }

    if (inner == null)
    {
        throw new ArgumentNullException("inner");
    }

    if (outerKeySelector == null)
    {
        throw new ArgumentNullException("outerKeySelector");
    }

    if (innerKeySelector == null)
    {
        throw new ArgumentNullException("innerKeySelector");
    }

    if (resultSelector == null)
    {
        throw new ArgumentNullException("resultSelector");
    }

    var defaultComparer = EqualityComparer<TKey>.Default;
    return outer.GroupJoin(inner, outerKeySelector, innerKeySelector, resultSelector, defaultComparer);
}
/// <summary>
/// Correlates the elements of <paramref name="outer"/> and <paramref name="inner"/> on matching keys
/// and projects each matching pair through <paramref name="resultSelector"/>.
/// </summary>
/// <remarks>
/// The two sources are pulled alternately (starting with <paramref name="outer"/>) until one is
/// exhausted, after which only the remaining one is pulled. Every element seen so far is remembered
/// per key, so a newly arrived element is paired with all previously seen elements of the other side
/// that share its key. Pairs are queued and handed out one per move-next. Exceptions thrown by a key
/// selector or by <paramref name="resultSelector"/> are set on the pending move-next task.
/// </remarks>
/// <typeparam name="TOuter">Element type of <paramref name="outer"/>.</typeparam>
/// <typeparam name="TInner">Element type of <paramref name="inner"/>.</typeparam>
/// <typeparam name="TKey">Type of the join key.</typeparam>
/// <typeparam name="TResult">Type produced by <paramref name="resultSelector"/>.</typeparam>
/// <param name="outer">Outer sequence.</param>
/// <param name="inner">Inner sequence.</param>
/// <param name="outerKeySelector">Extracts the key of an outer element.</param>
/// <param name="innerKeySelector">Extracts the key of an inner element.</param>
/// <param name="resultSelector">Builds a result from a matching outer/inner pair.</param>
/// <param name="comparer">Equality comparer used for keys.</param>
/// <returns>An asynchronous sequence of the projected matching pairs.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, TInner, TResult> resultSelector, IEqualityComparer<TKey> comparer)
{
    if (outer == null)
        throw new ArgumentNullException("outer");
    if (inner == null)
        throw new ArgumentNullException("inner");
    if (outerKeySelector == null)
        throw new ArgumentNullException("outerKeySelector");
    if (innerKeySelector == null)
        throw new ArgumentNullException("innerKeySelector");
    if (resultSelector == null)
        throw new ArgumentNullException("resultSelector");
    if (comparer == null)
        throw new ArgumentNullException("comparer");

    return Create(() =>
    {
        var oe = outer.GetEnumerator();
        var ie = inner.GetEnumerator();
        var cts = new CancellationTokenDisposable();
        var d = new CompositeDisposable(cts, oe, ie);

        var current = default(TResult);
        var useOuter = true;

        // Exhaustion is tracked with flags instead of nulling the enumerator variables, so that the
        // UsingEnumerator calls below never receive a null reference once a source has finished.
        var outerDone = false;
        var innerDone = false;

        var outerMap = new Dictionary<TKey, List<TOuter>>(comparer);
        var innerMap = new Dictionary<TKey, List<TInner>>(comparer);
        var q = new Queue<TResult>();

        var f = default(Action<TaskCompletionSource<bool>, CancellationToken>);
        f = (tcs, ct) =>
        {
            // Hand out already computed pairs before pulling more input.
            if (q.Count > 0)
            {
                current = q.Dequeue();
                tcs.TrySetResult(true);
                return;
            }

            if (outerDone && innerDone)
            {
                tcs.TrySetResult(false);
                return;
            }

            // Alternate between the sources unless one of them is already exhausted.
            var b = useOuter;
            if (innerDone)
                b = true;
            else if (outerDone)
                b = false;
            useOuter = !useOuter;

            // Projects a pair and queues the result; returns false after reporting a selector failure.
            var enqueue = new Func<TOuter, TInner, bool>((o, i) =>
            {
                var result = default(TResult);
                try
                {
                    result = resultSelector(o, i);
                }
                catch (Exception exception)
                {
                    tcs.TrySetException(exception);
                    return false;
                }

                q.Enqueue(result);
                return true;
            });

            if (b)
            {
                oe.MoveNext(ct).ContinueWith(t =>
                {
                    t.Handle(tcs, res =>
                    {
                        if (res)
                        {
                            var element = oe.Current;
                            var key = default(TKey);
                            try
                            {
                                key = outerKeySelector(element);
                            }
                            catch (Exception exception)
                            {
                                tcs.TrySetException(exception);
                                return;
                            }

                            // Remember the outer element for inner elements that arrive later.
                            var outerList = default(List<TOuter>);
                            if (!outerMap.TryGetValue(key, out outerList))
                            {
                                outerList = new List<TOuter>();
                                outerMap.Add(key, outerList);
                            }
                            outerList.Add(element);

                            var innerList = default(List<TInner>);
                            if (!innerMap.TryGetValue(key, out innerList))
                            {
                                innerList = new List<TInner>();
                                innerMap.Add(key, innerList);
                            }

                            // Pair it with every inner element already seen for this key.
                            foreach (var v in innerList)
                            {
                                if (!enqueue(element, v))
                                    return;
                            }

                            f(tcs, ct);
                        }
                        else
                        {
                            oe.Dispose();
                            outerDone = true;
                            f(tcs, ct);
                        }
                    });
                });
            }
            else
            {
                ie.MoveNext(ct).ContinueWith(t =>
                {
                    t.Handle(tcs, res =>
                    {
                        if (res)
                        {
                            var element = ie.Current;
                            var key = default(TKey);
                            try
                            {
                                key = innerKeySelector(element);
                            }
                            catch (Exception exception)
                            {
                                tcs.TrySetException(exception);
                                return;
                            }

                            // Remember the inner element for outer elements that arrive later.
                            var innerList = default(List<TInner>);
                            if (!innerMap.TryGetValue(key, out innerList))
                            {
                                innerList = new List<TInner>();
                                innerMap.Add(key, innerList);
                            }
                            innerList.Add(element);

                            var outerList = default(List<TOuter>);
                            if (!outerMap.TryGetValue(key, out outerList))
                            {
                                outerList = new List<TOuter>();
                                outerMap.Add(key, outerList);
                            }

                            // Pair it with every outer element already seen for this key.
                            foreach (var v in outerList)
                            {
                                if (!enqueue(v, element))
                                    return;
                            }

                            f(tcs, ct);
                        }
                        else
                        {
                            ie.Dispose();
                            innerDone = true;
                            f(tcs, ct);
                        }
                    });
                });
            }
        };

        return Create(
            (ct, tcs) =>
            {
                f(tcs, cts.Token);
                return tcs.Task.UsingEnumerator(oe).UsingEnumerator(ie);
            },
            () => current,
            d.Dispose
        );
    });
}
/// <summary>
/// Correlates the elements of <paramref name="outer"/> and <paramref name="inner"/> on matching keys,
/// comparing keys with the default equality comparer.
/// </summary>
/// <typeparam name="TOuter">Element type of <paramref name="outer"/>.</typeparam>
/// <typeparam name="TInner">Element type of <paramref name="inner"/>.</typeparam>
/// <typeparam name="TKey">Type of the join key.</typeparam>
/// <typeparam name="TResult">Type produced by <paramref name="resultSelector"/>.</typeparam>
/// <param name="outer">Outer sequence.</param>
/// <param name="inner">Inner sequence.</param>
/// <param name="outerKeySelector">Extracts the key of an outer element.</param>
/// <param name="innerKeySelector">Extracts the key of an inner element.</param>
/// <param name="resultSelector">Builds a result from a matching outer/inner pair.</param>
/// <returns>The result of the comparer-taking overload called with <c>EqualityComparer&lt;TKey&gt;.Default</c>.</returns>
/// <exception cref="ArgumentNullException">Any argument is null.</exception>
public static IAsyncEnumerable<TResult> Join<TOuter, TInner, TKey, TResult>(this IAsyncEnumerable<TOuter> outer, IAsyncEnumerable<TInner> inner, Func<TOuter, TKey> outerKeySelector, Func<TInner, TKey> innerKeySelector, Func<TOuter, TInner, TResult> resultSelector)
{
    if (outer == null)
    {
        throw new ArgumentNullException("outer");
    }

    if (inner == null)
    {
        throw new ArgumentNullException("inner");
    }

    if (outerKeySelector == null)
    {
        throw new ArgumentNullException("outerKeySelector");
    }

    if (innerKeySelector == null)
    {
        throw new ArgumentNullException("innerKeySelector");
    }

    if (resultSelector == null)
    {
        throw new ArgumentNullException("resultSelector");
    }

    var defaultComparer = EqualityComparer<TKey>.Default;
    return outer.Join(inner, outerKeySelector, innerKeySelector, resultSelector, defaultComparer);
}
/// <summary>
/// Concatenates every asynchronous sequence in <paramref name="sources"/>, in enumeration order.
/// </summary>
/// <typeparam name="TSource">Element type of the sequences.</typeparam>
/// <param name="sources">The sequences to concatenate.</param>
/// <returns>The sequence produced by the private <c>Concat_</c> helper for <paramref name="sources"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
public static IAsyncEnumerable<TSource> Concat<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
{
    if (sources == null)
    {
        throw new ArgumentNullException("sources");
    }

    var concatenated = sources.Concat_();
    return concatenated;
}
/// <summary>
/// Concatenates the given asynchronous sequences, in argument order.
/// </summary>
/// <typeparam name="TSource">Element type of the sequences.</typeparam>
/// <param name="sources">The sequences to concatenate.</param>
/// <returns>The sequence produced by the private <c>Concat_</c> helper for <paramref name="sources"/>.</returns>
/// <exception cref="ArgumentNullException"><paramref name="sources"/> is null.</exception>
public static IAsyncEnumerable<TSource> Concat<TSource>(params IAsyncEnumerable<TSource>[] sources)
{
    if (sources == null)
    {
        throw new ArgumentNullException("sources");
    }

    var concatenated = sources.Concat_();
    return concatenated;
}
/// <summary>
/// Shared implementation of the multi-source <c>Concat</c> overloads: walks <paramref name="sources"/>
/// one sequence at a time and produces each sequence's elements before moving on to the next.
/// </summary>
/// <remarks>
/// No argument validation happens here; the public overloads check <paramref name="sources"/> first.
/// Exceptions thrown while advancing <paramref name="sources"/> or while obtaining the next
/// enumerator are set on the pending move-next task.
/// </remarks>
/// <typeparam name="TSource">Element type of the sequences.</typeparam>
/// <param name="sources">The sequences to concatenate.</param>
/// <returns>An asynchronous sequence over all elements of all sources.</returns>
private static IAsyncEnumerable<TSource> Concat_<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
{
    return Create(() =>
    {
        var sourceCursor = sources.GetEnumerator();
        // Enumerator of the sequence currently being drained; null means "fetch the next sequence".
        var active = default(IAsyncEnumerator<TSource>);
        var cancellation = new CancellationTokenDisposable();
        var activeSlot = new AssignableDisposable();
        var resources = new CompositeDisposable(cancellation, sourceCursor, activeSlot);

        var advance = default(Action<TaskCompletionSource<bool>, CancellationToken>);
        advance = (completion, token) =>
        {
            if (active == null)
            {
                var hasSource = false;
                try
                {
                    hasSource = sourceCursor.MoveNext();
                    if (hasSource)
                    {
                        active = sourceCursor.Current.GetEnumerator();
                    }
                }
                catch (Exception error)
                {
                    completion.TrySetException(error);
                    return;
                }

                if (!hasSource)
                {
                    // No sequences left: the concatenation is complete.
                    completion.TrySetResult(false);
                    return;
                }

                activeSlot.Disposable = active;
            }

            active.MoveNext(token).ContinueWith(moved =>
            {
                moved.Handle(completion, hasElement =>
                {
                    if (hasElement)
                    {
                        completion.TrySetResult(true);
                        return;
                    }

                    // Current sequence is drained: release it and continue with the next one.
                    active.Dispose();
                    active = null;
                    advance(completion, token);
                });
            });
        };

        return Create(
            (ct, tcs) =>
            {
                advance(tcs, cancellation.Token);
                return tcs.Task.UsingEnumerator(activeSlot);
            },
            () => active.Current,
            resources.Dispose
        );
    });
}
/// <summary>
/// Projects every element of <paramref name="source"/> to the same sequence <paramref name="other"/>
/// by delegating to the selector-taking <c>SelectMany</c> with a selector that ignores its input.
/// </summary>
/// <typeparam name="TSource">Element type of <paramref name="source"/>.</typeparam>
/// <typeparam name="TOther">Element type of <paramref name="other"/> and of the result.</typeparam>
/// <param name="source">Sequence whose elements each trigger one pass over <paramref name="other"/>.</param>
/// <param name="other">Sequence returned by the selector for every source element.</param>
/// <returns>The sequence produced by the selector-taking <c>SelectMany</c> overload.</returns>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="other"/> is null.</exception>
public static IAsyncEnumerable<TOther> SelectMany<TSource, TOther>(this IAsyncEnumerable<TSource> source, IAsyncEnumerable<TOther> other)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (other == null)
    {
        throw new ArgumentNullException("other");
    }

    return source.SelectMany(ignored => other);
}
- }
- }
|