// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    using ObservableImpl;

    /// <summary>
    /// Creation operators (Create, Defer, Empty, Generate, Never, Range, Repeat, Return, Throw, Using).
    /// Every public member is virtual and simply constructs the corresponding observable implementation.
    /// </summary>
    internal partial class QueryLanguage
    {
        #region - Create -

        /// <summary>
        /// Builds an observable whose subscribe logic is <paramref name="subscribe"/>; the delegate's
        /// return value is used as the subscription handle.
        /// </summary>
        public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
        {
            return new CreateWithDisposableObservable<TSource>(subscribe);
        }

        /// <summary>
        /// Observable that delegates subscription to a function returning an <see cref="IDisposable"/>.
        /// </summary>
        private sealed class CreateWithDisposableObservable<TSource> : ObservableBase<TSource>
        {
            private readonly Func<IObserver<TSource>, IDisposable> _subscribe;

            public CreateWithDisposableObservable(Func<IObserver<TSource>, IDisposable> subscribe)
            {
                _subscribe = subscribe;
            }

            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
            {
                // A null handle from the user delegate is normalized to the empty disposable.
                return _subscribe(observer) ?? Disposable.Empty;
            }
        }

        /// <summary>
        /// Builds an observable whose subscribe logic is <paramref name="subscribe"/>; the returned
        /// <see cref="Action"/> (if any) is wrapped as the subscription's dispose logic.
        /// </summary>
        public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe)
        {
            return new CreateWithActionDisposable<TSource>(subscribe);
        }

        /// <summary>
        /// Observable that delegates subscription to a function returning an unsubscribe <see cref="Action"/>.
        /// </summary>
        private sealed class CreateWithActionDisposable<TSource> : ObservableBase<TSource>
        {
            private readonly Func<IObserver<TSource>, Action> _subscribe;

            public CreateWithActionDisposable(Func<IObserver<TSource>, Action> subscribe)
            {
                _subscribe = subscribe;
            }

            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
            {
                var a = _subscribe(observer);

                // A null action means there is nothing to run on dispose.
                return a != null ? Disposable.Create(a) : Disposable.Empty;
            }
        }

        #endregion

        #region - CreateAsync -

        /// <summary>
        /// Builds an observable from an asynchronous, cancellable subscribe function. The observer is
        /// completed (or faulted) when the returned task finishes.
        /// </summary>
        public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
        {
            return new CreateWithTaskTokenObservable<TResult>(subscribeAsync);
        }

        /// <summary>
        /// Observable whose subscription runs a task-returning delegate and relays the task's
        /// completion or error to the observer.
        /// </summary>
        private sealed class CreateWithTaskTokenObservable<TResult> : ObservableBase<TResult>
        {
            private readonly Func<IObserver<TResult>, CancellationToken, Task> _subscribeAsync;

            public CreateWithTaskTokenObservable(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
            {
                _subscribeAsync = subscribeAsync;
            }

            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
            {
                var cancellable = new CancellationDisposable();

                var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable();
                var taskCompletionObserver = new TaskCompletionObserver(observer);
                var subscription = taskObservable.Subscribe(taskCompletionObserver);

                // The returned handle covers both the token handed to the delegate and the
                // subscription to the task's completion.
                return StableCompositeDisposable.Create(cancellable, subscription);
            }

            /// <summary>
            /// Forwards the task's terminal notification to the downstream observer and drops its value.
            /// </summary>
            private sealed class TaskCompletionObserver : IObserver<Unit>
            {
                private readonly IObserver<TResult> _observer;

                public TaskCompletionObserver(IObserver<TResult> observer)
                {
                    _observer = observer;
                }

                public void OnCompleted()
                {
                    _observer.OnCompleted();
                }

                public void OnError(Exception error)
                {
                    _observer.OnError(error);
                }

                public void OnNext(Unit value)
                {
                    // deliberately ignored
                }
            }
        }

        /// <summary>
        /// Overload without cancellation support; forwards to the token-accepting overload and ignores the token.
        /// </summary>
        public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task> subscribeAsync)
        {
            return Create<TResult>((observer, token) => subscribeAsync(observer));
        }

        /// <summary>
        /// Builds an observable from an asynchronous, cancellable subscribe function whose task yields
        /// the <see cref="IDisposable"/> to dispose on unsubscription.
        /// </summary>
        public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
        {
            return new CreateWithTaskDisposable<TResult>(subscribeAsync);
        }

        /// <summary>
        /// Observable whose subscription runs a delegate returning <c>Task&lt;IDisposable&gt;</c> and keeps
        /// the produced disposable so it can be released when the subscription is disposed.
        /// </summary>
        private sealed class CreateWithTaskDisposable<TResult> : ObservableBase<TResult>
        {
            private readonly Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> _subscribeAsync;

            public CreateWithTaskDisposable(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
            {
                _subscribeAsync = subscribeAsync;
            }

            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
            {
                var cancellable = new CancellationDisposable();

                var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable();
                var taskCompletionObserver = new TaskDisposeCompletionObserver(observer);

                //
                // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
                // NOTE(review): this remark used to credit "the AnonymousObservable type" for auto-detach behavior, but this
                //               class derives from ObservableBase<TResult> - confirm the base class gives the same guarantee.
                //
                taskObservable.Subscribe(taskCompletionObserver);

                // The observer itself is the handle for the task's resource, hence it is part of the composite.
                return StableCompositeDisposable.Create(cancellable, taskCompletionObserver);
            }

            /// <summary>
            /// Receives the disposable produced by the task and owns it; also relays the task's
            /// terminal notification to the downstream observer.
            /// </summary>
            private sealed class TaskDisposeCompletionObserver : IObserver<IDisposable>, IDisposable
            {
                private readonly IObserver<TResult> _observer;
                private IDisposable _disposable;

                public TaskDisposeCompletionObserver(IObserver<TResult> observer)
                {
                    _observer = observer;
                }

                public void Dispose()
                {
                    Disposable.TryDispose(ref _disposable);
                }

                public void OnCompleted()
                {
                    _observer.OnCompleted();
                }

                public void OnError(Exception error)
                {
                    _observer.OnError(error);
                }

                public void OnNext(IDisposable value)
                {
                    // Presumably disposes 'value' right away when Dispose() already ran - verify against Disposable.SetSingle.
                    Disposable.SetSingle(ref _disposable, value);
                }
            }
        }

        /// <summary>
        /// Overload without cancellation support; forwards to the token-accepting overload and ignores the token.
        /// </summary>
        public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<IDisposable>> subscribeAsync)
        {
            return Create<TResult>((observer, token) => subscribeAsync(observer));
        }

        /// <summary>
        /// Builds an observable from an asynchronous, cancellable subscribe function whose task yields
        /// the <see cref="Action"/> to run on unsubscription.
        /// </summary>
        public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
        {
            return new CreateWithTaskActionObservable<TResult>(subscribeAsync);
        }

        /// <summary>
        /// Observable whose subscription runs a delegate returning <c>Task&lt;Action&gt;</c> and keeps the
        /// produced action so it can be invoked when the subscription is disposed.
        /// </summary>
        private sealed class CreateWithTaskActionObservable<TResult> : ObservableBase<TResult>
        {
            private readonly Func<IObserver<TResult>, CancellationToken, Task<Action>> _subscribeAsync;

            public CreateWithTaskActionObservable(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
            {
                _subscribeAsync = subscribeAsync;
            }

            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
            {
                var cancellable = new CancellationDisposable();

                var taskObservable = _subscribeAsync(observer, cancellable.Token).ToObservable();
                var taskCompletionObserver = new TaskDisposeCompletionObserver(observer);

                //
                // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
                // NOTE(review): this remark used to credit "the AnonymousObservable type" for auto-detach behavior, but this
                //               class derives from ObservableBase<TResult> - confirm the base class gives the same guarantee.
                //
                taskObservable.Subscribe(taskCompletionObserver);

                // The observer itself is the handle for the task's dispose action, hence it is part of the composite.
                return StableCompositeDisposable.Create(cancellable, taskCompletionObserver);
            }

            /// <summary>
            /// Receives the dispose action produced by the task and runs it at most once, either on
            /// <see cref="Dispose"/> or immediately on arrival when disposal already happened.
            /// </summary>
            private sealed class TaskDisposeCompletionObserver : IObserver<Action>, IDisposable
            {
                private readonly IObserver<TResult> _observer;
                private Action _disposable;

                // Sentinel stored in _disposable once Dispose() has run.
                private static readonly Action DisposedAction = () => { };

                public TaskDisposeCompletionObserver(IObserver<TResult> observer)
                {
                    _observer = observer;
                }

                public void Dispose()
                {
                    // Swap in the sentinel and run whatever was stored before (null if nothing arrived yet).
                    Interlocked.Exchange(ref _disposable, DisposedAction)?.Invoke();
                }

                public void OnCompleted()
                {
                    _observer.OnCompleted();
                }

                public void OnError(Exception error)
                {
                    _observer.OnError(error);
                }

                public void OnNext(Action value)
                {
                    // Store the action only if the slot is still empty; a non-null previous value
                    // (e.g. the disposed sentinel) means the action has to run right now instead.
                    if (Interlocked.CompareExchange(ref _disposable, value, null) != null)
                    {
                        value?.Invoke();
                    }
                }
            }
        }

        /// <summary>
        /// Overload without cancellation support; forwards to the token-accepting overload and ignores the token.
        /// </summary>
        public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<Action>> subscribeAsync)
        {
            return Create<TResult>((observer, token) => subscribeAsync(observer));
        }

        #endregion

        #region + Defer +

        /// <summary>
        /// Wraps <paramref name="observableFactory"/> in a <c>Defer</c> observable.
        /// </summary>
        public virtual IObservable<TValue> Defer<TValue>(Func<IObservable<TValue>> observableFactory)
        {
            return new Defer<TValue>(observableFactory);
        }

        #endregion

        #region + DeferAsync +

        /// <summary>
        /// Deferred variant for an asynchronous factory: the factory is started via <c>StartAsync</c>
        /// inside the deferred callback and the observable it yields is merged into the result.
        /// </summary>
        public virtual IObservable<TValue> Defer<TValue>(Func<Task<IObservable<TValue>>> observableFactoryAsync)
        {
            return Defer(() => StartAsync(observableFactoryAsync).Merge());
        }

        /// <summary>
        /// Same as the overload above, for a factory that accepts a <see cref="CancellationToken"/>.
        /// </summary>
        public virtual IObservable<TValue> Defer<TValue>(Func<CancellationToken, Task<IObservable<TValue>>> observableFactoryAsync)
        {
            return Defer(() => StartAsync(observableFactoryAsync).Merge());
        }

        #endregion

        #region + Empty +

        /// <summary>
        /// Returns the shared <c>EmptyDirect</c> instance.
        /// </summary>
        public virtual IObservable<TResult> Empty<TResult>()
        {
            return EmptyDirect<TResult>.Instance;
        }

        /// <summary>
        /// Returns an <c>Empty</c> observable bound to <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TResult> Empty<TResult>(IScheduler scheduler)
        {
            return new Empty<TResult>(scheduler);
        }

        #endregion

        #region + Generate +

        /// <summary>
        /// Creates a <c>Generate.NoTime</c> observable using <c>SchedulerDefaults.Iteration</c>.
        /// </summary>
        public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
        {
            return new Generate<TState, TResult>.NoTime(initialState, condition, iterate, resultSelector, SchedulerDefaults.Iteration);
        }

        /// <summary>
        /// Creates a <c>Generate.NoTime</c> observable using the supplied <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
        {
            return new Generate<TState, TResult>.NoTime(initialState, condition, iterate, resultSelector, scheduler);
        }

        #endregion

        #region + Never +

        /// <summary>
        /// Returns the shared <c>Never</c> instance.
        /// </summary>
        public virtual IObservable<TResult> Never<TResult>()
        {
            return ObservableImpl.Never<TResult>.Default;
        }

        #endregion

        #region + Range +

        /// <summary>
        /// Range over integers using <c>SchedulerDefaults.Iteration</c>.
        /// </summary>
        public virtual IObservable<int> Range(int start, int count)
        {
            return Range_(start, count, SchedulerDefaults.Iteration);
        }

        /// <summary>
        /// Range over integers using the supplied <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<int> Range(int start, int count, IScheduler scheduler)
        {
            return Range_(start, count, scheduler);
        }

        // Picks the long-running implementation when the scheduler offers one, the recursive one otherwise.
        private static IObservable<int> Range_(int start, int count, IScheduler scheduler)
        {
            var longRunning = scheduler.AsLongRunning();
            if (longRunning != null)
            {
                return new RangeLongRunning(start, count, longRunning);
            }

            return new RangeRecursive(start, count, scheduler);
        }

        #endregion

        #region + Repeat +

        /// <summary>
        /// Unbounded repetition of <paramref name="value"/> using <c>SchedulerDefaults.Iteration</c>.
        /// </summary>
        public virtual IObservable<TResult> Repeat<TResult>(TResult value)
        {
            return Repeat_(value, SchedulerDefaults.Iteration);
        }

        /// <summary>
        /// Unbounded repetition of <paramref name="value"/> using the supplied <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TResult> Repeat<TResult>(TResult value, IScheduler scheduler)
        {
            return Repeat_(value, scheduler);
        }

        // Picks the long-running "forever" implementation when the scheduler offers one, the recursive one otherwise.
        private IObservable<TResult> Repeat_<TResult>(TResult value, IScheduler scheduler)
        {
            var longRunning = scheduler.AsLongRunning();
            if (longRunning != null)
            {
                return new Repeat<TResult>.ForeverLongRunning(value, longRunning);
            }

            return new Repeat<TResult>.ForeverRecursive(value, scheduler);
        }

        /// <summary>
        /// Counted repetition of <paramref name="value"/> using <c>SchedulerDefaults.Iteration</c>.
        /// </summary>
        public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount)
        {
            return Repeat_(value, repeatCount, SchedulerDefaults.Iteration);
        }

        /// <summary>
        /// Counted repetition of <paramref name="value"/> using the supplied <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount, IScheduler scheduler)
        {
            return Repeat_(value, repeatCount, scheduler);
        }

        // Picks the long-running counted implementation when the scheduler offers one, the recursive one otherwise.
        private IObservable<TResult> Repeat_<TResult>(TResult value, int repeatCount, IScheduler scheduler)
        {
            var longRunning = scheduler.AsLongRunning();
            if (longRunning != null)
            {
                return new Repeat<TResult>.CountLongRunning(value, repeatCount, longRunning);
            }

            return new Repeat<TResult>.CountRecursive(value, repeatCount, scheduler);
        }

        #endregion

        #region + Return +

        /// <summary>
        /// Creates a <c>Return</c> observable for <paramref name="value"/> using <c>SchedulerDefaults.ConstantTimeOperations</c>.
        /// </summary>
        public virtual IObservable<TResult> Return<TResult>(TResult value)
        {
            return new Return<TResult>(value, SchedulerDefaults.ConstantTimeOperations);
        }

        /// <summary>
        /// Creates a <c>Return</c> observable for <paramref name="value"/> using the supplied <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TResult> Return<TResult>(TResult value, IScheduler scheduler)
        {
            return new Return<TResult>(value, scheduler);
        }

        #endregion

        #region + Throw +

        /// <summary>
        /// Creates a <c>Throw</c> observable for <paramref name="exception"/> using <c>SchedulerDefaults.ConstantTimeOperations</c>.
        /// </summary>
        public virtual IObservable<TResult> Throw<TResult>(Exception exception)
        {
            return new Throw<TResult>(exception, SchedulerDefaults.ConstantTimeOperations);
        }

        /// <summary>
        /// Creates a <c>Throw</c> observable for <paramref name="exception"/> using the supplied <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TResult> Throw<TResult>(Exception exception, IScheduler scheduler)
        {
            return new Throw<TResult>(exception, scheduler);
        }

        #endregion

        #region + Using +

        /// <summary>
        /// Creates a <c>Using</c> observable from a resource factory and an observable factory that consumes the resource.
        /// </summary>
        public virtual IObservable<TSource> Using<TSource, TResource>(Func<TResource> resourceFactory, Func<TResource, IObservable<TSource>> observableFactory) where TResource : IDisposable
        {
            return new Using<TSource, TResource>(resourceFactory, observableFactory);
        }

        #endregion

        #region - UsingAsync -

        /// <summary>
        /// Asynchronous variant of Using: the resource is obtained through <c>Observable.FromAsync</c>, then
        /// handed to <c>Observable.Using</c> together with the merged result of the asynchronous observable factory.
        /// </summary>
        public virtual IObservable<TSource> Using<TSource, TResource>(Func<CancellationToken, Task<TResource>> resourceFactoryAsync, Func<TResource, CancellationToken, Task<IObservable<TSource>>> observableFactoryAsync) where TResource : IDisposable
        {
            return Observable.FromAsync(resourceFactoryAsync)
                .SelectMany(resource =>
                    Observable.Using(
                        () => resource,
                        resource_ => Observable.FromAsync(ct => observableFactoryAsync(resource_, ct)).Merge()
                    )
                );
        }

        #endregion
    }
}