|
- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- using System.Collections.Generic;
- using System.Linq;
- using System.Reactive.Concurrency;
- using System.Reactive.Disposables;
- using System.Reactive.Subjects;
- namespace System.Reactive.Linq
- {
- #if !NO_PERF
- using ObservableImpl;
- #endif
- internal partial class QueryLanguage
- {
- #region + AsObservable +
- public virtual IObservable<TSource> AsObservable<TSource>(IObservable<TSource> source)
- {
- #if !NO_PERF
- var asObservable = source as AsObservable<TSource>;
- if (asObservable != null)
- return asObservable.Omega();
- return new AsObservable<TSource>(source);
- #else
- return new AnonymousObservable<TSource>(observer => source.Subscribe(observer));
- #endif
- }
- #endregion
- #region + Buffer +
- public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count)
- {
- return Buffer_<TSource>(source, count, count);
- }
- public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count, int skip)
- {
- return Buffer_<TSource>(source, count, skip);
- }
- private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, int count, int skip)
- {
- #if !NO_PERF
- return new Buffer<TSource>(source, count, skip);
- #else
- return Window_<TSource>(source, count, skip).SelectMany(Observable.ToList).Where(list => list.Count > 0);
- #endif
- }
- #endregion
- #region + Dematerialize +
- public virtual IObservable<TSource> Dematerialize<TSource>(IObservable<Notification<TSource>> source)
- {
- #if !NO_PERF
- var materialize = source as Materialize<TSource>;
- if (materialize != null)
- return materialize.Dematerialize();
- return new Dematerialize<TSource>(source);
- #else
- return new AnonymousObservable<TSource>(observer =>
- source.Subscribe(x => x.Accept(observer), observer.OnError, observer.OnCompleted));
- #endif
- }
- #endregion
- #region + DistinctUntilChanged +
- public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source)
- {
- return DistinctUntilChanged_(source, x => x, EqualityComparer<TSource>.Default);
- }
- public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source, IEqualityComparer<TSource> comparer)
- {
- return DistinctUntilChanged_(source, x => x, comparer);
- }
- public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
- {
- return DistinctUntilChanged_(source, keySelector, EqualityComparer<TKey>.Default);
- }
- public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
- {
- return DistinctUntilChanged_(source, keySelector, comparer);
- }
- private static IObservable<TSource> DistinctUntilChanged_<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
- {
- #if !NO_PERF
- return new DistinctUntilChanged<TSource, TKey>(source, keySelector, comparer);
- #else
- return new AnonymousObservable<TSource>(observer =>
- {
- var currentKey = default(TKey);
- var hasCurrentKey = false;
- return source.Subscribe(
- value =>
- {
- var key = default(TKey);
- try
- {
- key = keySelector(value);
- }
- catch (Exception exception)
- {
- observer.OnError(exception);
- return;
- }
- var comparerEquals = false;
- if (hasCurrentKey)
- {
- try
- {
- comparerEquals = comparer.Equals(currentKey, key);
- }
- catch (Exception exception)
- {
- observer.OnError(exception);
- return;
- }
- }
- if (!hasCurrentKey || !comparerEquals)
- {
- hasCurrentKey = true;
- currentKey = key;
- observer.OnNext(value);
- }
- },
- observer.OnError,
- observer.OnCompleted);
- });
- #endif
- }
- #endregion
- #region + Do +
- public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext)
- {
- #if !NO_PERF
- return Do_<TSource>(source, onNext, Stubs<Exception>.Ignore, Stubs.Nop);
- #else
- // PERFORMANCE - Use of Select allows for operator coalescing
- return source.Select(
- x =>
- {
- onNext(x);
- return x;
- }
- );
- #endif
- }
- public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action onCompleted)
- {
- #if !NO_PERF
- return Do_<TSource>(source, onNext, Stubs<Exception>.Ignore, onCompleted);
- #else
- return new AnonymousObservable<TSource>(obs =>
- {
- return source.Subscribe(
- x =>
- {
- try
- {
- onNext(x);
- }
- catch (Exception ex)
- {
- obs.OnError(ex);
- }
- obs.OnNext(x);
- },
- obs.OnError,
- () =>
- {
- try
- {
- onCompleted();
- }
- catch (Exception ex)
- {
- obs.OnError(ex);
- }
- obs.OnCompleted();
- });
- });
- #endif
- }
- public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
- {
- #if !NO_PERF
- return Do_<TSource>(source, onNext, onError, Stubs.Nop);
- #else
- return new AnonymousObservable<TSource>(obs =>
- {
- return source.Subscribe(
- x =>
- {
- try
- {
- onNext(x);
- }
- catch (Exception ex)
- {
- obs.OnError(ex);
- }
- obs.OnNext(x);
- },
- ex =>
- {
- try
- {
- onError(ex);
- }
- catch (Exception ex2)
- {
- obs.OnError(ex2);
- }
- obs.OnError(ex);
- },
- obs.OnCompleted);
- });
- #endif
- }
- public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
- {
- return Do_(source, onNext, onError, onCompleted);
- }
- public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, IObserver<TSource> observer)
- {
- return Do_(source, observer.OnNext, observer.OnError, observer.OnCompleted);
- }
- private static IObservable<TSource> Do_<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
- {
- #if !NO_PERF
- return new Do<TSource>(source, onNext, onError, onCompleted);
- #else
- return new AnonymousObservable<TSource>(obs =>
- {
- return source.Subscribe(
- x =>
- {
- try
- {
- onNext(x);
- }
- catch (Exception ex)
- {
- obs.OnError(ex);
- }
- obs.OnNext(x);
- },
- ex =>
- {
- try
- {
- onError(ex);
- }
- catch (Exception ex2)
- {
- obs.OnError(ex2);
- }
- obs.OnError(ex);
- },
- () =>
- {
- try
- {
- onCompleted();
- }
- catch (Exception ex)
- {
- obs.OnError(ex);
- }
- obs.OnCompleted();
- });
- });
- #endif
- }
- #endregion
- #region + Finally +
- public virtual IObservable<TSource> Finally<TSource>(IObservable<TSource> source, Action finallyAction)
- {
- #if !NO_PERF
- return new Finally<TSource>(source, finallyAction);
- #else
- return new AnonymousObservable<TSource>(observer =>
- {
- var subscription = source.Subscribe(observer);
- return Disposable.Create(() =>
- {
- try
- {
- subscription.Dispose();
- }
- finally
- {
- finallyAction();
- }
- });
- });
- #endif
- }
- #endregion
- #region + IgnoreElements +
- public virtual IObservable<TSource> IgnoreElements<TSource>(IObservable<TSource> source)
- {
- #if !NO_PERF
- var ignoreElements = source as IgnoreElements<TSource>;
- if (ignoreElements != null)
- return ignoreElements.Omega();
- return new IgnoreElements<TSource>(source);
- #else
- return new AnonymousObservable<TSource>(observer => source.Subscribe(_ => { }, observer.OnError, observer.OnCompleted));
- #endif
- }
- #endregion
- #region + Materialize +
- public virtual IObservable<Notification<TSource>> Materialize<TSource>(IObservable<TSource> source)
- {
- #if !NO_PERF
- //
- // NOTE: Peephole optimization of xs.Dematerialize().Materialize() should not be performed. It's possible for xs to
- // contain multiple terminal notifications, which won't survive a Dematerialize().Materialize() chain. In case
- // a reduction to xs.AsObservable() would be performed, those notification elements would survive.
- //
- return new Materialize<TSource>(source);
- #else
- return new AnonymousObservable<Notification<TSource>>(observer =>
- source.Subscribe(
- value => observer.OnNext(Notification.CreateOnNext<TSource>(value)),
- exception =>
- {
- observer.OnNext(Notification.CreateOnError<TSource>(exception));
- observer.OnCompleted();
- },
- () =>
- {
- observer.OnNext(Notification.CreateOnCompleted<TSource>());
- observer.OnCompleted();
- }));
- #endif
- }
- #endregion
- #region - Repeat -
- public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source)
- {
- return RepeatInfinite(source).Concat();
- }
- private static IEnumerable<T> RepeatInfinite<T>(T value)
- {
- while (true)
- yield return value;
- }
- public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source, int repeatCount)
- {
- return Enumerable.Repeat(source, repeatCount).Concat();
- }
- #endregion
- #region - Retry -
- public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
- {
- return RepeatInfinite(source).Catch();
- }
- public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source, int retryCount)
- {
- return Enumerable.Repeat(source, retryCount).Catch();
- }
- #endregion
- #region + Scan +
- public virtual IObservable<TAccumulate> Scan<TSource, TAccumulate>(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
- {
- #if !NO_PERF
- return new Scan<TSource, TAccumulate>(source, seed, accumulator);
- #else
- return Defer(() =>
- {
- var accumulation = default(TAccumulate);
- var hasAccumulation = false;
- return source.Select(x =>
- {
- if (hasAccumulation)
- accumulation = accumulator(accumulation, x);
- else
- {
- accumulation = accumulator(seed, x);
- hasAccumulation = true;
- }
- return accumulation;
- });
- });
- #endif
- }
- public virtual IObservable<TSource> Scan<TSource>(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
- {
- #if !NO_PERF
- return new Scan<TSource>(source, accumulator);
- #else
- return Defer(() =>
- {
- var accumulation = default(TSource);
- var hasAccumulation = false;
- return source.Select(x =>
- {
- if (hasAccumulation)
- accumulation = accumulator(accumulation, x);
- else
- {
- accumulation = x;
- hasAccumulation = true;
- }
- return accumulation;
- });
- });
- #endif
- }
- #endregion
- #region + SkipLast +
- public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, int count)
- {
- #if !NO_PERF
- return new SkipLast<TSource>(source, count);
- #else
- return new AnonymousObservable<TSource>(observer =>
- {
- var q = new Queue<TSource>();
- return source.Subscribe(
- x =>
- {
- q.Enqueue(x);
- if (q.Count > count)
- observer.OnNext(q.Dequeue());
- },
- observer.OnError,
- observer.OnCompleted);
- });
- #endif
- }
- #endregion
- #region - StartWith -
- public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, params TSource[] values)
- {
- return StartWith_<TSource>(source, SchedulerDefaults.ConstantTimeOperations, values);
- }
- public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
- {
- return StartWith_<TSource>(source, scheduler, values);
- }
- public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IEnumerable<TSource> values)
- {
- return StartWith(source, SchedulerDefaults.ConstantTimeOperations, values);
- }
- public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IScheduler scheduler, IEnumerable<TSource> values)
- {
- //
- // NOTE: For some reason, someone introduced this signature in the Observable class, which is inconsistent with the Rx pattern
- // of putting the IScheduler last. It also wasn't wired up through IQueryLanguage. When introducing this method in the
- // IQueryLanguage interface, we went for consistency with the public API, hence the odd position of the IScheduler.
- //
- var valueArray = values as TSource[];
- if (valueArray == null)
- {
- var valueList = new List<TSource>(values);
- valueArray = valueList.ToArray();
- }
- return StartWith_<TSource>(source, scheduler, valueArray);
- }
- private static IObservable<TSource> StartWith_<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
- {
- return values.ToObservable(scheduler).Concat(source);
- }
- #endregion
- #region + TakeLast +
- public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count)
- {
- return TakeLast_(source, count, SchedulerDefaults.Iteration);
- }
- public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
- {
- return TakeLast_(source, count, scheduler);
- }
- private static IObservable<TSource> TakeLast_<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
- {
- #if !NO_PERF
- return new TakeLast<TSource>(source, count, scheduler);
- #else
- return new AnonymousObservable<TSource>(observer =>
- {
- var q = new Queue<TSource>();
- var g = new CompositeDisposable();
- g.Add(source.Subscribe(
- x =>
- {
- q.Enqueue(x);
- if (q.Count > count)
- q.Dequeue();
- },
- observer.OnError,
- () =>
- {
- g.Add(scheduler.Schedule(rec =>
- {
- if (q.Count > 0)
- {
- observer.OnNext(q.Dequeue());
- rec();
- }
- else
- {
- observer.OnCompleted();
- }
- }));
- }
- ));
- return g;
- });
- #endif
- }
- public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, int count)
- {
- #if !NO_PERF
- return new TakeLastBuffer<TSource>(source, count);
- #else
- return new AnonymousObservable<IList<TSource>>(observer =>
- {
- var q = new Queue<TSource>();
- return source.Subscribe(
- x =>
- {
- q.Enqueue(x);
- if (q.Count > count)
- q.Dequeue();
- },
- observer.OnError,
- () =>
- {
- observer.OnNext(q.ToList());
- observer.OnCompleted();
- });
- });
- #endif
- }
- #endregion
- #region + Window +
- public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count, int skip)
- {
- return Window_<TSource>(source, count, skip);
- }
- public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count)
- {
- return Window_<TSource>(source, count, count);
- }
- private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, int count, int skip)
- {
- #if !NO_PERF
- return new Window<TSource>(source, count, skip);
- #else
- return new AnonymousObservable<IObservable<TSource>>(observer =>
- {
- var q = new Queue<ISubject<TSource>>();
- var n = 0;
- var m = new SingleAssignmentDisposable();
- var refCountDisposable = new RefCountDisposable(m);
- Action createWindow = () =>
- {
- var s = new Subject<TSource>();
- q.Enqueue(s);
- observer.OnNext(s.AddRef(refCountDisposable));
- };
- createWindow();
- m.Disposable = source.Subscribe(
- x =>
- {
- foreach (var s in q)
- s.OnNext(x);
- var c = n - count + 1;
- if (c >= 0 && c % skip == 0)
- {
- var s = q.Dequeue();
- s.OnCompleted();
- }
- n++;
- if (n % skip == 0)
- createWindow();
- },
- exception =>
- {
- while (q.Count > 0)
- q.Dequeue().OnError(exception);
- observer.OnError(exception);
- },
- () =>
- {
- while (q.Count > 0)
- q.Dequeue().OnCompleted();
- observer.OnCompleted();
- }
- );
- return refCountDisposable;
- });
- #endif
- }
- #endregion
- }
- }
|