| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the Apache 2.0 License.// See the LICENSE file in the project root for more information. using System;using System.Collections.Generic;using System.Threading.Tasks;using System.Threading;namespace System.Linq{    public static partial class AsyncEnumerable    {        public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this IEnumerable<TSource> source)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            return Create(() =>            {                var e = source.GetEnumerator();                return Create(                    ct => Task.Run(() =>                    {                        var res = false;                        try                        {                            res = e.MoveNext();                        }                        finally                        {                            if (!res)                                e.Dispose();                        }                        return res;                    }, ct),                    () => e.Current,                    () => e.Dispose()                );            });        }        public static IEnumerable<TSource> ToEnumerable<TSource>(this IAsyncEnumerable<TSource> source)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            return ToEnumerable_(source);        }        private static IEnumerable<TSource> ToEnumerable_<TSource>(IAsyncEnumerable<TSource> source)        {            using (var e = source.GetEnumerator())            {                while (true)                {                    if (!e.MoveNext(CancellationToken.None).Result)                        break;                    var c = e.Current;                    yield return c;                }            }        }        public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this Task<TSource> task)        {            if (task == null)                throw new ArgumentNullException(nameof(task));                        return Create(() =>            {                var called = 0;                var value = default(TSource);                return Create(                    async ct =>                    {                        if (Interlocked.CompareExchange(ref called, 1, 0) == 0)                        {                            value = await task.ConfigureAwait(false);                            return true;                        }                        return false;                    },                    () => value,                    () => { });            });        }        public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this IObservable<TSource> source)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            return Create(() =>            {                var observer = new ToAsyncEnumerableObserver<TSource>();                var subscription = source.Subscribe(observer);                return Create(                    (ct, tcs) =>                    {                        var hasValue = false;                        var hasCompleted = false;                        var error = default(Exception);                        lock (observer.SyncRoot)                        {                            if (observer.Values.Count > 0)                            {                                hasValue = true;                                observer.Current = observer.Values.Dequeue();                            }                            else if (observer.HasCompleted)                            {                                hasCompleted = true;                            }                            else if (observer.Error != null)                            {                                error = observer.Error;                            }                            else                            {                                observer.TaskCompletionSource = tcs;                            }                        }                        if (hasValue)                        {                            tcs.TrySetResult(true);                        }                        else if (hasCompleted)                        {                            tcs.TrySetResult(false);                        }                        else if (error != null)                        {                            tcs.TrySetException(error);                        }                        return tcs.Task;                    },                    () => observer.Current,                    () =>                    {                        subscription.Dispose();                        // Should we cancel in-flight operations somehow?                    });            });        }        class ToAsyncEnumerableObserver<T> : IObserver<T>        {            public ToAsyncEnumerableObserver()            {                Values = new Queue<T>();            }            public object SyncRoot            {                get { return Values; }            }            public readonly Queue<T> Values;            public Exception Error;            public bool HasCompleted;            public T Current;            public TaskCompletionSource<bool> TaskCompletionSource;            public void OnCompleted()            {                var tcs = default(TaskCompletionSource<bool>);                lock (SyncRoot)                {                    HasCompleted = true;                    if (TaskCompletionSource != null)                    {                        tcs = TaskCompletionSource;                        TaskCompletionSource = null;                    }                }                if (tcs != null)                {                    tcs.SetResult(false);                }            }            public void OnError(Exception error)            {                var tcs = default(TaskCompletionSource<bool>);                lock (SyncRoot)                {                    Error = error;                    if (TaskCompletionSource != null)                    {                        tcs = TaskCompletionSource;                        TaskCompletionSource = null;                    }                }                if (tcs != null)                {                    tcs.SetException(error);                }            }            public void OnNext(T value)            {                var tcs = default(TaskCompletionSource<bool>);                lock (SyncRoot)                {                    if (TaskCompletionSource == null)                    {                        Values.Enqueue(value);                    }                    else                    {                        Current = value;                        tcs = TaskCompletionSource;                        TaskCompletionSource = null;                    }                }                if (tcs != null)                {                    tcs.SetResult(true);                }            }        }        public static IObservable<TSource> ToObservable<TSource>(this IAsyncEnumerable<TSource> source)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            return new ToObservableObservable<TSource>(source);        }        class ToObservableObservable<T> : IObservable<T>        {            private readonly IAsyncEnumerable<T> source;            public ToObservableObservable(IAsyncEnumerable<T> source)            {                this.source = source;            }            public IDisposable Subscribe(IObserver<T> observer)            {                var ctd = new CancellationTokenDisposable();                var e = source.GetEnumerator();                var f = default(Action);                f = () => e.MoveNext(ctd.Token).ContinueWith(t =>                {                    if (t.IsFaulted)                    {                        observer.OnError(t.Exception);                        e.Dispose();                    }                    else if (t.IsCanceled)                    {                        e.Dispose();                    }                    else if (t.IsCompleted)                    {                        if (t.Result)                        {                            observer.OnNext(e.Current);                            f();                        }                        else                        {                            observer.OnCompleted();                            e.Dispose();                        }                    }                }, ctd.Token);                f();                return Disposable.Create(ctd, e);            }        }    }}
 |