| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686 | // Licensed to the .NET Foundation under one 
// or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

namespace System.Reactive.Linq
{
#if !NO_PERF
    using ObservableImpl;
#endif

    /// <summary>
    /// Part of the query language implementation containing operators that act on a single
    /// source sequence. Most operators have two implementations selected at compile time:
    /// a dedicated operator class when NO_PERF is not defined, and a fallback composed from
    /// <c>AnonymousObservable</c> and other operators when it is.
    /// </summary>
    internal partial class QueryLanguage
    {
        #region + AsObservable +

        /// <summary>
        /// Wraps <paramref name="source"/> in an observable that forwards subscriptions to it.
        /// </summary>
        /// <param name="source">Sequence to wrap.</param>
        /// <returns>An observable that relays the source's notifications.</returns>
        public virtual IObservable<TSource> AsObservable<TSource>(IObservable<TSource> source)
        {
#if !NO_PERF
            // When the source is already an AsObservable wrapper, defer to its Omega() member
            // instead of wrapping it a second time.
            var asObservable = source as AsObservable<TSource>;
            if (asObservable != null)
                return asObservable.Omega();

            return new AsObservable<TSource>(source);
#else
            return new AnonymousObservable<TSource>(observer => source.Subscribe(observer));
#endif
        }

        #endregion

        #region + Buffer +

        /// <summary>
        /// Buffers the source using the same value for the buffer size and the skip distance.
        /// </summary>
        /// <param name="source">Sequence to buffer.</param>
        /// <param name="count">Buffer size, also used as the skip distance.</param>
        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count)
        {
            return Buffer_<TSource>(source, count, count);
        }

        /// <summary>
        /// Buffers the source using the given buffer size and skip distance.
        /// </summary>
        /// <param name="source">Sequence to buffer.</param>
        /// <param name="count">Buffer size.</param>
        /// <param name="skip">Number of elements between the starts of consecutive buffers.</param>
        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count, int skip)
        {
            return Buffer_<TSource>(source, count, skip);
        }

        /// <summary>
        /// Shared implementation behind both <c>Buffer</c> overloads.
        /// </summary>
        private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, int count, int skip)
        {
#if !NO_PERF
            return new Buffer<TSource>(source, count, skip);
#else
            // Build buffers out of windows: turn each window into a list and drop the empty ones.
            return Window_<TSource>(source, count, skip).SelectMany(Observable.ToList).Where(list => list.Count > 0);
#endif
        }

        #endregion

        #region + Dematerialize +

        /// <summary>
        /// Turns a sequence of <see cref="Notification{T}"/> values back into the notifications
        /// they describe.
        /// </summary>
        /// <param name="source">Sequence of notification objects.</param>
        public virtual IObservable<TSource> Dematerialize<TSource>(IObservable<Notification<TSource>> source)
        {
#if !NO_PERF
            // A Materialize operator knows how to undo itself; let it do so directly.
            var materialize = source as Materialize<TSource>;
            if (materialize != null)
                return materialize.Dematerialize();

            return new Dematerialize<TSource>(source);
#else
            return new AnonymousObservable<TSource>(observer =>
                source.Subscribe(x => x.Accept(observer), observer.OnError, observer.OnCompleted));
#endif
        }

        #endregion

        #region + DistinctUntilChanged +

        /// <summary>
        /// Suppresses consecutive duplicates, comparing elements with the default equality comparer.
        /// </summary>
        public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source)
        {
            return DistinctUntilChanged_(source, x => x, EqualityComparer<TSource>.Default);
        }

        /// <summary>
        /// Suppresses consecutive duplicates, comparing elements with <paramref name="comparer"/>.
        /// </summary>
        public virtual IObservable<TSource> DistinctUntilChanged<TSource>(IObservable<TSource> source, IEqualityComparer<TSource> comparer)
        {
            return DistinctUntilChanged_(source, x => x, comparer);
        }

        /// <summary>
        /// Suppresses consecutive duplicates, comparing the keys produced by
        /// <paramref name="keySelector"/> with the default equality comparer.
        /// </summary>
        public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
        {
            return DistinctUntilChanged_(source, keySelector, EqualityComparer<TKey>.Default);
        }

        /// <summary>
        /// Suppresses consecutive duplicates, comparing the keys produced by
        /// <paramref name="keySelector"/> with <paramref name="comparer"/>.
        /// </summary>
        public virtual IObservable<TSource> DistinctUntilChanged<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
        {
            return DistinctUntilChanged_(source, keySelector, comparer);
        }

        /// <summary>
        /// Shared implementation behind all <c>DistinctUntilChanged</c> overloads.
        /// </summary>
        private static IObservable<TSource> DistinctUntilChanged_<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
        {
#if !NO_PERF
            return new DistinctUntilChanged<TSource, TKey>(source, keySelector, comparer);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // Key of the most recently forwarded element; only meaningful once hasCurrentKey is set.
                var currentKey = default(TKey);
                var hasCurrentKey = false;

                return source.Subscribe(
                    value =>
                    {
                        var key = default(TKey);
                        try
                        {
                            key = keySelector(value);
                        }
                        catch (Exception exception)
                        {
                            observer.OnError(exception);
                            return;
                        }

                        var comparerEquals = false;
                        if (hasCurrentKey)
                        {
                            try
                            {
                                comparerEquals = comparer.Equals(currentKey, key);
                            }
                            catch (Exception exception)
                            {
                                observer.OnError(exception);
                                return;
                            }
                        }

                        // Forward the first element unconditionally, later ones only when the key changed.
                        if (!hasCurrentKey || !comparerEquals)
                        {
                            hasCurrentKey = true;
                            currentKey = key;
                            observer.OnNext(value);
                        }
                    },
                    observer.OnError,
                    observer.OnCompleted);
            });
#endif
        }

        #endregion

        #region + Do +

        /// <summary>
        /// Invokes <paramref name="onNext"/> for each element of the source.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="onNext">Action to run for each element.</param>
        public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext)
        {
#if !NO_PERF
            return Do_<TSource>(source, onNext, Stubs<Exception>.Ignore, Stubs.Nop);
#else
            // PERFORMANCE - Use of Select allows for operator coalescing
            return source.Select(
                x =>
                {
                    onNext(x);
                    return x;
                }
            );
#endif
        }

        /// <summary>
        /// Invokes <paramref name="onNext"/> for each element and <paramref name="onCompleted"/>
        /// when the source completes.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="onNext">Action to run for each element.</param>
        /// <param name="onCompleted">Action to run on completion.</param>
        public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action onCompleted)
        {
#if !NO_PERF
            return Do_<TSource>(source, onNext, Stubs<Exception>.Ignore, onCompleted);
#else
            return new AnonymousObservable<TSource>(obs =>
            {
                return source.Subscribe(
                    x =>
                    {
                        try
                        {
                            onNext(x);
                        }
                        catch (Exception ex)
                        {
                            obs.OnError(ex);
                            return; // OnError is terminal; do not forward the element afterwards.
                        }
                        obs.OnNext(x);
                    },
                    obs.OnError,
                    () =>
                    {
                        try
                        {
                            onCompleted();
                        }
                        catch (Exception ex)
                        {
                            obs.OnError(ex);
                            return; // OnError is terminal; do not signal completion afterwards.
                        }
                        obs.OnCompleted();
                    });
            });
#endif
        }

        /// <summary>
        /// Invokes <paramref name="onNext"/> for each element and <paramref name="onError"/>
        /// when the source fails.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="onNext">Action to run for each element.</param>
        /// <param name="onError">Action to run with the source's exception.</param>
        public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
        {
#if !NO_PERF
            return Do_<TSource>(source, onNext, onError, Stubs.Nop);
#else
            return new AnonymousObservable<TSource>(obs =>
            {
                return source.Subscribe(
                    x =>
                    {
                        try
                        {
                            onNext(x);
                        }
                        catch (Exception ex)
                        {
                            obs.OnError(ex);
                            return; // OnError is terminal; do not forward the element afterwards.
                        }
                        obs.OnNext(x);
                    },
                    ex =>
                    {
                        try
                        {
                            onError(ex);
                        }
                        catch (Exception ex2)
                        {
                            obs.OnError(ex2);
                            return; // The callback's exception was reported; do not report a second error.
                        }
                        obs.OnError(ex);
                    },
                    obs.OnCompleted);
            });
#endif
        }

        /// <summary>
        /// Invokes the given actions for each element, for an error, and for completion.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="onNext">Action to run for each element.</param>
        /// <param name="onError">Action to run with the source's exception.</param>
        /// <param name="onCompleted">Action to run on completion.</param>
        public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
        {
            return Do_(source, onNext, onError, onCompleted);
        }

        /// <summary>
        /// Invokes the corresponding method of <paramref name="observer"/> for each notification
        /// of the source.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="observer">Observer whose methods are used as the side-effect actions.</param>
        public virtual IObservable<TSource> Do<TSource>(IObservable<TSource> source, IObserver<TSource> observer)
        {
            return Do_(source, observer.OnNext, observer.OnError, observer.OnCompleted);
        }

        /// <summary>
        /// Shared implementation behind the <c>Do</c> overloads.
        /// </summary>
        private static IObservable<TSource> Do_<TSource>(IObservable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted)
        {
#if !NO_PERF
            return new Do<TSource>(source, onNext, onError, onCompleted);
#else
            return new AnonymousObservable<TSource>(obs =>
            {
                return source.Subscribe(
                    x =>
                    {
                        try
                        {
                            onNext(x);
                        }
                        catch (Exception ex)
                        {
                            obs.OnError(ex);
                            return; // OnError is terminal; do not forward the element afterwards.
                        }
                        obs.OnNext(x);
                    },
                    ex =>
                    {
                        try
                        {
                            onError(ex);
                        }
                        catch (Exception ex2)
                        {
                            obs.OnError(ex2);
                            return; // The callback's exception was reported; do not report a second error.
                        }
                        obs.OnError(ex);
                    },
                    () =>
                    {
                        try
                        {
                            onCompleted();
                        }
                        catch (Exception ex)
                        {
                            obs.OnError(ex);
                            return; // OnError is terminal; do not signal completion afterwards.
                        }
                        obs.OnCompleted();
                    });
            });
#endif
        }

        #endregion

        #region + Finally +

        /// <summary>
        /// Runs <paramref name="finallyAction"/> when the subscription to the source is torn down.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="finallyAction">Action to run after the source subscription is disposed.</param>
        public virtual IObservable<TSource> Finally<TSource>(IObservable<TSource> source, Action finallyAction)
        {
#if !NO_PERF
            return new Finally<TSource>(source, finallyAction);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                var subscription = source.Subscribe(observer);

                return Disposable.Create(() =>
                    {
                        // try/finally guarantees the action runs even if disposing the subscription throws.
                        try
                        {
                            subscription.Dispose();
                        }
                        finally
                        {
                            finallyAction();
                        }
                    });
            });
#endif
        }

        #endregion

        #region + IgnoreElements +

        /// <summary>
        /// Drops all elements of the source, keeping only its error or completion notification.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        public virtual IObservable<TSource> IgnoreElements<TSource>(IObservable<TSource> source)
        {
#if !NO_PERF
            // When the source is already an IgnoreElements operator, defer to its Omega() member
            // instead of stacking another one on top.
            var ignoreElements = source as IgnoreElements<TSource>;
            if (ignoreElements != null)
                return ignoreElements.Omega();

            return new IgnoreElements<TSource>(source);
#else
            return new AnonymousObservable<TSource>(observer => source.Subscribe(_ => { }, observer.OnError, observer.OnCompleted));
#endif
        }

        #endregion

        #region + Materialize +

        /// <summary>
        /// Turns every notification of the source into a <see cref="Notification{T}"/> element.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        public virtual IObservable<Notification<TSource>> Materialize<TSource>(IObservable<TSource> source)
        {
#if !NO_PERF
            //
            // NOTE: Peephole optimization of xs.Dematerialize().Materialize() should not be performed. It's possible for xs to
            //       contain multiple terminal notifications, which won't survive a Dematerialize().Materialize() chain. In case
            //       a reduction to xs.AsObservable() would be performed, those notification elements would survive.
            //

            return new Materialize<TSource>(source);
#else
            // Both terminal cases emit a final notification element and then complete the outer sequence.
            return new AnonymousObservable<Notification<TSource>>(observer =>
                source.Subscribe(
                    value => observer.OnNext(Notification.CreateOnNext<TSource>(value)),
                    exception =>
                    {
                        observer.OnNext(Notification.CreateOnError<TSource>(exception));
                        observer.OnCompleted();
                    },
                    () =>
                    {
                        observer.OnNext(Notification.CreateOnCompleted<TSource>());
                        observer.OnCompleted();
                    }));
#endif
        }

        #endregion

        #region - Repeat -

        /// <summary>
        /// Concatenates an endless enumeration of <paramref name="source"/>.
        /// </summary>
        /// <param name="source">Sequence to repeat.</param>
        public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source)
        {
            return RepeatInfinite(source).Concat();
        }

        /// <summary>
        /// Lazily yields <paramref name="value"/> forever.
        /// </summary>
        private static IEnumerable<T> RepeatInfinite<T>(T value)
        {
            while (true)
                yield return value;
        }

        /// <summary>
        /// Concatenates <paramref name="repeatCount"/> copies of <paramref name="source"/>.
        /// </summary>
        /// <param name="source">Sequence to repeat.</param>
        /// <param name="repeatCount">Number of copies handed to <c>Enumerable.Repeat</c>.</param>
        public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source, int repeatCount)
        {
            return Enumerable.Repeat(source, repeatCount).Concat();
        }

        #endregion

        #region - Retry -

        /// <summary>
        /// Applies <c>Catch</c> to an endless enumeration of <paramref name="source"/>.
        /// </summary>
        /// <param name="source">Sequence to retry.</param>
        public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source)
        {
            return RepeatInfinite(source).Catch();
        }

        /// <summary>
        /// Applies <c>Catch</c> to <paramref name="retryCount"/> copies of <paramref name="source"/>.
        /// </summary>
        /// <param name="source">Sequence to retry.</param>
        /// <param name="retryCount">Number of copies handed to <c>Enumerable.Repeat</c>.</param>
        public virtual IObservable<TSource> Retry<TSource>(IObservable<TSource> source, int retryCount)
        {
            return Enumerable.Repeat(source, retryCount).Catch();
        }

        #endregion

        #region + Scan +

        /// <summary>
        /// Emits the running accumulation of the source, starting from <paramref name="seed"/>.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="seed">Initial accumulator value.</param>
        /// <param name="accumulator">Function combining the accumulation with the next element.</param>
        public virtual IObservable<TAccumulate> Scan<TSource, TAccumulate>(IObservable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator)
        {
#if !NO_PERF
            return new Scan<TSource, TAccumulate>(source, seed, accumulator);
#else
            // Defer gives every subscription its own accumulation state.
            return Defer(() =>
            {
                var accumulation = default(TAccumulate);
                var hasAccumulation = false;

                return source.Select(x =>
                {
                    if (hasAccumulation)
                        accumulation = accumulator(accumulation, x);
                    else
                    {
                        accumulation = accumulator(seed, x);
                        hasAccumulation = true;
                    }

                    return accumulation;
                });
            });
#endif
        }

        /// <summary>
        /// Emits the running accumulation of the source, using the first element as the initial value.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="accumulator">Function combining the accumulation with the next element.</param>
        public virtual IObservable<TSource> Scan<TSource>(IObservable<TSource> source, Func<TSource, TSource, TSource> accumulator)
        {
#if !NO_PERF
            return new Scan<TSource>(source, accumulator);
#else
            // Defer gives every subscription its own accumulation state.
            return Defer(() =>
            {
                var accumulation = default(TSource);
                var hasAccumulation = false;

                return source.Select(x =>
                {
                    if (hasAccumulation)
                        accumulation = accumulator(accumulation, x);
                    else
                    {
                        // The first element is emitted as-is and seeds the accumulation.
                        accumulation = x;
                        hasAccumulation = true;
                    }

                    return accumulation;
                });
            });
#endif
        }

        #endregion

        #region + SkipLast +

        /// <summary>
        /// Omits the last <paramref name="count"/> elements of the source.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="count">Number of trailing elements to drop.</param>
        public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, int count)
        {
#if !NO_PERF
            return new SkipLast<TSource>(source, count);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // Holds back up to 'count' elements; whatever is still queued at the end is never emitted.
                var q = new Queue<TSource>();

                return source.Subscribe(
                    x =>
                    {
                        q.Enqueue(x);
                        if (q.Count > count)
                            observer.OnNext(q.Dequeue());
                    },
                    observer.OnError,
                    observer.OnCompleted);
            });
#endif
        }

        #endregion

        #region - StartWith -

        /// <summary>
        /// Prepends <paramref name="values"/> to the source using
        /// <c>SchedulerDefaults.ConstantTimeOperations</c>.
        /// </summary>
        public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, params TSource[] values)
        {
            return StartWith_<TSource>(source, SchedulerDefaults.ConstantTimeOperations, values);
        }

        /// <summary>
        /// Prepends <paramref name="values"/> to the source, emitting them on <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
        {
            return StartWith_<TSource>(source, scheduler, values);
        }

        /// <summary>
        /// Prepends <paramref name="values"/> to the source using
        /// <c>SchedulerDefaults.ConstantTimeOperations</c>.
        /// </summary>
        public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IEnumerable<TSource> values)
        {
            return StartWith(source, SchedulerDefaults.ConstantTimeOperations, values);
        }

        /// <summary>
        /// Prepends <paramref name="values"/> to the source, emitting them on <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TSource> StartWith<TSource>(IObservable<TSource> source, IScheduler scheduler, IEnumerable<TSource> values)
        {
            //
            // NOTE: For some reason, someone introduced this signature in the Observable class, which is inconsistent with the Rx pattern
            //       of putting the IScheduler last. It also wasn't wired up through IQueryLanguage. When introducing this method in the
            //       IQueryLanguage interface, we went for consistency with the public API, hence the odd position of the IScheduler.
            //

            // Reuse the caller's array when one was passed in; otherwise copy the values into a new array.
            var valueArray = values as TSource[];
            if (valueArray == null)
            {
                var valueList = new List<TSource>(values);
                valueArray = valueList.ToArray();
            }

            return StartWith_<TSource>(source, scheduler, valueArray);
        }

        /// <summary>
        /// Shared implementation behind the <c>StartWith</c> overloads.
        /// </summary>
        private static IObservable<TSource> StartWith_<TSource>(IObservable<TSource> source, IScheduler scheduler, params TSource[] values)
        {
            return values.ToObservable(scheduler).Concat(source);
        }

        #endregion

        #region + TakeLast +

        /// <summary>
        /// Emits only the last <paramref name="count"/> elements of the source, using
        /// <c>SchedulerDefaults.Iteration</c> to emit them.
        /// </summary>
        public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count)
        {
            return TakeLast_(source, count, SchedulerDefaults.Iteration);
        }

        /// <summary>
        /// Emits only the last <paramref name="count"/> elements of the source, using
        /// <paramref name="scheduler"/> to emit them.
        /// </summary>
        public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
        {
            return TakeLast_(source, count, scheduler);
        }

        /// <summary>
        /// Shared implementation behind both <c>TakeLast</c> overloads.
        /// </summary>
        private static IObservable<TSource> TakeLast_<TSource>(IObservable<TSource> source, int count, IScheduler scheduler)
        {
#if !NO_PERF
            return new TakeLast<TSource>(source, count, scheduler);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                var q = new Queue<TSource>();

                // Groups the source subscription with the scheduled drain so both are disposed together.
                var g = new CompositeDisposable();

                g.Add(source.Subscribe(
                    x =>
                    {
                        // Keep at most 'count' of the most recent elements.
                        q.Enqueue(x);
                        if (q.Count > count)
                            q.Dequeue();
                    },
                    observer.OnError,
                    () =>
                    {
                        // Drain the queue one element per recursive scheduler invocation, then complete.
                        g.Add(scheduler.Schedule(rec =>
                        {
                            if (q.Count > 0)
                            {
                                observer.OnNext(q.Dequeue());
                                rec();
                            }
                            else
                            {
                                observer.OnCompleted();
                            }
                        }));
                    }
                ));

                return g;
            });
#endif
        }

        /// <summary>
        /// Emits a single list holding the last <paramref name="count"/> elements of the source.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="count">Maximum number of trailing elements to keep.</param>
        public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, int count)
        {
#if !NO_PERF
            return new TakeLastBuffer<TSource>(source, count);
#else
            return new AnonymousObservable<IList<TSource>>(observer =>
            {
                var q = new Queue<TSource>();

                return source.Subscribe(
                    x =>
                    {
                        // Keep at most 'count' of the most recent elements.
                        q.Enqueue(x);
                        if (q.Count > count)
                            q.Dequeue();
                    },
                    observer.OnError,
                    () =>
                    {
                        observer.OnNext(q.ToList());
                        observer.OnCompleted();
                    });
            });
#endif
        }

        #endregion

        #region + Window +

        /// <summary>
        /// Splits the source into windows using the given window size and skip distance.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="count">Window size.</param>
        /// <param name="skip">Number of elements between the starts of consecutive windows.</param>
        public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count, int skip)
        {
            return Window_<TSource>(source, count, skip);
        }

        /// <summary>
        /// Splits the source into windows using the same value for the window size and the skip distance.
        /// </summary>
        /// <param name="source">Source sequence.</param>
        /// <param name="count">Window size, also used as the skip distance.</param>
        public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, int count)
        {
            return Window_<TSource>(source, count, count);
        }

        /// <summary>
        /// Shared implementation behind both <c>Window</c> overloads.
        /// </summary>
        private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, int count, int skip)
        {
#if !NO_PERF
            return new Window<TSource>(source, count, skip);
#else
            return new AnonymousObservable<IObservable<TSource>>(observer =>
            {
                // Currently open windows, oldest first.
                var q = new Queue<ISubject<TSource>>();

                // Number of source elements seen so far.
                var n = 0;

                var m = new SingleAssignmentDisposable();
                var refCountDisposable = new RefCountDisposable(m);

                Action createWindow = () =>
                {
                    var s = new Subject<TSource>();
                    q.Enqueue(s);
                    observer.OnNext(s.AddRef(refCountDisposable));
                };

                // The first window is opened before subscribing to the source.
                createWindow();

                m.Disposable = source.Subscribe(
                    x =>
                    {
                        // Every open window receives the element.
                        foreach (var s in q)
                            s.OnNext(x);

                        // Close the oldest window once it has received 'count' elements.
                        var c = n - count + 1;
                        if (c >= 0 && c % skip == 0)
                        {
                            var s = q.Dequeue();
                            s.OnCompleted();
                        }

                        // Open a new window every 'skip' elements.
                        n++;
                        if (n % skip == 0)
                            createWindow();
                    },
                    exception =>
                    {
                        while (q.Count > 0)
                            q.Dequeue().OnError(exception);

                        observer.OnError(exception);
                    },
                    () =>
                    {
                        while (q.Count > 0)
                            q.Dequeue().OnCompleted();

                        observer.OnCompleted();
                    }
                );

                return refCountDisposable;
            });
#endif
        }

        #endregion
    }
}
 |