// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information. 

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace System.Linq
{
    public static partial class AsyncEnumerable
    {
        /// <summary>
        /// Exposes an observable sequence as an async-enumerable sequence.
        /// </summary>
        /// <remarks>
        /// The enumerator factory subscribes a buffering observer to <paramref name="source"/>.
        /// Notifications that arrive while no move-next call is waiting are stored in an
        /// unbounded queue and handed out, in order, by later move-next calls. Completion or
        /// an error is reported only after the queued values have been drained.
        /// </remarks>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">The observable sequence to convert.</param>
        /// <returns>An async-enumerable sequence producing the values pushed by <paramref name="source"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IAsyncEnumerable<TSource> ToAsyncEnumerable<TSource>(this IObservable<TSource> source)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            return CreateEnumerable(
                () =>
                {
                    var observer = new ToAsyncEnumerableObserver<TSource>();

                    var subscription = source.Subscribe(observer);

                    return CreateEnumerator(
                        tcs =>
                        {
                            var hasValue = false;
                            var hasCompleted = false;
                            var error = default(Exception);

                            // Decide the outcome under the observer's lock, but complete the
                            // task only after leaving it, so the task is never completed while
                            // the lock is held.
                            lock (observer.SyncRoot)
                            {
                                if (observer.Values.Count > 0)
                                {
                                    hasValue = true;
                                    observer.Current = observer.Values.Dequeue();
                                }
                                else if (observer.HasCompleted)
                                {
                                    hasCompleted = true;
                                }
                                else if (observer.Error != null)
                                {
                                    error = observer.Error;
                                }
                                else
                                {
                                    // Nothing available yet: park the task completion source so
                                    // the next notification from the source completes it.
                                    observer.TaskCompletionSource = tcs;
                                }
                            }

                            if (hasValue)
                            {
                                tcs.TrySetResult(true);
                            }
                            else if (hasCompleted)
                            {
                                tcs.TrySetResult(false);
                            }
                            else if (error != null)
                            {
                                tcs.TrySetException(error);
                            }

                            return tcs.Task;
                        },
                        () => observer.Current,
                        () =>
                        {
                            subscription.Dispose();
                            // Should we cancel in-flight operations somehow?
                            return TaskExt.True;
                        });
                });
        }

        /// <summary>
        /// Exposes an async-enumerable sequence as an observable sequence.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">The async-enumerable sequence to convert.</param>
        /// <returns>An observable that enumerates <paramref name="source"/> for each subscription.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IObservable<TSource> ToObservable<TSource>(this IAsyncEnumerable<TSource> source)
        {
            if (source == null)
            {
                throw new ArgumentNullException(nameof(source));
            }

            return new ToObservableObservable<TSource>(source);
        }

        /// <summary>
        /// Observer that bridges push notifications to pull-based enumeration: it either
        /// completes a waiting move-next task directly or records the notification for later.
        /// All mutable state is guarded by <see cref="SyncRoot"/>.
        /// </summary>
        private class ToAsyncEnumerableObserver<T> : IObserver<T>
        {
            // Values received while no move-next call was waiting.
            public readonly Queue<T> Values;

            // The element most recently handed to the consumer.
            public T Current;

            // Set once by OnError; never null after a successful OnError call.
            public Exception Error;

            // Set once by OnCompleted.
            public bool HasCompleted;

            // Completion source of the move-next call currently waiting for a notification,
            // or null when no call is waiting.
            public TaskCompletionSource<bool> TaskCompletionSource;

            public ToAsyncEnumerableObserver()
            {
                Values = new Queue<T>();
            }

            /// <summary>
            /// Lock object guarding the observer's state; this is the value queue itself.
            /// </summary>
            public object SyncRoot
            {
                get { return Values; }
            }

            /// <summary>
            /// Records completion and, if a move-next call is waiting, completes it with <c>false</c>.
            /// </summary>
            public void OnCompleted()
            {
                TaskCompletionSource<bool> pending;

                lock (SyncRoot)
                {
                    HasCompleted = true;

                    // Take ownership of the waiter (if any) so it is completed exactly once.
                    pending = TaskCompletionSource;
                    TaskCompletionSource = null;
                }

                // Complete outside the lock.
                pending?.TrySetResult(false);
            }

            /// <summary>
            /// Records the error and, if a move-next call is waiting, faults it with <paramref name="error"/>.
            /// </summary>
            /// <param name="error">The error reported by the source.</param>
            /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
            public void OnError(Exception error)
            {
                // A null error cannot be told apart from "no error" by the move-next logic,
                // which would leave the consumer waiting forever; and TrySetException(null)
                // throws after the waiter has already been detached. Reject it before
                // touching any state.
                if (error == null)
                {
                    throw new ArgumentNullException(nameof(error));
                }

                TaskCompletionSource<bool> pending;

                lock (SyncRoot)
                {
                    Error = error;

                    pending = TaskCompletionSource;
                    TaskCompletionSource = null;
                }

                // Complete outside the lock.
                pending?.TrySetException(error);
            }

            /// <summary>
            /// Delivers <paramref name="value"/> to a waiting move-next call, or queues it when
            /// no call is waiting.
            /// </summary>
            /// <param name="value">The element pushed by the source.</param>
            public void OnNext(T value)
            {
                var pending = default(TaskCompletionSource<bool>);

                lock (SyncRoot)
                {
                    if (TaskCompletionSource == null)
                    {
                        Values.Enqueue(value);
                    }
                    else
                    {
                        // A consumer is waiting: publish the value directly, bypassing the queue.
                        Current = value;
                        pending = TaskCompletionSource;
                        TaskCompletionSource = null;
                    }
                }

                // Complete outside the lock.
                pending?.TrySetResult(true);
            }
        }

        /// <summary>
        /// Observable that, for each subscription, enumerates the wrapped async-enumerable
        /// sequence and forwards its elements to the observer.
        /// </summary>
        private class ToObservableObservable<T> : IObservable<T>
        {
            private readonly IAsyncEnumerable<T> source;

            public ToObservableObservable(IAsyncEnumerable<T> source)
            {
                this.source = source;
            }

            /// <summary>
            /// Starts enumerating the source and pushes each element to <paramref name="observer"/>.
            /// </summary>
            /// <param name="observer">The observer receiving the elements.</param>
            /// <returns>
            /// A disposable that signals cancellation of the enumeration loop and disposes the enumerator.
            /// </returns>
            public IDisposable Subscribe(IObserver<T> observer)
            {
                var ctd = new CancellationTokenDisposable();
                var e = source.GetAsyncEnumerator();

                // Self-referencing delegate: each successful step schedules the next one.
                var f = default(Action);
                f = () => e.MoveNextAsync()
                           .ContinueWith(async t =>
                                         {
                                             if (t.IsFaulted)
                                             {
                                                 // Forwards the task's Exception property as-is.
                                                 observer.OnError(t.Exception);
                                                 await e.DisposeAsync().ConfigureAwait(false);
                                             }
                                             else if (t.IsCanceled)
                                             {
                                                 await e.DisposeAsync().ConfigureAwait(false);
                                             }
                                             else if (t.IsCompleted)
                                             {
                                                 if (t.Result)
                                                 {
                                                     observer.OnNext(e.Current);

                                                     if (!ctd.Token.IsCancellationRequested)
                                                     {
                                                         f();
                                                     }

                                                     //In case cancellation is requested, this could only have happened
                                                     //by disposing the returned composite disposable (see below).
                                                     //In that case, e will be disposed too, so there is no need to dispose e here.
                                                 }
                                                 else
                                                 {
                                                     observer.OnCompleted();
                                                     await e.DisposeAsync().ConfigureAwait(false);
                                                 }
                                             }
                                         }, ctd.Token);

                f();

                return Disposable.Create(ctd, Disposable.Create(() => { e.DisposeAsync(); /* REVIEW: fire-and-forget? */ }));
            }
        }
    }
}