// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace System.Reactive.Linq
{
    using ObservableImpl;

    /// <summary>
    /// Creation operators of the query language: each method builds an observable
    /// sequence from delegates, tasks, constant values or generator functions.
    /// </summary>
    internal partial class QueryLanguage
    {
        #region - Create -

        /// <summary>
        /// Wraps <paramref name="subscribe"/> in an <c>AnonymousObservable</c>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the resulting sequence.</typeparam>
        /// <param name="subscribe">Subscription logic; the returned disposable represents the subscription.</param>
        public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
        {
            return new AnonymousObservable<TSource>(subscribe);
        }

        /// <summary>
        /// Wraps <paramref name="subscribe"/> in an <c>AnonymousObservable</c>, turning the
        /// returned <see cref="Action"/> into a disposable.
        /// </summary>
        /// <typeparam name="TSource">Element type of the resulting sequence.</typeparam>
        /// <param name="subscribe">Subscription logic; the returned action (which may be null) is run on disposal.</param>
        public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe)
        {
            return new AnonymousObservable<TSource>(o =>
            {
                var a = subscribe(o);

                // A null action means "nothing to clean up".
                return a != null ? Disposable.Create(a) : Disposable.Empty;
            });
        }

        #endregion

        #region - CreateAsync -

        /// <summary>
        /// Creates a sequence whose subscription logic is an asynchronous, cancellable method.
        /// The task's failure is forwarded as OnError and its completion as OnCompleted.
        /// </summary>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        /// <param name="subscribeAsync">
        /// Asynchronous subscription logic. It receives the observer and the token of a
        /// <c>CancellationDisposable</c> that is part of the returned subscription.
        /// </param>
        public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
        {
            return new AnonymousObservable<TResult>(observer =>
            {
                var cancellable = new CancellationDisposable();

                var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();

                // The task yields no meaningful value; only its error/completion is relayed.
                var taskCompletionObserver = new AnonymousObserver<Unit>(Stubs<Unit>.Ignore, observer.OnError, observer.OnCompleted);
                var subscription = taskObservable.Subscribe(taskCompletionObserver);

                return StableCompositeDisposable.Create(cancellable, subscription);
            });
        }

        /// <summary>
        /// Overload of the asynchronous <c>Create</c> for subscription logic that does not
        /// observe a cancellation token; delegates to the token-accepting overload.
        /// </summary>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        /// <param name="subscribeAsync">Asynchronous subscription logic.</param>
        public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task> subscribeAsync)
        {
            return Create<TResult>((observer, token) => subscribeAsync(observer));
        }

        /// <summary>
        /// Creates a sequence whose asynchronous, cancellable subscription logic produces the
        /// disposable that represents the subscription. A failing task is forwarded as OnError;
        /// task completion itself does not complete the sequence.
        /// </summary>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        /// <param name="subscribeAsync">
        /// Asynchronous subscription logic; a null result is treated as <c>Disposable.Empty</c>.
        /// </param>
        public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
        {
            return new AnonymousObservable<TResult>(observer =>
            {
                var subscription = new SingleAssignmentDisposable();
                var cancellable = new CancellationDisposable();

                var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
                var taskCompletionObserver = new AnonymousObserver<IDisposable>(d => subscription.Disposable = d ?? Disposable.Empty, observer.OnError, Stubs.Nop);

                //
                // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
                // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
                //
                taskObservable.Subscribe(taskCompletionObserver);

                return StableCompositeDisposable.Create(cancellable, subscription);
            });
        }

        /// <summary>
        /// Overload for disposable-returning asynchronous subscription logic that does not
        /// observe a cancellation token; delegates to the token-accepting overload.
        /// </summary>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        /// <param name="subscribeAsync">Asynchronous subscription logic.</param>
        public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<IDisposable>> subscribeAsync)
        {
            return Create<TResult>((observer, token) => subscribeAsync(observer));
        }

        /// <summary>
        /// Creates a sequence whose asynchronous, cancellable subscription logic produces the
        /// action to run on unsubscription. A failing task is forwarded as OnError;
        /// task completion itself does not complete the sequence.
        /// </summary>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        /// <param name="subscribeAsync">
        /// Asynchronous subscription logic; a null action is treated as <c>Disposable.Empty</c>.
        /// </param>
        public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
        {
            return new AnonymousObservable<TResult>(observer =>
            {
                var subscription = new SingleAssignmentDisposable();
                var cancellable = new CancellationDisposable();

                var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
                var taskCompletionObserver = new AnonymousObserver<Action>(a => subscription.Disposable = a != null ? Disposable.Create(a) : Disposable.Empty, observer.OnError, Stubs.Nop);

                //
                // We don't cancel the subscription below *ever* and want to make sure the returned resource eventually gets disposed.
                // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
                //
                taskObservable.Subscribe(taskCompletionObserver);

                return StableCompositeDisposable.Create(cancellable, subscription);
            });
        }

        /// <summary>
        /// Overload for action-returning asynchronous subscription logic that does not
        /// observe a cancellation token; delegates to the token-accepting overload.
        /// </summary>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        /// <param name="subscribeAsync">Asynchronous subscription logic.</param>
        public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<Action>> subscribeAsync)
        {
            return Create<TResult>((observer, token) => subscribeAsync(observer));
        }

        #endregion

        #region + Defer +

        /// <summary>
        /// Builds the <c>Defer</c> operator around <paramref name="observableFactory"/>.
        /// </summary>
        /// <typeparam name="TValue">Element type of the sequence produced by the factory.</typeparam>
        /// <param name="observableFactory">Factory producing the sequence to observe.</param>
        public virtual IObservable<TValue> Defer<TValue>(Func<IObservable<TValue>> observableFactory)
        {
            return new Defer<TValue>(observableFactory);
        }

        #endregion

        #region + DeferAsync +

        /// <summary>
        /// Deferred sequence backed by an asynchronous factory: inside the deferred factory the
        /// asynchronous factory is started via <c>StartAsync</c> and the resulting sequence of
        /// sequences is flattened with <c>Merge</c>.
        /// </summary>
        /// <typeparam name="TValue">Element type of the sequence produced by the factory.</typeparam>
        /// <param name="observableFactoryAsync">Asynchronous factory producing the sequence to observe.</param>
        public virtual IObservable<TValue> Defer<TValue>(Func<Task<IObservable<TValue>>> observableFactoryAsync)
        {
            return Defer(() => StartAsync(observableFactoryAsync).Merge());
        }

        /// <summary>
        /// Cancellable variant of the asynchronous <c>Defer</c>: the factory accepts a
        /// <see cref="CancellationToken"/> and is started via <c>StartAsync</c>.
        /// </summary>
        /// <typeparam name="TValue">Element type of the sequence produced by the factory.</typeparam>
        /// <param name="observableFactoryAsync">Cancellable asynchronous factory producing the sequence to observe.</param>
        public virtual IObservable<TValue> Defer<TValue>(Func<CancellationToken, Task<IObservable<TValue>>> observableFactoryAsync)
        {
            return Defer(() => StartAsync(observableFactoryAsync).Merge());
        }

        #endregion

        #region + Empty +

        /// <summary>
        /// Builds the <c>Empty</c> operator on <c>SchedulerDefaults.ConstantTimeOperations</c>.
        /// </summary>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        public virtual IObservable<TResult> Empty<TResult>()
        {
            return new Empty<TResult>(SchedulerDefaults.ConstantTimeOperations);
        }

        /// <summary>
        /// Builds the <c>Empty</c> operator on the specified scheduler.
        /// </summary>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        /// <param name="scheduler">Scheduler handed to the operator.</param>
        public virtual IObservable<TResult> Empty<TResult>(IScheduler scheduler)
        {
            return new Empty<TResult>(scheduler);
        }

        #endregion

        #region + Generate +

        /// <summary>
        /// Builds the untimed <c>Generate</c> operator on <c>SchedulerDefaults.Iteration</c>.
        /// </summary>
        /// <typeparam name="TState">Type of the loop state.</typeparam>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        /// <param name="initialState">Initial loop state.</param>
        /// <param name="condition">Predicate evaluated against the state.</param>
        /// <param name="iterate">Function producing the next state.</param>
        /// <param name="resultSelector">Function mapping a state to a result element.</param>
        public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
        {
            return new Generate<TState, TResult>.NoTime(initialState, condition, iterate, resultSelector, SchedulerDefaults.Iteration);
        }

        /// <summary>
        /// Builds the untimed <c>Generate</c> operator on the specified scheduler.
        /// </summary>
        /// <typeparam name="TState">Type of the loop state.</typeparam>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        /// <param name="initialState">Initial loop state.</param>
        /// <param name="condition">Predicate evaluated against the state.</param>
        /// <param name="iterate">Function producing the next state.</param>
        /// <param name="resultSelector">Function mapping a state to a result element.</param>
        /// <param name="scheduler">Scheduler handed to the operator.</param>
        public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
        {
            return new Generate<TState, TResult>.NoTime(initialState, condition, iterate, resultSelector, scheduler);
        }

        #endregion

        #region + Never +

        /// <summary>
        /// Builds the <c>Never</c> operator.
        /// </summary>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        public virtual IObservable<TResult> Never<TResult>()
        {
            return new Never<TResult>();
        }

        #endregion

        #region + Range +

        /// <summary>
        /// Builds the <c>Range</c> operator on <c>SchedulerDefaults.Iteration</c>.
        /// </summary>
        /// <param name="start">First value passed to the operator.</param>
        /// <param name="count">Count passed to the operator.</param>
        public virtual IObservable<int> Range(int start, int count)
        {
            return Range_(start, count, SchedulerDefaults.Iteration);
        }

        /// <summary>
        /// Builds the <c>Range</c> operator on the specified scheduler.
        /// </summary>
        /// <param name="start">First value passed to the operator.</param>
        /// <param name="count">Count passed to the operator.</param>
        /// <param name="scheduler">Scheduler handed to the operator.</param>
        public virtual IObservable<int> Range(int start, int count, IScheduler scheduler)
        {
            return Range_(start, count, scheduler);
        }

        // Shared construction path for both public Range overloads.
        private static IObservable<int> Range_(int start, int count, IScheduler scheduler)
        {
            return new Range(start, count, scheduler);
        }

        #endregion

        #region + Repeat +

        /// <summary>
        /// Builds the unbounded <c>Repeat</c> operator on <c>SchedulerDefaults.Iteration</c>.
        /// </summary>
        /// <typeparam name="TResult">Type of the repeated value.</typeparam>
        /// <param name="value">Value to repeat.</param>
        public virtual IObservable<TResult> Repeat<TResult>(TResult value)
        {
            return new Repeat<TResult>.Forever(value, SchedulerDefaults.Iteration);
        }

        /// <summary>
        /// Builds the unbounded <c>Repeat</c> operator on the specified scheduler.
        /// </summary>
        /// <typeparam name="TResult">Type of the repeated value.</typeparam>
        /// <param name="value">Value to repeat.</param>
        /// <param name="scheduler">Scheduler handed to the operator.</param>
        public virtual IObservable<TResult> Repeat<TResult>(TResult value, IScheduler scheduler)
        {
            return new Repeat<TResult>.Forever(value, scheduler);
        }

        /// <summary>
        /// Builds the counted <c>Repeat</c> operator on <c>SchedulerDefaults.Iteration</c>.
        /// </summary>
        /// <typeparam name="TResult">Type of the repeated value.</typeparam>
        /// <param name="value">Value to repeat.</param>
        /// <param name="repeatCount">Repeat count passed to the operator.</param>
        public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount)
        {
            return new Repeat<TResult>.Count(value, repeatCount, SchedulerDefaults.Iteration);
        }

        /// <summary>
        /// Builds the counted <c>Repeat</c> operator on the specified scheduler.
        /// </summary>
        /// <typeparam name="TResult">Type of the repeated value.</typeparam>
        /// <param name="value">Value to repeat.</param>
        /// <param name="repeatCount">Repeat count passed to the operator.</param>
        /// <param name="scheduler">Scheduler handed to the operator.</param>
        public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount, IScheduler scheduler)
        {
            return new Repeat<TResult>.Count(value, repeatCount, scheduler);
        }

        #endregion

        #region + Return +

        /// <summary>
        /// Builds the <c>Return</c> operator on <c>SchedulerDefaults.ConstantTimeOperations</c>.
        /// </summary>
        /// <typeparam name="TResult">Type of the value.</typeparam>
        /// <param name="value">Value passed to the operator.</param>
        public virtual IObservable<TResult> Return<TResult>(TResult value)
        {
            return new Return<TResult>(value, SchedulerDefaults.ConstantTimeOperations);
        }

        /// <summary>
        /// Builds the <c>Return</c> operator on the specified scheduler.
        /// </summary>
        /// <typeparam name="TResult">Type of the value.</typeparam>
        /// <param name="value">Value passed to the operator.</param>
        /// <param name="scheduler">Scheduler handed to the operator.</param>
        public virtual IObservable<TResult> Return<TResult>(TResult value, IScheduler scheduler)
        {
            return new Return<TResult>(value, scheduler);
        }

        #endregion

        #region + Throw +

        /// <summary>
        /// Builds the <c>Throw</c> operator on <c>SchedulerDefaults.ConstantTimeOperations</c>.
        /// </summary>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        /// <param name="exception">Exception passed to the operator.</param>
        public virtual IObservable<TResult> Throw<TResult>(Exception exception)
        {
            return new Throw<TResult>(exception, SchedulerDefaults.ConstantTimeOperations);
        }

        /// <summary>
        /// Builds the <c>Throw</c> operator on the specified scheduler.
        /// </summary>
        /// <typeparam name="TResult">Element type of the resulting sequence.</typeparam>
        /// <param name="exception">Exception passed to the operator.</param>
        /// <param name="scheduler">Scheduler handed to the operator.</param>
        public virtual IObservable<TResult> Throw<TResult>(Exception exception, IScheduler scheduler)
        {
            return new Throw<TResult>(exception, scheduler);
        }

        #endregion

        #region + Using +

        /// <summary>
        /// Builds the <c>Using</c> operator from a resource factory and a factory for the
        /// sequence that depends on that resource.
        /// </summary>
        /// <typeparam name="TSource">Element type of the resulting sequence.</typeparam>
        /// <typeparam name="TResource">Type of the disposable resource.</typeparam>
        /// <param name="resourceFactory">Factory producing the resource.</param>
        /// <param name="observableFactory">Factory producing the sequence from the resource.</param>
        public virtual IObservable<TSource> Using<TSource, TResource>(Func<TResource> resourceFactory, Func<TResource, IObservable<TSource>> observableFactory) where TResource : IDisposable
        {
            return new Using<TSource, TResource>(resourceFactory, observableFactory);
        }

        #endregion

        #region - UsingAsync -

        /// <summary>
        /// Asynchronous variant of <c>Using</c>: the resource is obtained through
        /// <c>Observable.FromAsync</c>, then handed to <c>Observable.Using</c> together with an
        /// asynchronously created sequence that is flattened with <c>Merge</c>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the resulting sequence.</typeparam>
        /// <typeparam name="TResource">Type of the disposable resource.</typeparam>
        /// <param name="resourceFactoryAsync">Cancellable asynchronous factory producing the resource.</param>
        /// <param name="observableFactoryAsync">Cancellable asynchronous factory producing the sequence from the resource.</param>
        public virtual IObservable<TSource> Using<TSource, TResource>(Func<CancellationToken, Task<TResource>> resourceFactoryAsync, Func<TResource, CancellationToken, Task<IObservable<TSource>>> observableFactoryAsync) where TResource : IDisposable
        {
            return Observable.FromAsync(resourceFactoryAsync)
                .SelectMany(resource =>
                    Observable.Using(
                        () => resource,
                        resource_ => Observable.FromAsync<IObservable<TSource>>(ct => observableFactoryAsync(resource_, ct)).Merge()
                    )
                );
        }

        #endregion
    }
}