123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462 |
- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- using System.Collections.Generic;
- using System.Reactive.Concurrency;
- using System.Reactive.Disposables;
- using System.Threading;
- using System.Linq;
- #if !NO_TPL
- using System.Reactive.Threading.Tasks;
- using System.Threading.Tasks;
- #endif
- namespace System.Reactive.Linq
- {
- #if !NO_PERF
- using ObservableImpl;
- #endif
- internal partial class QueryLanguage
- {
- #region - Create -
- public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
- {
- return new AnonymousObservable<TSource>(subscribe);
- }
- public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe)
- {
- return new AnonymousObservable<TSource>(o =>
- {
- var a = subscribe(o);
- return a != null ? Disposable.Create(a) : Disposable.Empty;
- });
- }
- #endregion
- #region - CreateAsync -
- #if !NO_TPL
- public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
- {
- return new AnonymousObservable<TResult>(observer =>
- {
- var cancellable = new CancellationDisposable();
- var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
- var taskCompletionObserver = new AnonymousObserver<Unit>(Stubs<Unit>.Ignore, observer.OnError, observer.OnCompleted);
- var subscription = taskObservable.Subscribe(taskCompletionObserver);
- return StableCompositeDisposable.Create(cancellable, subscription);
- });
- }
- public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task> subscribeAsync)
- {
- return Create<TResult>((observer, token) => subscribeAsync(observer));
- }
- public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
- {
- return new AnonymousObservable<TResult>(observer =>
- {
- var subscription = new SingleAssignmentDisposable();
- var cancellable = new CancellationDisposable();
- var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
- var taskCompletionObserver = new AnonymousObserver<IDisposable>(d => subscription.Disposable = d ?? Disposable.Empty, observer.OnError, Stubs.Nop);
- //
- // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
- // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
- //
- taskObservable.Subscribe(taskCompletionObserver);
- return StableCompositeDisposable.Create(cancellable, subscription);
- });
- }
- public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<IDisposable>> subscribeAsync)
- {
- return Create<TResult>((observer, token) => subscribeAsync(observer));
- }
- public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
- {
- return new AnonymousObservable<TResult>(observer =>
- {
- var subscription = new SingleAssignmentDisposable();
- var cancellable = new CancellationDisposable();
- var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
- var taskCompletionObserver = new AnonymousObserver<Action>(a => subscription.Disposable = a != null ? Disposable.Create(a) : Disposable.Empty, observer.OnError, Stubs.Nop);
- //
- // We don't cancel the subscription below *ever* and want to make sure the returned resource eventually gets disposed.
- // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
- //
- taskObservable.Subscribe(taskCompletionObserver);
- return StableCompositeDisposable.Create(cancellable, subscription);
- });
- }
- public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<Action>> subscribeAsync)
- {
- return Create<TResult>((observer, token) => subscribeAsync(observer));
- }
- #endif
- #endregion
- #region + Defer +
- public virtual IObservable<TValue> Defer<TValue>(Func<IObservable<TValue>> observableFactory)
- {
- #if !NO_PERF
- return new Defer<TValue>(observableFactory);
- #else
- return new AnonymousObservable<TValue>(observer =>
- {
- IObservable<TValue> result;
- try
- {
- result = observableFactory();
- }
- catch (Exception exception)
- {
- return Throw<TValue>(exception).Subscribe(observer);
- }
- return result.Subscribe(observer);
- });
- #endif
- }
- #endregion
- #region + DeferAsync +
- #if !NO_TPL
- public virtual IObservable<TValue> Defer<TValue>(Func<Task<IObservable<TValue>>> observableFactoryAsync)
- {
- return Defer(() => StartAsync(observableFactoryAsync).Merge());
- }
- public virtual IObservable<TValue> Defer<TValue>(Func<CancellationToken, Task<IObservable<TValue>>> observableFactoryAsync)
- {
- return Defer(() => StartAsync(observableFactoryAsync).Merge());
- }
- #endif
- #endregion
- #region + Empty +
- public virtual IObservable<TResult> Empty<TResult>()
- {
- #if !NO_PERF
- return new Empty<TResult>(SchedulerDefaults.ConstantTimeOperations);
- #else
- return Empty_<TResult>(SchedulerDefaults.ConstantTimeOperations);
- #endif
- }
- public virtual IObservable<TResult> Empty<TResult>(IScheduler scheduler)
- {
- #if !NO_PERF
- return new Empty<TResult>(scheduler);
- #else
- return Empty_<TResult>(scheduler);
- #endif
- }
- #if NO_PERF
- private static IObservable<TResult> Empty_<TResult>(IScheduler scheduler)
- {
- return new AnonymousObservable<TResult>(observer => scheduler.Schedule(observer.OnCompleted));
- }
- #endif
- #endregion
- #region + Generate +
- public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector)
- {
- #if !NO_PERF
- return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, SchedulerDefaults.Iteration);
- #else
- return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, SchedulerDefaults.Iteration);
- #endif
- }
- public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
- {
- #if !NO_PERF
- return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, scheduler);
- #else
- return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, scheduler);
- #endif
- }
- #if NO_PERF
- private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
- {
- return new AnonymousObservable<TResult>(observer =>
- {
- var state = initialState;
- var first = true;
- return scheduler.Schedule(self =>
- {
- var hasResult = false;
- var result = default(TResult);
- try
- {
- if (first)
- first = false;
- else
- state = iterate(state);
- hasResult = condition(state);
- if (hasResult)
- result = resultSelector(state);
- }
- catch (Exception exception)
- {
- observer.OnError(exception);
- return;
- }
- if (hasResult)
- {
- observer.OnNext(result);
- self();
- }
- else
- observer.OnCompleted();
- });
- });
- }
- #endif
- #endregion
- #region + Never +
- public virtual IObservable<TResult> Never<TResult>()
- {
- #if !NO_PERF
- return new Never<TResult>();
- #else
- return new AnonymousObservable<TResult>(observer => Disposable.Empty);
- #endif
- }
- #endregion
- #region + Range +
- public virtual IObservable<int> Range(int start, int count)
- {
- return Range_(start, count, SchedulerDefaults.Iteration);
- }
- public virtual IObservable<int> Range(int start, int count, IScheduler scheduler)
- {
- return Range_(start, count, scheduler);
- }
- private static IObservable<int> Range_(int start, int count, IScheduler scheduler)
- {
- #if !NO_PERF
- return new Range(start, count, scheduler);
- #else
- return new AnonymousObservable<int>(observer =>
- {
- return scheduler.Schedule(0, (i, self) =>
- {
- if (i < count)
- {
- observer.OnNext(start + i);
- self(i + 1);
- }
- else
- observer.OnCompleted();
- });
- });
- #endif
- }
- #endregion
- #region + Repeat +
- public virtual IObservable<TResult> Repeat<TResult>(TResult value)
- {
- #if !NO_PERF
- return new Repeat<TResult>(value, null, SchedulerDefaults.Iteration);
- #else
- return Repeat_(value, SchedulerDefaults.Iteration);
- #endif
- }
- public virtual IObservable<TResult> Repeat<TResult>(TResult value, IScheduler scheduler)
- {
- #if !NO_PERF
- return new Repeat<TResult>(value, null, scheduler);
- #else
- return Repeat_<TResult>(value, scheduler);
- #endif
- }
- #if NO_PERF
- private IObservable<TResult> Repeat_<TResult>(TResult value, IScheduler scheduler)
- {
- return Return(value, scheduler).Repeat();
- }
- #endif
- public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount)
- {
- #if !NO_PERF
- return new Repeat<TResult>(value, repeatCount, SchedulerDefaults.Iteration);
- #else
- return Repeat_(value, repeatCount, SchedulerDefaults.Iteration);
- #endif
- }
- public virtual IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount, IScheduler scheduler)
- {
- #if !NO_PERF
- return new Repeat<TResult>(value, repeatCount, scheduler);
- #else
- return Repeat_(value, repeatCount, scheduler);
- #endif
- }
- #if NO_PERF
- private IObservable<TResult> Repeat_<TResult>(TResult value, int repeatCount, IScheduler scheduler)
- {
- return Return(value, scheduler).Repeat(repeatCount);
- }
- #endif
- #endregion
- #region + Return +
- public virtual IObservable<TResult> Return<TResult>(TResult value)
- {
- #if !NO_PERF
- return new Return<TResult>(value, SchedulerDefaults.ConstantTimeOperations);
- #else
- return Return_<TResult>(value, SchedulerDefaults.ConstantTimeOperations);
- #endif
- }
- public virtual IObservable<TResult> Return<TResult>(TResult value, IScheduler scheduler)
- {
- #if !NO_PERF
- return new Return<TResult>(value, scheduler);
- #else
- return Return_<TResult>(value, scheduler);
- #endif
- }
- #if NO_PERF
- private static IObservable<TResult> Return_<TResult>(TResult value, IScheduler scheduler)
- {
- return new AnonymousObservable<TResult>(observer =>
- scheduler.Schedule(() =>
- {
- observer.OnNext(value);
- observer.OnCompleted();
- })
- );
- }
- #endif
- #endregion
- #region + Throw +
- public virtual IObservable<TResult> Throw<TResult>(Exception exception)
- {
- #if !NO_PERF
- return new Throw<TResult>(exception, SchedulerDefaults.ConstantTimeOperations);
- #else
- return Throw_<TResult>(exception, SchedulerDefaults.ConstantTimeOperations);
- #endif
- }
- public virtual IObservable<TResult> Throw<TResult>(Exception exception, IScheduler scheduler)
- {
- #if !NO_PERF
- return new Throw<TResult>(exception, scheduler);
- #else
- return Throw_<TResult>(exception, scheduler);
- #endif
- }
- #if NO_PERF
- private static IObservable<TResult> Throw_<TResult>(Exception exception, IScheduler scheduler)
- {
- return new AnonymousObservable<TResult>(observer => scheduler.Schedule(() => observer.OnError(exception)));
- }
- #endif
- #endregion
- #region + Using +
- public virtual IObservable<TSource> Using<TSource, TResource>(Func<TResource> resourceFactory, Func<TResource, IObservable<TSource>> observableFactory) where TResource : IDisposable
- {
- #if !NO_PERF
- return new Using<TSource, TResource>(resourceFactory, observableFactory);
- #else
- return new AnonymousObservable<TSource>(observer =>
- {
- var source = default(IObservable<TSource>);
- var disposable = Disposable.Empty;
- try
- {
- var resource = resourceFactory();
- if (resource != null)
- disposable = resource;
- source = observableFactory(resource);
- }
- catch (Exception exception)
- {
- return new CompositeDisposable(Throw<TSource>(exception).Subscribe(observer), disposable);
- }
- return new CompositeDisposable(source.Subscribe(observer), disposable);
- });
- #endif
- }
- #endregion
- #region - UsingAsync -
- #if !NO_TPL
- public virtual IObservable<TSource> Using<TSource, TResource>(Func<CancellationToken, Task<TResource>> resourceFactoryAsync, Func<TResource, CancellationToken, Task<IObservable<TSource>>> observableFactoryAsync) where TResource : IDisposable
- {
- return Observable.FromAsync<TResource>(resourceFactoryAsync)
- .SelectMany(resource =>
- Observable.Using<TSource, TResource>(
- () => resource,
- resource_ => Observable.FromAsync<IObservable<TSource>>(ct => observableFactoryAsync(resource_, ct)).Merge()
- )
- );
- }
- #endif
- #endregion
- }
- }
|