123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684 |
- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- using System.Collections.Generic;
- using System.Linq;
- using System.Reactive.Concurrency;
- using System.Reactive.Disposables;
- using System.Reactive.Subjects;
- namespace System.Reactive.Linq
- {
- #if !NO_PERF
- using ObservableImpl;
- #endif
- internal partial class QueryLanguage
- {
- #region + AsObservable +
- public virtual IObservable<TSource> AsObservable<TSource>(IObservable<TSource> source)
- {
- #if !NO_PERF
- var asObservable = source as AsObservable<TSource>;
- if (asObservable != null)
- return asObservable.Omega();
- return new AsObservable<TSource>(source);
- #else
- return new AnonymousObservable<TSource>(observer => source.Subscribe(observer));
- #endif
- }
- #endregion
- #region + Buffer +
- public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count)
- {
- return Buffer_<TSource>(source, count, count);
- }
- public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count, int skip)
- {
- return Buffer_<TSource>(source, count, skip);
- }
- private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, int count, int skip)
- {
- #if !NO_PERF
- return new Buffer<TSource>(source, count, skip);
- #else
- return Window_<TSource>(source, count, skip).SelectMany(Observable.ToList).Where(list => list.Count > 0);
- #endif
- }
- #endregion
- #region + Dematerialize +
- public virtual IObservable<TSource> Dematerialize<TSource>(IObservable<Notification<TSource>> source)
- {
- #if !NO_PERF
- var materialize = source as Materialize<TSource>;
- if (materialize != null)
- return materialize.Dematerialize();
- return new Dematerialize<TSource>(source);
- #else
- return new AnonymousObservable<TSource>(observer =>
- source.Subscribe(x => x.Accept(observer), observer.OnError, observer.OnCompleted));
- #endif
- }
- #endregion
- #region + DistinctUntilChanged +
- public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source)
- {
- return DistinctUntilChanged_(source, x => x, EqualityComparer<TSource>.Default);
- }
- public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source, IEqualityComparer<TSource> comparer)
- {
- return DistinctUntilChanged_(source, x => x, comparer);
- }
- public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
- {
- return DistinctUntilChanged_(source, keySelector, EqualityComparer<TKey>.Default);
- }
- public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
- {
- return DistinctUntilChanged_(source, keySelector, comparer);
- }
- private static IObservable<TSource> DistinctUntilChanged_<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
- {
- #if !NO_PERF
- return new DistinctUntilChanged<TSource, TKey>(source, keySelector, comparer);
- #else
- return new AnonymousObservable<TSource>(observer =>
- {
- var currentKey = default(TKey);
- var hasCurrentKey = false;
- return source.Subscribe(
- value =>
- {
- var key = default(TKey);
- try
- {
- key = keySelector(value);
- }
- catch (Exception exception)
- {
- observer.OnError(exception);
- return;
- }
- var comparerEquals = false;
- if (hasCurrentKey)
- {
- try
- {
- comparerEquals = comparer.Equals(currentKey, key);
- }
- catch (Exception exception)
- {
- observer.OnError(exception);
- return;
- }
- }
- if (!hasCurrentKey || !comparerEquals)
- {
- hasCurrentKey = true;
- currentKey = key;
- observer.OnNext(value);
- }
- },
- observer.OnError,
- observer.OnCompleted);
- });
- #endif
- }
- #endregion
- #region + Do +
- public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext)
- {
- #if !NO_PERF
- return Do_<TSource>(source, onNext, Stubs<Exception>.Ignore, Stubs.Nop);
- #else
- // PERFORMANCE - Use of Select allows for operator coalescing
- return source.Select(
- x =>
- {
- onNext(x);
- return x;
- }
- );
- #endif
- }
- public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action onCompleted)
- {
- #if !NO_PERF
- return Do_<TSource>(source, onNext, Stubs<Exception>.Ignore, onCompleted);
- #else
- return new AnonymousObservable<TSource>(obs =>
- {
- return source.Subscribe(
- x =>
- {
- try
- {
- onNext(x);
- }
- catch (Exception ex)
- {
- obs.OnError(ex);
- }
- obs.OnNext(x);
- },
- obs.OnError,
- () =>
- {
- try
- {
- onCompleted();
- }
- catch (Exception ex)
- {
- obs.OnError(ex);
- }
- obs.OnCompleted();
- });
- });
- #endif
- }
- public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
- {
- #if !NO_PERF
- return Do_<TSource>(source, onNext, onError, Stubs.Nop);
- #else
- return new AnonymousObservable<TSource>(obs =>
- {
- return source.Subscribe(
- x =>
- {
- try
- {
- onNext(x);
- }
- catch (Exception ex)
- {
- obs.OnError(ex);
- }
- obs.OnNext(x);
- },
- ex =>
- {
- try
- {
- onError(ex);
- }
- catch (Exception ex2)
- {
- obs.OnError(ex2);
- }
- obs.OnError(ex);
- },
- obs.OnCompleted);
- });
- #endif
- }
- public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
- {
- return Do_(source, onNext, onError, onCompleted);
- }
- public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, IObserver<TSource> observer)
- {
- return Do_(source, observer.OnNext, observer.OnError, observer.OnCompleted);
- }
- private static IObservable<TSource> Do_<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
- {
- #if !NO_PERF
- return new Do<TSource>(source, onNext, onError, onCompleted);
- #else
- return new AnonymousObservable<TSource>(obs =>
- {
- return source.Subscribe(
- x =>
- {
- try
- {
- onNext(x);
- }
- catch (Exception ex)
- {
- obs.OnError(ex);
- }
- obs.OnNext(x);
- },
- ex =>
- {
- try
- {
- onError(ex);
- }
- catch (Exception ex2)
- {
- obs.OnError(ex2);
- }
- obs.OnError(ex);
- },
- () =>
- {
- try
- {
- onCompleted();
- }
- catch (Exception ex)
- {
- obs.OnError(ex);
- }
- obs.OnCompleted();
- });
- });
- #endif
- }
- #endregion
- #region + Finally +
- public virtual IObservable<TSource> Finally<TSource>(IObservable<TSource> source, Action finallyAction)
- {
- #if !NO_PERF
- return new Finally<TSource>(source, finallyAction);
- #else
- return new AnonymousObservable<TSource>(observer =>
- {
- var subscription = source.Subscribe(observer);
- return Disposable.Create(() =>
- {
- try
- {
- subscription.Dispose();
- }
- finally
- {
- finallyAction();
- }
- });
- });
- #endif
- }
- #endregion
- #region + IgnoreElements +
- public virtual IObservable<TSource> IgnoreElements<TSource>(IObservable<TSource> source)
- {
- #if !NO_PERF
- var ignoreElements = source as IgnoreElements<TSource>;
- if (ignoreElements != null)
- return ignoreElements.Omega();
- return new IgnoreElements<TSource>(source);
- #else
- return new AnonymousObservable<TSource>(observer => source.Subscribe(_ => { }, observer.OnError, observer.OnCompleted));
- #endif
- }
- #endregion
- #region + Materialize +
- public virtual IObservable<Notification<TSource>> Materialize<TSource>(IObservable<TSource> source)
- {
- #if !NO_PERF
- //
- // NOTE: Peephole optimization of xs.Dematerialize().Materialize() should not be performed. It's possible for xs to
- // contain multiple terminal notifications, which won't survive a Dematerialize().Materialize() chain. In case
- // a reduction to xs.AsObservable() would be performed, those notification elements would survive.
- //
- return new Materialize<TSource>(source);
- #else
- return new AnonymousObservable<Notification<TSource>>(observer =>
- source.Subscribe(
- value => observer.OnNext(Notification.CreateOnNext<TSource>(value)),
- exception =>
- {
- observer.OnNext(Notification.CreateOnError<TSource>(exception));
- observer.OnCompleted();
- },
- () =>
- {
- observer.OnNext(Notification.CreateOnCompleted<TSource>());
- observer.OnCompleted();
- }));
- #endif
- }
- #endregion
- #region - Repeat -
- public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source)
- {
- return RepeatInfinite(source).Concat();
- }
- private static IEnumerable<T> RepeatInfinite<T>(T value)
- {
- while (true)
- yield return value;
- }
- public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source, int repeatCount)
- {
- return Enumerable.Repeat(source, repeatCount).Concat();
- }
- #endregion
- #region - Retry -
- public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
- {
- return RepeatInfinite(source).Catch();
- }
- public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source, int retryCount)
- {
- return Enumerable.Repeat(source, retryCount).Catch();
- }
- #endregion
- #region + Scan +
- public virtual IObservable<TAccumulate> Scan<TSource, TAccumulate>(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
- {
- #if !NO_PERF
- return new Scan<TSource, TAccumulate>(source, seed, accumulator);
- #else
- return Defer(() =>
- {
- var accumulation = default(TAccumulate);
- var hasAccumulation = false;
- return source.Select(x =>
- {
- if (hasAccumulation)
- accumulation = accumulator(accumulation, x);
- else
- {
- accumulation = accumulator(seed, x);
- hasAccumulation = true;
- }
- return accumulation;
- });
- });
- #endif
- }
- public virtual IObservable<TSource> Scan<TSource>(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
- {
- #if !NO_PERF
- return new Scan<TSource>(source, accumulator);
- #else
- return Defer(() =>
- {
- var accumulation = default(TSource);
- var hasAccumulation = false;
- return source.Select(x =>
- {
- if (hasAccumulation)
- accumulation = accumulator(accumulation, x);
- else
- {
- accumulation = x;
- hasAccumulation = true;
- }
- return accumulation;
- });
- });
- #endif
- }
- #endregion
- #region + SkipLast +
- public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, int count)
- {
- #if !NO_PERF
- return new SkipLast<TSource>(source, count);
- #else
- return new AnonymousObservable<TSource>(observer =>
- {
- var q = new Queue<TSource>();
- return source.Subscribe(
- x =>
- {
- q.Enqueue(x);
- if (q.Count > count)
- observer.OnNext(q.Dequeue());
- },
- observer.OnError,
- observer.OnCompleted);
- });
- #endif
- }
- #endregion
- #region - StartWith -
- public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, params TSource[] values)
- {
- return StartWith_<TSource>(source, SchedulerDefaults.ConstantTimeOperations, values);
- }
- public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
- {
- return StartWith_<TSource>(source, scheduler, values);
- }
- public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IEnumerable<TSource> values)
- {
- return StartWith(source, SchedulerDefaults.ConstantTimeOperations, values);
- }
- public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IScheduler scheduler, IEnumerable<TSource> values)
- {
- //
- // NOTE: For some reason, someone introduced this signature in the Observable class, which is inconsistent with the Rx pattern
- // of putting the IScheduler last. It also wasn't wired up through IQueryLanguage. When introducing this method in the
- // IQueryLanguage interface, we went for consistency with the public API, hence the odd position of the IScheduler.
- //
- var valueArray = values as TSource[];
- if (valueArray == null)
- {
- var valueList = new List<TSource>(values);
- valueArray = valueList.ToArray();
- }
- return StartWith_<TSource>(source, scheduler, valueArray);
- }
- private static IObservable<TSource> StartWith_<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
- {
- return values.ToObservable(scheduler).Concat(source);
- }
- #endregion
- #region + TakeLast +
- public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count)
- {
- return TakeLast_(source, count, SchedulerDefaults.Iteration);
- }
- public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
- {
- return TakeLast_(source, count, scheduler);
- }
- private static IObservable<TSource> TakeLast_<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
- {
- #if !NO_PERF
- return new TakeLast<TSource>(source, count, scheduler);
- #else
- return new AnonymousObservable<TSource>(observer =>
- {
- var q = new Queue<TSource>();
- var g = new CompositeDisposable();
- g.Add(source.Subscribe(
- x =>
- {
- q.Enqueue(x);
- if (q.Count > count)
- q.Dequeue();
- },
- observer.OnError,
- () =>
- {
- g.Add(scheduler.Schedule(rec =>
- {
- if (q.Count > 0)
- {
- observer.OnNext(q.Dequeue());
- rec();
- }
- else
- {
- observer.OnCompleted();
- }
- }));
- }
- ));
- return g;
- });
- #endif
- }
- public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, int count)
- {
- #if !NO_PERF
- return new TakeLastBuffer<TSource>(source, count);
- #else
- return new AnonymousObservable<IList<TSource>>(observer =>
- {
- var q = new Queue<TSource>();
- return source.Subscribe(
- x =>
- {
- q.Enqueue(x);
- if (q.Count > count)
- q.Dequeue();
- },
- observer.OnError,
- () =>
- {
- observer.OnNext(q.ToList());
- observer.OnCompleted();
- });
- });
- #endif
- }
- #endregion
- #region + Window +
- public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count, int skip)
- {
- return Window_<TSource>(source, count, skip);
- }
- public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count)
- {
- return Window_<TSource>(source, count, count);
- }
- private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, int count, int skip)
- {
- #if !NO_PERF
- return new Window<TSource>(source, count, skip);
- #else
- return new AnonymousObservable<IObservable<TSource>>(observer =>
- {
- var q = new Queue<ISubject<TSource>>();
- var n = 0;
- var m = new SingleAssignmentDisposable();
- var refCountDisposable = new RefCountDisposable(m);
- Action createWindow = () =>
- {
- var s = new Subject<TSource>();
- q.Enqueue(s);
- observer.OnNext(s.AddRef(refCountDisposable));
- };
- createWindow();
- m.Disposable = source.Subscribe(
- x =>
- {
- foreach (var s in q)
- s.OnNext(x);
- var c = n - count + 1;
- if (c >= 0 && c % skip == 0)
- {
- var s = q.Dequeue();
- s.OnCompleted();
- }
- n++;
- if (n % skip == 0)
- createWindow();
- },
- exception =>
- {
- while (q.Count > 0)
- q.Dequeue().OnError(exception);
- observer.OnError(exception);
- },
- () =>
- {
- while (q.Count > 0)
- q.Dequeue().OnCompleted();
- observer.OnCompleted();
- }
- );
- return refCountDisposable;
- });
- #endif
- }
- #endregion
- }
- }
|