| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the Apache 2.0 License.// See the LICENSE file in the project root for more information. using System.Reactive.Concurrency;using System.Reactive.Disposables;using System.Threading.Tasks;namespace System.Reactive.Linq{    partial class AsyncObservable    {        public static IAsyncObservable<TSource> ToAsyncObservable<TSource>(this IObservable<TSource> source)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            return ToAsyncObservable(source, TaskPoolAsyncScheduler.Default, TaskPoolAsyncScheduler.Default);        }        public static IAsyncObservable<TSource> ToAsyncObservable<TSource>(this IObservable<TSource> source, IAsyncScheduler scheduler)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (scheduler == null)                throw new ArgumentNullException(nameof(scheduler));            return ToAsyncObservable(source, scheduler, scheduler);        }        public static IAsyncObservable<TSource> ToAsyncObservable<TSource>(this IObservable<TSource> source, IAsyncScheduler subscribeScheduler, IAsyncScheduler disposeScheduler)        {            if (source == null)                throw new ArgumentNullException(nameof(source));            if (subscribeScheduler == null)                throw new ArgumentNullException(nameof(subscribeScheduler));            if (disposeScheduler == null)                throw new ArgumentNullException(nameof(disposeScheduler));            return Create<TSource>(async observer =>            {                var d = new CompositeAsyncDisposable();                var subscribeTask = await subscribeScheduler.ScheduleAsync(async ct =>                {                    ct.ThrowIfCancellationRequested();                    var disposable = source.Subscribe(AsyncObserver.ToObserver(observer));                    var disposeTask = AsyncDisposable.Create(() => disposeScheduler.ExecuteAsync(_ =>                    {                        disposable.Dispose();                        return Task.CompletedTask;                    }));                    await d.AddAsync(disposeTask).RendezVous(subscribeScheduler);                }).ConfigureAwait(false);                await d.AddAsync(subscribeTask).ConfigureAwait(false);                return d;            });        }    }    partial class AsyncObserver    {        // REVIEW: Add a way to parameterize blocking behavior (e.g. blocking, fire-and-forget, async chaining).        public static IObserver<TSource> ToObserver<TSource>(IAsyncObserver<TSource> observer)        {            if (observer == null)                throw new ArgumentNullException(nameof(observer));            return new AsyncToSyncObserver<TSource>(observer);        }        private sealed class AsyncToSyncObserver<T> : IObserver<T>        {            private readonly IAsyncObserver<T> _observer;            public AsyncToSyncObserver(IAsyncObserver<T> observer)            {                _observer = observer;            }            public void OnCompleted() => _observer.OnCompletedAsync().GetAwaiter().GetResult();            public void OnError(Exception error) => _observer.OnErrorAsync(error).GetAwaiter().GetResult();            public void OnNext(T value) => _observer.OnNextAsync(value).GetAwaiter().GetResult();        }    }}
 |