| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the Apache 2.0 License.
- // See the LICENSE file in the project root for more information.
- using System.Reactive.Concurrency;
- using System.Reactive.Disposables;
- using System.Reactive.Threading.Tasks;
- using System.Threading;
- using System.Threading.Tasks;
- namespace System.Reactive.Linq
- {
- using ObservableImpl;
- internal partial class QueryLanguage
- {
- #region - Create -
- /// <summary>
- /// Creates an observable sequence from a subscribe delegate that returns an <c>IDisposable</c>.
- /// </summary>
- /// <param name="subscribe">Delegate that is handed to the <c>CreateWithDisposableObservable</c> wrapper.</param>
- /// <returns>A new <c>CreateWithDisposableObservable</c> instance wrapping <paramref name="subscribe"/>.</returns>
- public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
-     => new CreateWithDisposableObservable<TSource>(subscribe);
- /// <summary>
- /// Observable whose <c>SubscribeCore</c> delegates to a user-supplied function returning an <c>IDisposable</c>.
- /// </summary>
- private sealed class CreateWithDisposableObservable<TSource> : ObservableBase<TSource>
- {
-     // Subscription logic supplied by the caller of Create.
-     private readonly Func<IObserver<TSource>, IDisposable> _subscribe;
-     public CreateWithDisposableObservable(Func<IObserver<TSource>, IDisposable> subscribe)
-     {
-         _subscribe = subscribe;
-     }
-     /// <summary>
-     /// Invokes the stored delegate with <paramref name="observer"/> and returns its result.
-     /// </summary>
-     /// <returns>The delegate's disposable, or <c>Disposable.Empty</c> when the delegate returned null.</returns>
-     protected override IDisposable SubscribeCore(IObserver<TSource> observer)
-     {
-         var handle = _subscribe(observer);
-         if (handle == null)
-         {
-             // Never hand a null subscription back to the caller.
-             return Disposable.Empty;
-         }
-         return handle;
-     }
- }
- /// <summary>
- /// Creates an observable sequence from a subscribe delegate that returns an <c>Action</c>.
- /// </summary>
- /// <param name="subscribe">Delegate that is handed to the <c>CreateWithActionDisposable</c> wrapper.</param>
- /// <returns>A new <c>CreateWithActionDisposable</c> instance wrapping <paramref name="subscribe"/>.</returns>
- public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe)
-     => new CreateWithActionDisposable<TSource>(subscribe);
- /// <summary>
- /// Observable whose <c>SubscribeCore</c> delegates to a user-supplied function returning an <c>Action</c>.
- /// </summary>
- private sealed class CreateWithActionDisposable<TSource> : ObservableBase<TSource>
- {
-     // Subscription logic supplied by the caller of Create.
-     private readonly Func<IObserver<TSource>, Action> _subscribe;
-     public CreateWithActionDisposable(Func<IObserver<TSource>, Action> subscribe)
-     {
-         _subscribe = subscribe;
-     }
-     /// <summary>
-     /// Invokes the stored delegate with <paramref name="observer"/> and wraps the returned action in a disposable.
-     /// </summary>
-     /// <returns><c>Disposable.Create</c> over the returned action, or <c>Disposable.Empty</c> when the action is null.</returns>
-     protected override IDisposable SubscribeCore(IObserver<TSource> observer)
-     {
-         var teardown = _subscribe(observer);
-         if (teardown == null)
-         {
-             // No teardown action was supplied, so there is nothing to wrap.
-             return Disposable.Empty;
-         }
-         return Disposable.Create(teardown);
-     }
- }
- #endregion
- #region - CreateAsync -
- /// <summary>
- /// Creates an observable sequence from an asynchronous subscribe delegate that accepts a <c>CancellationToken</c>.
- /// </summary>
- /// <param name="subscribeAsync">Delegate that is handed to the <c>CreateWithTaskTokenObservable</c> wrapper.</param>
- /// <returns>A new <c>CreateWithTaskTokenObservable</c> instance wrapping <paramref name="subscribeAsync"/>.</returns>
- public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
-     => new CreateWithTaskTokenObservable<TResult>(subscribeAsync);
- /// <summary>
- /// Observable backed by an asynchronous subscribe delegate that receives a <c>CancellationToken</c> and returns a <c>Task</c>.
- /// </summary>
- private sealed class CreateWithTaskTokenObservable<TResult> : ObservableBase<TResult>
- {
-     // Asynchronous subscription logic supplied by the caller of Create.
-     private readonly Func<IObserver<TResult>, CancellationToken, Task> _subscribeAsync;
-     public CreateWithTaskTokenObservable(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
-     {
-         _subscribeAsync = subscribeAsync;
-     }
-     /// <summary>
-     /// Starts the asynchronous delegate with the token of a fresh <c>CancellationDisposable</c> and
-     /// forwards the completion or error of the resulting task to <paramref name="observer"/>.
-     /// </summary>
-     /// <returns>A composite of the cancellation handle and the subscription to the task's observable.</returns>
-     protected override IDisposable SubscribeCore(IObserver<TResult> observer)
-     {
-         var cancellation = new CancellationDisposable();
-         var task = _subscribeAsync(observer, cancellation.Token);
-         var taskSubscription = task.ToObservable().Subscribe(new TaskCompletionObserver(observer));
-         return StableCompositeDisposable.Create(cancellation, taskSubscription);
-     }
-     /// <summary>
-     /// Relays only the terminal notifications of the task's observable to the downstream observer.
-     /// </summary>
-     private sealed class TaskCompletionObserver : IObserver<Unit>
-     {
-         private readonly IObserver<TResult> _observer;
-         public TaskCompletionObserver(IObserver<TResult> observer)
-         {
-             _observer = observer;
-         }
-         public void OnCompleted() => _observer.OnCompleted();
-         public void OnError(Exception error) => _observer.OnError(error);
-         public void OnNext(Unit value)
-         {
-             // Intentionally a no-op: the Unit value is not forwarded downstream.
-         }
-     }
- }
- /// <summary>
- /// Creates an observable sequence from an asynchronous subscribe delegate that takes no cancellation token.
- /// </summary>
- /// <param name="subscribeAsync">Asynchronous subscribe delegate; it is adapted to the token-accepting overload.</param>
- /// <returns>The result of the <c>Create</c> overload that accepts a token-aware delegate returning a <c>Task</c>.</returns>
- public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task> subscribeAsync)
- {
-     // The token supplied by the token-aware overload is dropped; only the observer is passed along.
-     Func<IObserver<TResult>, CancellationToken, Task> adapter = (obs, ignoredToken) => subscribeAsync(obs);
-     return Create<TResult>(adapter);
- }
- /// <summary>
- /// Creates an observable sequence from an asynchronous, cancellable subscribe delegate whose task yields an <c>IDisposable</c>.
- /// </summary>
- /// <param name="subscribeAsync">Delegate that is handed to the <c>CreateWithTaskDisposable</c> wrapper.</param>
- /// <returns>A new <c>CreateWithTaskDisposable</c> instance wrapping <paramref name="subscribeAsync"/>.</returns>
- public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
-     => new CreateWithTaskDisposable<TResult>(subscribeAsync);
- /// <summary>
- /// Observable backed by an asynchronous subscribe delegate whose task produces an <c>IDisposable</c> resource.
- /// </summary>
- private sealed class CreateWithTaskDisposable<TResult> : ObservableBase<TResult>
- {
-     // Asynchronous subscription logic supplied by the caller of Create.
-     private readonly Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> _subscribeAsync;
-     public CreateWithTaskDisposable(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
-     {
-         _subscribeAsync = subscribeAsync;
-     }
-     /// <summary>
-     /// Starts the asynchronous delegate with the token of a fresh <c>CancellationDisposable</c> and captures the
-     /// disposable produced by the task so it can be released together with the subscription.
-     /// </summary>
-     /// <returns>A composite of the cancellation handle and the observer that holds the task's disposable.</returns>
-     protected override IDisposable SubscribeCore(IObserver<TResult> observer)
-     {
-         var cancellation = new CancellationDisposable();
-         var task = _subscribeAsync(observer, cancellation.Token);
-         var taskSource = task.ToObservable();
-         var resourceHolder = new TaskDisposeCompletionObserver(observer);
-         //
-         // The subscription to the task is deliberately never cancelled, so that a resource returned by the
-         // task is still received and gets disposed eventually.
-         // NOTE(review): auto-detach behavior is presumably provided by the ObservableBase<T> base type — confirm.
-         //
-         taskSource.Subscribe(resourceHolder);
-         return StableCompositeDisposable.Create(cancellation, resourceHolder);
-     }
-     /// <summary>
-     /// Stores the disposable emitted by the task, relays terminal notifications downstream, and
-     /// releases the stored disposable when disposed.
-     /// </summary>
-     private sealed class TaskDisposeCompletionObserver : IObserver<IDisposable>, IDisposable
-     {
-         private readonly IObserver<TResult> _observer;
-         // Slot for the task's resource; accessed only through the Disposable helper methods.
-         private IDisposable _disposable;
-         public TaskDisposeCompletionObserver(IObserver<TResult> observer)
-         {
-             _observer = observer;
-         }
-         public void Dispose() => Disposable.TryDispose(ref _disposable);
-         public void OnCompleted() => _observer.OnCompleted();
-         public void OnError(Exception error) => _observer.OnError(error);
-         public void OnNext(IDisposable value) => Disposable.SetSingle(ref _disposable, value);
-     }
- }
- /// <summary>
- /// Creates an observable sequence from an asynchronous subscribe delegate (no cancellation token) whose task yields an <c>IDisposable</c>.
- /// </summary>
- /// <param name="subscribeAsync">Asynchronous subscribe delegate; it is adapted to the token-accepting overload.</param>
- /// <returns>The result of the <c>Create</c> overload that accepts a token-aware delegate returning <c>Task&lt;IDisposable&gt;</c>.</returns>
- public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<IDisposable>> subscribeAsync)
- {
-     // The token supplied by the token-aware overload is dropped; only the observer is passed along.
-     Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> adapter = (obs, ignoredToken) => subscribeAsync(obs);
-     return Create<TResult>(adapter);
- }
- /// <summary>
- /// Creates an observable sequence from an asynchronous, cancellable subscribe delegate whose task yields an <c>Action</c>.
- /// </summary>
- /// <param name="subscribeAsync">Delegate that is handed to the <c>CreateWithTaskActionObservable</c> wrapper.</param>
- /// <returns>A new <c>CreateWithTaskActionObservable</c> instance wrapping <paramref name="subscribeAsync"/>.</returns>
- public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
-     => new CreateWithTaskActionObservable<TResult>(subscribeAsync);
- /// <summary>
- /// Observable backed by an asynchronous subscribe delegate whose task produces a teardown <c>Action</c>.
- /// </summary>
- private sealed class CreateWithTaskActionObservable<TResult> : ObservableBase<TResult>
- {
-     // Asynchronous subscription logic supplied by the caller of Create.
-     private readonly Func<IObserver<TResult>, CancellationToken, Task<Action>> _subscribeAsync;
-     public CreateWithTaskActionObservable(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
-     {
-         _subscribeAsync = subscribeAsync;
-     }
-     /// <summary>
-     /// Starts the asynchronous delegate with the token of a fresh <c>CancellationDisposable</c> and captures the
-     /// action produced by the task so it can be run when the subscription is disposed.
-     /// </summary>
-     /// <returns>A composite of the cancellation handle and the observer that holds the task's action.</returns>
-     protected override IDisposable SubscribeCore(IObserver<TResult> observer)
-     {
-         var cancellation = new CancellationDisposable();
-         var task = _subscribeAsync(observer, cancellation.Token);
-         var taskSource = task.ToObservable();
-         var actionHolder = new TaskDisposeCompletionObserver(observer);
-         //
-         // The subscription to the task is deliberately never cancelled, so that an action returned by the
-         // task is still received and gets invoked eventually.
-         // NOTE(review): auto-detach behavior is presumably provided by the ObservableBase<T> base type — confirm.
-         //
-         taskSource.Subscribe(actionHolder);
-         return StableCompositeDisposable.Create(cancellation, actionHolder);
-     }
-     /// <summary>
-     /// Stores the action emitted by the task, relays terminal notifications downstream, and
-     /// invokes the stored action when disposed.
-     /// </summary>
-     private sealed class TaskDisposeCompletionObserver : IObserver<Action>, IDisposable
-     {
-         private readonly IObserver<TResult> _observer;
-         // Holds null (nothing received yet), the received action, or DisposedAction once Dispose has run.
-         private Action _disposable;
-         // Sentinel swapped into _disposable by Dispose.
-         private static readonly Action DisposedAction = () => { };
-         public TaskDisposeCompletionObserver(IObserver<TResult> observer)
-         {
-             _observer = observer;
-         }
-         public void Dispose()
-         {
-             // Atomically swap in the sentinel and run whatever was stored before, if anything.
-             var pending = Interlocked.Exchange(ref _disposable, DisposedAction);
-             if (pending != null)
-             {
-                 pending.Invoke();
-             }
-         }
-         public void OnCompleted() => _observer.OnCompleted();
-         public void OnError(Exception error) => _observer.OnError(error);
-         public void OnNext(Action value)
-         {
-             // Store the action only if the slot is still empty.
-             var previous = Interlocked.CompareExchange(ref _disposable, value, null);
-             if (previous == null)
-             {
-                 return;
-             }
-             // The slot was already occupied (for example by the sentinel), so run the action right away.
-             value?.Invoke();
-         }
-     }
- }
- /// <summary>
- /// Creates an observable sequence from an asynchronous subscribe delegate (no cancellation token) whose task yields an <c>Action</c>.
- /// </summary>
- /// <param name="subscribeAsync">Asynchronous subscribe delegate; it is adapted to the token-accepting overload.</param>
- /// <returns>The result of the <c>Create</c> overload that accepts a token-aware delegate returning <c>Task&lt;Action&gt;</c>.</returns>
- public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<Action>> subscribeAsync)
- {
-     // The token supplied by the token-aware overload is dropped; only the observer is passed along.
-     Func<IObserver<TResult>, CancellationToken, Task<Action>> adapter = (obs, ignoredToken) => subscribeAsync(obs);
-     return Create<TResult>(adapter);
- }
- #endregion
- #region + Defer +
- /// <summary>
- /// Creates a deferred observable sequence from a factory function.
- /// </summary>
- /// <param name="observableFactory">Factory passed to the <c>Defer</c> operator implementation.</param>
- /// <returns>A new <c>Defer</c> operator instance wrapping <paramref name="observableFactory"/>.</returns>
- public virtual IObservable<TValue> Defer<TValue>(Func<IObservable<TValue>> observableFactory)
-     => new Defer<TValue>(observableFactory);
- #endregion
- #region + DeferAsync +
- /// <summary>
- /// Creates a deferred observable sequence from an asynchronous factory function.
- /// </summary>
- /// <param name="observableFactoryAsync">Asynchronous factory; it is run through <c>StartAsync</c> and the result is flattened with <c>Merge</c>.</param>
- /// <returns>The synchronous <c>Defer</c> applied to a factory that starts the asynchronous one and merges its result.</returns>
- public virtual IObservable<TValue> Defer<TValue>(Func<Task<IObservable<TValue>>> observableFactoryAsync)
-     => Defer(() => StartAsync(observableFactoryAsync).Merge());
- /// <summary>
- /// Creates a deferred observable sequence from an asynchronous factory function that accepts a <c>CancellationToken</c>.
- /// </summary>
- /// <param name="observableFactoryAsync">Cancellable asynchronous factory; it is run through <c>StartAsync</c> and the result is flattened with <c>Merge</c>.</param>
- /// <returns>The synchronous <c>Defer</c> applied to a factory that starts the asynchronous one and merges its result.</returns>
- public virtual IObservable<TValue> Defer<TValue>(Func<CancellationToken, Task<IObservable<TValue>>> observableFactoryAsync)
-     => Defer(() => StartAsync(observableFactoryAsync).Merge());
- #endregion
- #region + Empty +
- /// <summary>
- /// Returns the shared empty observable sequence.
- /// </summary>
- /// <returns>The <c>EmptyDirect&lt;TResult&gt;.Instance</c> singleton.</returns>
- public virtual IObservable<TResult> Empty<TResult>()
-     => EmptyDirect<TResult>.Instance;
- /// <summary>
- /// Returns an empty observable sequence bound to the given scheduler.
- /// </summary>
- /// <param name="scheduler">Scheduler passed to the <c>Empty</c> operator implementation.</param>
- /// <returns>A new <c>Empty</c> operator instance using <paramref name="scheduler"/>.</returns>
- public virtual IObservable<TResult> Empty<TResult>(IScheduler scheduler)
-     => new Empty<TResult>(scheduler);
- #endregion
- #region + Generate +
- /// <summary>
- /// Creates a generated observable sequence using <c>SchedulerDefaults.Iteration</c> as the scheduler.
- /// </summary>
- /// <param name="initialState">State value passed to the operator as its starting point.</param>
- /// <param name="condition">Predicate over the state, forwarded to the operator.</param>
- /// <param name="iterate">State transition function, forwarded to the operator.</param>
- /// <param name="resultSelector">Projection from state to result, forwarded to the operator.</param>
- /// <returns>A new <c>Generate&lt;TState, TResult&gt;.NoTime</c> instance.</returns>
- public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
-     => new Generate<TState, TResult>.NoTime(
-         initialState,
-         condition,
-         iterate,
-         resultSelector,
-         SchedulerDefaults.Iteration);
- /// <summary>
- /// Creates a generated observable sequence that runs on the supplied scheduler.
- /// </summary>
- /// <param name="initialState">State value passed to the operator as its starting point.</param>
- /// <param name="condition">Predicate over the state, forwarded to the operator.</param>
- /// <param name="iterate">State transition function, forwarded to the operator.</param>
- /// <param name="resultSelector">Projection from state to result, forwarded to the operator.</param>
- /// <param name="scheduler">Scheduler forwarded to the operator.</param>
- /// <returns>A new <c>Generate&lt;TState, TResult&gt;.NoTime</c> instance.</returns>
- public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
-     => new Generate<TState, TResult>.NoTime(
-         initialState,
-         condition,
-         iterate,
-         resultSelector,
-         scheduler);
- #endregion
- #region + Never +
- /// <summary>
- /// Returns the shared never-terminating observable sequence.
- /// </summary>
- /// <returns>The <c>ObservableImpl.Never&lt;TResult&gt;.Default</c> instance.</returns>
- public virtual IObservable<TResult> Never<TResult>()
-     => ObservableImpl.Never<TResult>.Default;
- #endregion
- #region + Range +
- /// <summary>
- /// Creates a range sequence using <c>SchedulerDefaults.Iteration</c> as the scheduler.
- /// </summary>
- /// <param name="start">Start value forwarded to <c>Range_</c>.</param>
- /// <param name="count">Count forwarded to <c>Range_</c>.</param>
- /// <returns>The observable produced by <c>Range_</c>.</returns>
- public virtual IObservable<int> Range(int start, int count)
-     => Range_(start, count, SchedulerDefaults.Iteration);
- /// <summary>
- /// Creates a range sequence that runs on the supplied scheduler.
- /// </summary>
- /// <param name="start">Start value forwarded to <c>Range_</c>.</param>
- /// <param name="count">Count forwarded to <c>Range_</c>.</param>
- /// <param name="scheduler">Scheduler forwarded to <c>Range_</c>.</param>
- /// <returns>The observable produced by <c>Range_</c>.</returns>
- public virtual IObservable<int> Range(int start, int count, IScheduler scheduler)
-     => Range_(start, count, scheduler);
- /// <summary>
- /// Picks the range implementation based on whether the scheduler exposes a long-running variant.
- /// </summary>
- /// <returns>
- /// A <c>RangeLongRunning</c> when <c>scheduler.AsLongRunning()</c> is non-null; otherwise a <c>RangeRecursive</c>.
- /// </returns>
- private static IObservable<int> Range_(int start, int count, IScheduler scheduler)
- {
-     var longRunningScheduler = scheduler.AsLongRunning();
-     if (longRunningScheduler == null)
-     {
-         // No long-running capability: fall back to the recursive implementation on the original scheduler.
-         return new RangeRecursive(start, count, scheduler);
-     }
-     return new RangeLongRunning(start, count, longRunningScheduler);
- }
- #endregion
- #region + Repeat +
- /// <summary>
- /// Creates a sequence repeating <paramref name="value"/>, using <c>SchedulerDefaults.Iteration</c> as the scheduler.
- /// </summary>
- /// <param name="value">Element forwarded to <c>Repeat_</c>.</param>
- /// <returns>The observable produced by the two-argument <c>Repeat_</c> helper.</returns>
- public virtual IObservable<TResult> Repeat<TResult>(TResult value)
-     => Repeat_(value, SchedulerDefaults.Iteration);
- /// <summary>
- /// Creates a sequence repeating <paramref name="value"/> on the supplied scheduler.
- /// </summary>
- /// <param name="value">Element forwarded to <c>Repeat_</c>.</param>
- /// <param name="scheduler">Scheduler forwarded to <c>Repeat_</c>.</param>
- /// <returns>The observable produced by the two-argument <c>Repeat_</c> helper.</returns>
- public virtual IObservable<TResult> Repeat<TResult>(TResult value, IScheduler scheduler)
-     => Repeat_(value, scheduler);
- /// <summary>
- /// Picks the unbounded repeat implementation based on whether the scheduler exposes a long-running variant.
- /// </summary>
- /// <returns>
- /// A <c>Repeat&lt;TResult&gt;.ForeverLongRunning</c> when <c>scheduler.AsLongRunning()</c> is non-null;
- /// otherwise a <c>Repeat&lt;TResult&gt;.ForeverRecursive</c>.
- /// </returns>
- private IObservable<TResult> Repeat_<TResult>(TResult value, IScheduler scheduler)
- {
-     var longRunningScheduler = scheduler.AsLongRunning();
-     if (longRunningScheduler == null)
-     {
-         // No long-running capability: fall back to the recursive implementation on the original scheduler.
-         return new Repeat<TResult>.ForeverRecursive(value, scheduler);
-     }
-     return new Repeat<TResult>.ForeverLongRunning(value, longRunningScheduler);
- }
- /// <summary>
- /// Creates a sequence repeating <paramref name="value"/> a given number of times, using <c>SchedulerDefaults.Iteration</c> as the scheduler.
- /// </summary>
- /// <param name="value">Element forwarded to <c>Repeat_</c>.</param>
- /// <param name="repeatCount">Repetition count forwarded to <c>Repeat_</c>.</param>
- /// <returns>The observable produced by the three-argument <c>Repeat_</c> helper.</returns>
- public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount)
-     => Repeat_(value, repeatCount, SchedulerDefaults.Iteration);
- /// <summary>
- /// Creates a sequence repeating <paramref name="value"/> a given number of times on the supplied scheduler.
- /// </summary>
- /// <param name="value">Element forwarded to <c>Repeat_</c>.</param>
- /// <param name="repeatCount">Repetition count forwarded to <c>Repeat_</c>.</param>
- /// <param name="scheduler">Scheduler forwarded to <c>Repeat_</c>.</param>
- /// <returns>The observable produced by the three-argument <c>Repeat_</c> helper.</returns>
- public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount, IScheduler scheduler)
-     => Repeat_(value, repeatCount, scheduler);
- /// <summary>
- /// Picks the counted repeat implementation based on whether the scheduler exposes a long-running variant.
- /// </summary>
- /// <returns>
- /// A <c>Repeat&lt;TResult&gt;.CountLongRunning</c> when <c>scheduler.AsLongRunning()</c> is non-null;
- /// otherwise a <c>Repeat&lt;TResult&gt;.CountRecursive</c>.
- /// </returns>
- private IObservable<TResult> Repeat_<TResult>(TResult value, int repeatCount, IScheduler scheduler)
- {
-     var longRunningScheduler = scheduler.AsLongRunning();
-     if (longRunningScheduler == null)
-     {
-         // No long-running capability: fall back to the recursive implementation on the original scheduler.
-         return new Repeat<TResult>.CountRecursive(value, repeatCount, scheduler);
-     }
-     return new Repeat<TResult>.CountLongRunning(value, repeatCount, longRunningScheduler);
- }
- #endregion
- #region + Return +
- /// <summary>
- /// Creates a sequence for a single value, using <c>SchedulerDefaults.ConstantTimeOperations</c> as the scheduler.
- /// </summary>
- /// <param name="value">Value passed to the <c>Return</c> operator implementation.</param>
- /// <returns>A new <c>Return</c> operator instance.</returns>
- public virtual IObservable<TResult> Return<TResult>(TResult value)
-     => new Return<TResult>(value, SchedulerDefaults.ConstantTimeOperations);
- /// <summary>
- /// Creates a sequence for a single value on the supplied scheduler.
- /// </summary>
- /// <param name="value">Value passed to the <c>Return</c> operator implementation.</param>
- /// <param name="scheduler">Scheduler passed to the <c>Return</c> operator implementation.</param>
- /// <returns>A new <c>Return</c> operator instance.</returns>
- public virtual IObservable<TResult> Return<TResult>(TResult value, IScheduler scheduler)
-     => new Return<TResult>(value, scheduler);
- #endregion
- #region + Throw +
- /// <summary>
- /// Creates a sequence for the given exception, using <c>SchedulerDefaults.ConstantTimeOperations</c> as the scheduler.
- /// </summary>
- /// <param name="exception">Exception passed to the <c>Throw</c> operator implementation.</param>
- /// <returns>A new <c>Throw</c> operator instance.</returns>
- public virtual IObservable<TResult> Throw<TResult>(Exception exception)
-     => new Throw<TResult>(exception, SchedulerDefaults.ConstantTimeOperations);
- /// <summary>
- /// Creates a sequence for the given exception on the supplied scheduler.
- /// </summary>
- /// <param name="exception">Exception passed to the <c>Throw</c> operator implementation.</param>
- /// <param name="scheduler">Scheduler passed to the <c>Throw</c> operator implementation.</param>
- /// <returns>A new <c>Throw</c> operator instance.</returns>
- public virtual IObservable<TResult> Throw<TResult>(Exception exception, IScheduler scheduler)
-     => new Throw<TResult>(exception, scheduler);
- #endregion
- #region + Using +
- /// <summary>
- /// Creates an observable sequence tied to a disposable resource obtained from a factory.
- /// </summary>
- /// <param name="resourceFactory">Resource factory passed to the <c>Using</c> operator implementation.</param>
- /// <param name="observableFactory">Factory mapping the resource to a sequence, passed to the operator.</param>
- /// <returns>A new <c>Using</c> operator instance.</returns>
- public virtual IObservable<TSource> Using<TSource, TResource>(Func<TResource> resourceFactory, Func<TResource, IObservable<TSource>> observableFactory) where TResource : IDisposable
-     => new Using<TSource, TResource>(resourceFactory, observableFactory);
- #endregion
- #region - UsingAsync -
- /// <summary>
- /// Asynchronous variant of <c>Using</c>: both the resource and the dependent sequence come from task-returning factories.
- /// </summary>
- /// <param name="resourceFactoryAsync">Cancellable asynchronous resource factory, wrapped with <c>Observable.FromAsync</c>.</param>
- /// <param name="observableFactoryAsync">Cancellable asynchronous factory mapping the resource to a sequence.</param>
- /// <returns>
- /// A sequence that, for each resource produced, applies <c>Observable.Using</c> around the merged result of
- /// <paramref name="observableFactoryAsync"/>.
- /// </returns>
- public virtual IObservable<TSource> Using<TSource, TResource>(Func<CancellationToken, Task<TResource>> resourceFactoryAsync, Func<TResource, CancellationToken, Task<IObservable<TSource>>> observableFactoryAsync) where TResource : IDisposable
- {
-     var resourceSource = Observable.FromAsync(resourceFactoryAsync);
-     return resourceSource.SelectMany(acquired =>
-         Observable.Using(
-             // The resource already exists at this point; the factory just hands it over.
-             () => acquired,
-             held => Observable.FromAsync(token => observableFactoryAsync(held, token)).Merge()));
- }
- #endregion
- }
- }
|