// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

namespace System.Reactive.Linq
{
#if !NO_PERF
    using ObservableImpl;
#endif

    internal partial class QueryLanguage
    {
        #region + Buffer +

        #region TimeSpan only

        /// <summary>
        /// Time-based buffering on <c>SchedulerDefaults.TimeBasedOperations</c>;
        /// <paramref name="timeSpan"/> is passed as both the span and the shift.
        /// </summary>
        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return Buffer_<TSource>(source, timeSpan, timeSpan, defaultScheduler);
        }

        /// <summary>
        /// Time-based buffering on the given <paramref name="scheduler"/>;
        /// <paramref name="timeSpan"/> is passed as both the span and the shift.
        /// </summary>
        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
        {
            var timeShift = timeSpan;
            return Buffer_<TSource>(source, timeSpan, timeShift, scheduler);
        }

        /// <summary>
        /// Time-based buffering with a separate <paramref name="timeShift"/>, on
        /// <c>SchedulerDefaults.TimeBasedOperations</c>.
        /// </summary>
        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return Buffer_<TSource>(source, timeSpan, timeShift, defaultScheduler);
        }

        /// <summary>
        /// Time-based buffering with a separate <paramref name="timeShift"/>, on the given
        /// <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
        {
            return Buffer_<TSource>(source, timeSpan, timeShift, scheduler);
        }

        private static IObservable<IList<TSource>>
Buffer_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)
        {
#if !NO_PERF
            return new Buffer<TSource>(source, timeSpan, timeShift, scheduler);
#else
            // Fallback: build time-based windows, then turn each window into a list.
            var windows = source.Window(timeSpan, timeShift, scheduler);
            return windows.SelectMany(Observable.ToList);
#endif
        }

        #endregion

        #region TimeSpan + int

        /// <summary>
        /// Buffering bounded by both <paramref name="timeSpan"/> and <paramref name="count"/>, on
        /// <c>SchedulerDefaults.TimeBasedOperations</c>.
        /// </summary>
        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return Buffer_<TSource>(source, timeSpan, count, defaultScheduler);
        }

        /// <summary>
        /// Buffering bounded by both <paramref name="timeSpan"/> and <paramref name="count"/>, on the given
        /// <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
        {
            return Buffer_<TSource>(source, timeSpan, count, scheduler);
        }

        /// <summary>
        /// Shared implementation of the time-and-count <c>Buffer</c> overloads.
        /// </summary>
        private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)
        {
#if !NO_PERF
            return new Buffer<TSource>(source, timeSpan, count, scheduler);
#else
            // Fallback: build time/count windows, then turn each window into a list.
            var windows = source.Window(timeSpan, count, scheduler);
            return windows.SelectMany(Observable.ToList);
#endif
        }

        #endregion

        #endregion

        #region + Delay +

        #region TimeSpan

        /// <summary>
        /// Delays <paramref name="source"/> by <paramref name="dueTime"/> on
        /// <c>SchedulerDefaults.TimeBasedOperations</c>.
        /// </summary>
        public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, TimeSpan dueTime)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return Delay_<TSource>(source, dueTime, defaultScheduler);
        }

        /// <summary>
        /// Delays <paramref name="source"/> by <paramref name="dueTime"/> on the given
        /// <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
            return Delay_<TSource>(source, dueTime, scheduler);
        }

        /// <summary>
        /// Shared implementation of the relative-time <c>Delay</c> overloads.
        /// </summary>
        private static IObservable<TSource> Delay_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
#if !NO_PERF
            return new Delay<TSource>(source, dueTime, scheduler);
#else
            return new AnonymousObservable<TSource>(observer =>
      {
                // Lock guarding the queue and the state flags declared below.
                var gate = new object();
                // Pending notifications, each paired with a timestamp.
                var q = new Queue<Timestamped<Notification<TSource>>>();
                // Set when a notification is queued; cleared when a drain pass finds the queue empty.
                var active = false;
                // True from the moment a drain pass passes its entry check until its final bookkeeping.
                var running = false;
                // Holds the most recently scheduled drain so that it is cancelled on disposal.
                var cancelable = new SerialDisposable();
                // Error reported by the source, if any.
                var exception = default(Exception);

                var subscription = source.Materialize().Timestamp(scheduler).Subscribe(notification =>
                {
                    var shouldRun = false;

                    lock (gate)
                    {
                        if (notification.Value.Kind == NotificationKind.OnError)
                        {
                            // Errors discard everything still queued; the error itself is queued with its
                            // original timestamp (no dueTime added).
                            q.Clear();
                            q.Enqueue(notification);
                            exception = notification.Value.Exception;
                            // Only act here if no drain pass is currently running; a running pass picks up
                            // 'exception' in its final bookkeeping.
                            shouldRun = !running;
                        }
                        else
                        {
                            // Re-stamp the notification with the time at which it becomes due.
                            q.Enqueue(new Timestamped<Notification<TSource>>(notification.Value, notification.Timestamp.Add(dueTime)));
                            // Start a drain only if none is pending already.
                            shouldRun = !active;
                            active = true;
                        }
                    }

                    if (shouldRun)
                    {
                        // NOTE(review): 'exception' is read here without holding 'gate'.
                        if (exception != null)
                            observer.OnError(exception);
                        else
                        {
                            var d = new SingleAssignmentDisposable();
                            cancelable.Disposable = d;
                            d.Disposable = scheduler.Schedule(dueTime, self =>
                            {
                                lock (gate)
                                {
                                    // An error was recorded before this pass started: emit nothing from here.
                                    if (exception != null)
                                        return;
                                    running = true;
                                }

                                // Deliver every queued notification whose timestamp is not later than
                                // scheduler.Now; the observer is called outside the lock.
                                Notification<TSource> result;
                                do
                                {
                                    result = null;
                                    lock (gate)
                                    {
                                        if (q.Count > 0 && q.Peek().Timestamp.CompareTo(scheduler.Now) <= 0)
                                            result = q.Dequeue().Value;
                                    }

                                    if (result != null)
                                        result.Accept(observer);
                                } while (result != null);

                                var shouldRecurse = false;
                                var recurseDueTime = TimeSpan.Zero;
                                var e = default(Exception);
                                lock (gate)
                                {
                                    if (q.Count > 0)
                                    {
                                        // Something is still queued: run again when the head becomes due
                                        // (never a negative delay).
                                        shouldRecurse = true;
                                        recurseDueTime = TimeSpan.FromTicks(Math.Max(0, q.Peek().Timestamp.Subtract(scheduler.Now).Ticks));
                                    }
                                    else
                                        active = false;
                                    // Snapshot the error under the lock so it can be reported outside it.
                                    e = exception;
                                    running = false;
                                }

                                // An error recorded during this pass takes precedence over rescheduling.
                                if (e != null)
                                    observer.OnError(e);
                                else if (shouldRecurse)
                                    self(recurseDueTime);
                            });
                        }
                    }
                });

                // Disposing tears down both the source subscription and any scheduled drain.
                return new CompositeDisposable(subscription, cancelable);
            });
#endif
        }

        #endregion

        #region DateTimeOffset

        // Absolute-time overload; forwards to Delay_ with the default time-based scheduler.
        public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
        {
            return Delay_<TSource>(source, dueTime,
SchedulerDefaults.TimeBasedOperations);
        }

        /// <summary>
        /// Absolute-time <c>Delay</c> on the given <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TSource> Delay<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
        {
            return Delay_<TSource>(source, dueTime, scheduler);
        }

        /// <summary>
        /// Shared implementation of the absolute-time <c>Delay</c> overloads.
        /// </summary>
        private static IObservable<TSource> Delay_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
        {
#if !NO_PERF
            return new Delay<TSource>(source, dueTime, scheduler);
#else
            // Fallback: at subscription time, convert the absolute due time into a relative one
            // and reuse the TimeSpan-based implementation.
            return Observable.Defer(() =>
            {
                var relativeDueTime = dueTime.Subtract(scheduler.Now);
                return Delay_<TSource>(source, relativeDueTime, scheduler);
            });
#endif
        }

        #endregion

        #region Duration selector

        /// <summary>
        /// Per-element delay driven by <paramref name="delayDurationSelector"/>, with no subscription delay
        /// (<c>null</c> is passed for it).
        /// </summary>
        public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, Func<TSource, IObservable<TDelay>> delayDurationSelector)
        {
            return Delay_<TSource, TDelay>(source, null, delayDurationSelector);
        }

        /// <summary>
        /// Per-element delay driven by <paramref name="delayDurationSelector"/>, with the subscription to
        /// <paramref name="source"/> itself deferred by <paramref name="subscriptionDelay"/>.
        /// </summary>
        public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector)
        {
            return Delay_<TSource, TDelay>(source, subscriptionDelay, delayDurationSelector);
        }

        /// <summary>
        /// Shared implementation of the duration-selector <c>Delay</c> overloads;
        /// <paramref name="subscriptionDelay"/> may be <c>null</c>.
        /// </summary>
        private static IObservable<TSource> Delay_<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector)
        {
#if !NO_PERF
            return new Delay<TSource, TDelay>(source, subscriptionDelay, delayDurationSelector);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // One entry per element whose delay sequence has not fired yet.
                var delays = new CompositeDisposable();

                var gate = new object();
                var atEnd = false;

                // Completes the observer once the source has ended and no delay is outstanding.
                var done = new Action(() =>
                {
                    if (atEnd && delays.Count == 0)
                    {
observer.OnCompleted();
                    }
                });

                var subscription = new SerialDisposable();

                // Subscribes to the source; each element is held back until its delay sequence
                // produces a value or completes.
                var start = new Action(() =>
                {
                    subscription.Disposable = source.Subscribe(
                        x =>
                        {
                            var delaySource = default(IObservable<TDelay>);
                            try
                            {
                                delaySource = delayDurationSelector(x);
                            }
                            catch (Exception selectorError)
                            {
                                lock (gate)
                                {
                                    observer.OnError(selectorError);
                                }

                                return;
                            }

                            var delayHandle = new SingleAssignmentDisposable();
                            delays.Add(delayHandle);
                            delayHandle.Disposable = delaySource.Subscribe(
                                _ =>
                                {
                                    lock (gate)
                                    {
                                        observer.OnNext(x);
                                        delays.Remove(delayHandle);
                                        done();
                                    }
                                },
                                delayError =>
                                {
                                    lock (gate)
                                    {
                                        observer.OnError(delayError);
                                    }
                                },
                                () =>
                                {
                                    lock (gate)
                                    {
                                        observer.OnNext(x);
                                        delays.Remove(delayHandle);
                                        done();
                                    }
                                }
                            );
                        },
                        sourceError =>
                        {
                            lock (gate)
                            {
                                observer.OnError(sourceError);
                            }
                        },
                        () =>
                        {
                            lock (gate)
                            {
                                atEnd = true;
                                subscription.Dispose();
                                done();
                            }
                        }
                    );
                });

                if (subscriptionDelay != null)
                {
                    // Wait for the subscription delay to produce a value or complete before starting.
                    subscription.Disposable = subscriptionDelay.Subscribe(
                        _ => start(),
                        observer.OnError,
                        start
                    );
                }
                else
                {
                    start();
                }

                return new CompositeDisposable(subscription, delays);
            });
#endif
        }

        #endregion

        #endregion

        #region + DelaySubscription +

        /// <summary>
        /// Relative-time <c>DelaySubscription</c> on <c>SchedulerDefaults.TimeBasedOperations</c>.
        /// </summary>
        public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, TimeSpan dueTime)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return DelaySubscription_<TSource>(source, dueTime, defaultScheduler);
        }

        /// <summary>
        /// Relative-time <c>DelaySubscription</c> on the given <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
            return DelaySubscription_<TSource>(source, dueTime, scheduler);
        }

        /// <summary>
        /// Shared implementation of the relative-time <c>DelaySubscription</c> overloads.
        /// </summary>
        private static IObservable<TSource> DelaySubscription_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
#if !NO_PERF
            return new DelaySubscription<TSource>(source, dueTime, scheduler);
#else
            return new AnonymousObservable<TSource>(observer =>
  {
                // Holds the scheduled work first, then the actual source subscription.
                var handle = new MultipleAssignmentDisposable();

                var normalizedDueTime = Normalize(dueTime);

                handle.Disposable = scheduler.Schedule(normalizedDueTime, () =>
                {
                    handle.Disposable = source.Subscribe(observer);
                });

                return handle;
            });
#endif
        }

        /// <summary>
        /// Absolute-time <c>DelaySubscription</c> on <c>SchedulerDefaults.TimeBasedOperations</c>.
        /// </summary>
        public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return DelaySubscription_<TSource>(source, dueTime, defaultScheduler);
        }

        /// <summary>
        /// Absolute-time <c>DelaySubscription</c> on the given <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TSource> DelaySubscription<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
        {
            return DelaySubscription_<TSource>(source, dueTime, scheduler);
        }

        /// <summary>
        /// Shared implementation of the absolute-time <c>DelaySubscription</c> overloads.
        /// </summary>
        private static IObservable<TSource> DelaySubscription_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
        {
#if !NO_PERF
            return new DelaySubscription<TSource>(source, dueTime, scheduler);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // Holds the scheduled work first, then the actual source subscription.
                var handle = new MultipleAssignmentDisposable();

                handle.Disposable = scheduler.Schedule(dueTime, () =>
                {
                    handle.Disposable = source.Subscribe(observer);
                });

                return handle;
            });
#endif
        }

        #endregion

        #region + Generate +

        /// <summary>
        /// Timed <c>Generate</c> with a relative <paramref name="timeSelector"/>, on
        /// <c>SchedulerDefaults.TimeBasedOperations</c>.
        /// </summary>
        public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, defaultScheduler);
        }

        public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate,
Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
        {
            return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
        }

        /// <summary>
        /// Shared implementation of the <c>Generate</c> overloads whose <paramref name="timeSelector"/>
        /// yields a relative <see cref="TimeSpan"/>.
        /// </summary>
        private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
        {
#if !NO_PERF
            return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
#else
            return new AnonymousObservable<TResult>(observer =>
            {
                var current = initialState;
                var isFirst = true;
                var hasResult = false;
                var pending = default(TResult);
                var delay = default(TimeSpan);

                return scheduler.Schedule(TimeSpan.Zero, recurse =>
                {
                    // Emit the result computed by the previous iteration, now that its delay has elapsed.
                    if (hasResult)
                    {
                        observer.OnNext(pending);
                    }

                    try
                    {
                        // The initial state is used as-is; later iterations advance it first.
                        if (isFirst)
                        {
                            isFirst = false;
                        }
                        else
                        {
                            current = iterate(current);
                        }

                        hasResult = condition(current);
                        if (hasResult)
                        {
                            pending = resultSelector(current);
                            delay = timeSelector(current);
                        }
                    }
                    catch (Exception error)
                    {
                        observer.OnError(error);
                        return;
                    }

                    if (!hasResult)
                    {
                        observer.OnCompleted();
                        return;
                    }

                    recurse(delay);
                });
            });
#endif
        }

        public virtual IObservable<TResult> Generate<TState, TResult>(TState
initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, defaultScheduler);
        }

        /// <summary>
        /// Timed <c>Generate</c> with an absolute <paramref name="timeSelector"/>, on the given
        /// <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TResult> Generate<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
        {
            return Generate_<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
        }

        /// <summary>
        /// Shared implementation of the <c>Generate</c> overloads whose <paramref name="timeSelector"/>
        /// yields an absolute <see cref="DateTimeOffset"/>.
        /// </summary>
        private static IObservable<TResult> Generate_<TState, TResult>(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
        {
#if !NO_PERF
            return new Generate<TState, TResult>(initialState, condition, iterate, resultSelector, timeSelector, scheduler);
#else
            return new AnonymousObservable<TResult>(observer =>
            {
                var current = initialState;
                var isFirst = true;
                var hasResult = false;
                var pending = default(TResult);
                var dueAt = default(DateTimeOffset);

                return scheduler.Schedule(scheduler.Now, recurse =>
                {
                    // Emit the result computed by the previous iteration, now that its due time was reached.
                    if (hasResult)
                    {
                        observer.OnNext(pending);
                    }

                    try
                    {
                        // The initial state is used as-is; later iterations advance it first.
                        if (isFirst)
                        {
                            isFirst = false;
                        }
                        else
                        {
                            current = iterate(current);
                        }

                        hasResult = condition(current);
                        if (hasResult)
                        {
                            pending = resultSelector(current);
                            dueAt = timeSelector(current);
                        }
                    }
                    catch (Exception error)
                    {
                        observer.OnError(error);
                        return;
                    }

                    if (!hasResult)
                    {
                        observer.OnCompleted();
                        return;
                    }

                    recurse(dueAt);
                });
            });
#endif
        }

        #endregion

        #region + Interval +

        /// <summary>
        /// Forwards to <c>Timer_</c> with <paramref name="period"/> as both arguments, on
        /// <c>SchedulerDefaults.TimeBasedOperations</c>.
        /// </summary>
        public virtual IObservable<long> Interval(TimeSpan period)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return Timer_(period, period, defaultScheduler);
        }

        /// <summary>
        /// Forwards to <c>Timer_</c> with <paramref name="period"/> as both arguments, on the given
        /// <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<long> Interval(TimeSpan period, IScheduler scheduler)
        {
            return Timer_(period, period, scheduler);
        }

        #endregion

        #region + Sample +

        /// <summary>
        /// Time-based <c>Sample</c> on <c>SchedulerDefaults.TimeBasedOperations</c>.
        /// </summary>
        public virtual IObservable<TSource> Sample<TSource>(IObservable<TSource> source, TimeSpan interval)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return Sample_<TSource>(source, interval, defaultScheduler);
        }

        /// <summary>
        /// Time-based <c>Sample</c> on the given <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TSource> Sample<TSource>(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler)
        {
            return Sample_<TSource>(source, interval, scheduler);
        }

        /// <summary>
        /// Shared implementation of the time-based <c>Sample</c> overloads.
        /// </summary>
        private static IObservable<TSource> Sample_<TSource>(IObservable<TSource> source, TimeSpan interval, IScheduler scheduler)
        {
#if !NO_PERF
            return new Sample<TSource>(source, interval, scheduler);
#else
            // Fallback: sample with an interval sequence as the sampler.
            return Sample_<TSource, long>(source, Observable.Interval(interval, scheduler));
#endif
        }

        /// <summary>
        /// <c>Sample</c> driven by an arbitrary <paramref name="sampler"/> sequence.
        /// </summary>
        public virtual IObservable<TSource> Sample<TSource, TSample>(IObservable<TSource> source, IObservable<TSample> sampler)
        {
            return Sample_<TSource, TSample>(source, sampler);
        }

        /// <summary>
        /// Shared implementation of the sampler-driven <c>Sample</c> overload.
        /// </summary>
        private static IObservable<TSource> Sample_<TSource, TSample>(IObservable<TSource> source, IObservable<TSample> sampler)
        {
#if !NO_PERF
            return new
Sample<TSource, TSample>(source, sampler);
#else
            return Combine(source, sampler, (IObserver<TSource> observer, IDisposable leftSubscription, IDisposable rightSubscription) =>
            {
                // Most recent OnNext notification not yet handed to the observer (null when none).
                var latest = default(Notification<TSource>);
                // Set once the source has completed.
                var sourceCompleted = false;

                return new BinaryObserver<TSource, TSample>(
                    newValue =>
                    {
                        var kind = newValue.Kind;
                        if (kind == NotificationKind.OnNext)
                        {
                            latest = newValue;
                        }
                        else if (kind == NotificationKind.OnError)
                        {
                            newValue.Accept(observer);
                        }
                        else if (kind == NotificationKind.OnCompleted)
                        {
                            sourceCompleted = true;
                        }
                    },
                    _ =>
                    {
                        // On every sampler notification: hand over the pending value (at most once),
                        // then complete if the source already ended.
                        var pending = latest;
                        latest = null;

                        if (pending != null)
                        {
                            pending.Accept(observer);
                        }

                        if (sourceCompleted)
                        {
                            observer.OnCompleted();
                        }
                    });
            });
#endif
        }

        #endregion

        #region + Skip +

        /// <summary>
        /// Time-based <c>Skip</c> on <c>SchedulerDefaults.TimeBasedOperations</c>.
        /// </summary>
        public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, TimeSpan duration)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return Skip_<TSource>(source, duration, defaultScheduler);
        }

        /// <summary>
        /// Time-based <c>Skip</c> on the given <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TSource> Skip<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
            return Skip_<TSource>(source, duration, scheduler);
        }

        /// <summary>
        /// Shared implementation of the time-based <c>Skip</c> overloads.
        /// </summary>
        private static IObservable<TSource> Skip_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
#if !NO_PERF
            // When the source is itself a Skip on the same scheduler, its Omega method is used
            // instead of wrapping it in another Skip.
            var skip = source as Skip<TSource>;
            if
(skip != null && skip._scheduler == scheduler)
            {
                return skip.Omega(duration);
            }

            return new Skip<TSource>(source, duration, scheduler);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // Flipped by the scheduled action once 'duration' has elapsed; elements are dropped until then.
                var gateOpen = false;

                var timer = scheduler.Schedule(duration, () => gateOpen = true);

                var subscription = source.Subscribe(
                    x =>
                    {
                        if (gateOpen)
                        {
                            observer.OnNext(x);
                        }
                    },
                    observer.OnError,
                    observer.OnCompleted
                );

                return new CompositeDisposable(timer, subscription);
            });
#endif
        }

        #endregion

        #region + SkipLast +

        /// <summary>
        /// Time-based <c>SkipLast</c> on <c>SchedulerDefaults.TimeBasedOperations</c>.
        /// </summary>
        public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, TimeSpan duration)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return SkipLast_<TSource>(source, duration, defaultScheduler);
        }

        /// <summary>
        /// Time-based <c>SkipLast</c> on the given <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TSource> SkipLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
            return SkipLast_<TSource>(source, duration, scheduler);
        }

        /// <summary>
        /// Shared implementation of the time-based <c>SkipLast</c> overloads.
        /// </summary>
        private static IObservable<TSource> SkipLast_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
#if !NO_PERF
            return new SkipLast<TSource>(source, duration, scheduler);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // Elements paired with the stopwatch reading at which they arrived.
                var q = new Queue<System.Reactive.TimeInterval<TSource>>();

                // Use the scheduler's stopwatch when it provides one, a default stopwatch otherwise.
                var swp = scheduler.AsStopwatchProvider();
                var sw = swp != null ?
swp.StartStopwatch() : new DefaultStopwatch();

                // Emits, oldest first, every queued element that arrived at least 'duration' before 'now'.
                var flush = new Action<TimeSpan>(now =>
                {
                    while (q.Count > 0 && now - q.Peek().Interval >= duration)
                    {
                        observer.OnNext(q.Dequeue().Value);
                    }
                });

                return source.Subscribe(
                    x =>
                    {
                        var stamp = sw.Elapsed;
                        q.Enqueue(new System.Reactive.TimeInterval<TSource>(x, stamp));
                        flush(stamp);
                    },
                    observer.OnError,
                    () =>
                    {
                        // Whatever is still queued after this final flush is dropped.
                        flush(sw.Elapsed);
                        observer.OnCompleted();
                    }
                );
            });
#endif
        }

        #endregion

        #region + SkipUntil +

        /// <summary>
        /// Absolute-time <c>SkipUntil</c> on <c>SchedulerDefaults.TimeBasedOperations</c>.
        /// </summary>
        public virtual IObservable<TSource> SkipUntil<TSource>(IObservable<TSource> source, DateTimeOffset startTime)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return SkipUntil_<TSource>(source, startTime, defaultScheduler);
        }

        /// <summary>
        /// Absolute-time <c>SkipUntil</c> on the given <paramref name="scheduler"/>.
        /// </summary>
        public virtual IObservable<TSource> SkipUntil<TSource>(IObservable<TSource> source, DateTimeOffset startTime, IScheduler scheduler)
        {
            return SkipUntil_<TSource>(source, startTime, scheduler);
        }

        /// <summary>
        /// Shared implementation of the absolute-time <c>SkipUntil</c> overloads.
        /// </summary>
        private static IObservable<TSource> SkipUntil_<TSource>(IObservable<TSource> source, DateTimeOffset startTime, IScheduler scheduler)
        {
#if !NO_PERF
            // When the source is itself a SkipUntil on the same scheduler, its Omega method is used
            // instead of wrapping it in another SkipUntil.
            var inner = source as SkipUntil<TSource>;
            if (inner != null && inner._scheduler == scheduler)
            {
                return inner.Omega(startTime);
            }

            return new SkipUntil<TSource>(source, startTime, scheduler);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // Flipped by the scheduled action at 'startTime'; elements are dropped until then.
                var open = false;

                var t = scheduler.Schedule(startTime, () => open = true);

                var d = source.Subscribe(
                    x =>
     {                        if (open)                            observer.OnNext(x);                    },                    observer.OnError,                    observer.OnCompleted                );                return new CompositeDisposable(t, d);            });#endif        }        #endregion        #region + Take +        public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, TimeSpan duration)        {            return Take_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);        }        public virtual IObservable<TSource> Take<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)        {            return Take_<TSource>(source, duration, scheduler);        }        private static IObservable<TSource> Take_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)        {#if !NO_PERF            var take = source as Take<TSource>;            if (take != null && take._scheduler == scheduler)                return take.Omega(duration);            return new Take<TSource>(source, duration, scheduler);#else            return new AnonymousObservable<TSource>(observer =>            {                var gate = new object();                var t = scheduler.Schedule(duration, () =>                {                    lock (gate)                    {                        observer.OnCompleted();                    }                });                var d = source.Synchronize(gate).Subscribe(observer);                return new CompositeDisposable(t, d);            });#endif        }        #endregion        #region + TakeLast +        public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration)        {            return TakeLast_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations, SchedulerDefaults.Iteration);        }        public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, 
TimeSpan duration, IScheduler scheduler)
        {
            // The caller picks the timer scheduler; the drain loop uses SchedulerDefaults.Iteration.
            return TakeLast_<TSource>(source, duration, scheduler, SchedulerDefaults.Iteration);
        }

        // Overload with both schedulers supplied by the caller.
        public virtual IObservable<TSource> TakeLast<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler timerScheduler, IScheduler loopScheduler)
        {
            return TakeLast_<TSource>(source, duration, timerScheduler, loopScheduler);
        }

        // Shared implementation behind the time-based TakeLast overloads.
        // timerScheduler provides the clock used to age elements; loopScheduler runs
        // the loop that replays the surviving elements after the source completes.
        private static IObservable<TSource> TakeLast_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler timerScheduler, IScheduler loopScheduler)
        {
#if !NO_PERF
            return new TakeLast<TSource>(source, duration, timerScheduler, loopScheduler);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // Elements paired with the stopwatch reading at which they arrived.
                var q = new Queue<System.Reactive.TimeInterval<TSource>>();

                // Use the scheduler's stopwatch when it offers one, else DefaultStopwatch.
                var swp = timerScheduler.AsStopwatchProvider();
                var sw = swp != null ? swp.StartStopwatch() : new DefaultStopwatch();

                // Drops every queued element whose age has reached `duration`.
                var trim = new Action<TimeSpan>(now =>
                {
                    while (q.Count > 0 && now - q.Peek().Interval >= duration)
                        q.Dequeue();
                });

                var g = new CompositeDisposable();

                g.Add(source.Subscribe(
                    x =>
                    {
                        var now = sw.Elapsed;

                        q.Enqueue(new System.Reactive.TimeInterval<TSource>(x, now));
                        trim(now);
                    },
                    observer.OnError,
                    () =>
                    {
                        var now = sw.Elapsed;
                        trim(now);

                        // Replay what is left one element per recursive step, then complete.
                        g.Add(loopScheduler.Schedule(rec =>
                        {
                            if (q.Count > 0)
                            {
                                observer.OnNext(q.Dequeue().Value);
                                rec();
                            }
                            else
                            {
                                observer.OnCompleted();
                            }
                        }));
                    }
                ));

                return g;
            });
#endif
        }

        // Overload without a scheduler: timing runs on SchedulerDefaults.TimeBasedOperations.
        public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, TimeSpan duration)
        {
            return TakeLastBuffer_<TSource>(source, duration, SchedulerDefaults.TimeBasedOperations);
        }

        // Overload with a caller-supplied scheduler.
        public virtual IObservable<IList<TSource>> TakeLastBuffer<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
            return TakeLastBuffer_<TSource>(source, duration, scheduler);
        }

        // Shared implementation behind the TakeLastBuffer overloads: when the source
        // completes, emits a single list holding the elements that arrived less than
        // `duration` before completion, then completes.
        private static IObservable<IList<TSource>> TakeLastBuffer_<TSource>(IObservable<TSource> source, TimeSpan duration, IScheduler scheduler)
        {
#if !NO_PERF
            return new TakeLastBuffer<TSource>(source, duration, scheduler);
#else
            return new AnonymousObservable<IList<TSource>>(observer =>
            {
                // Elements paired with the stopwatch reading at which they arrived.
                var q = new Queue<System.Reactive.TimeInterval<TSource>>();

                // Use the scheduler's stopwatch when it offers one, else DefaultStopwatch.
                var swp = scheduler.AsStopwatchProvider();
                var sw = swp != null ? swp.StartStopwatch() : new DefaultStopwatch();

                return source.Subscribe(
                    x =>
                    {
                        var now = sw.Elapsed;

                        q.Enqueue(new System.Reactive.TimeInterval<TSource>(x, now));
                        // Expire everything whose age has reached `duration`.
                        while (q.Count > 0 && now - q.Peek().Interval >= duration)
                            q.Dequeue();
                    },
                    observer.OnError,
                    () =>
                    {
                        var now = sw.Elapsed;

                        var res = new List<TSource>(q.Count);
                        while (q.Count > 0)
                        {
                            var next = q.Dequeue();
                            // Strict "<" is the exact complement of the ">= duration" expiry
                            // used while elements arrive (and by TakeLast_), so an element
                            // aged exactly `duration` is treated the same on both paths.
                            if (now - next.Interval < duration)
                                res.Add(next.Value);
                        }

                        observer.OnNext(res);
                        observer.OnCompleted();
                    }
                );
            });
#endif
        }

        #endregion

        #region + TakeUntil +

        // Overload without a scheduler: timing runs on SchedulerDefaults.TimeBasedOperations.
        public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, DateTimeOffset endTime)
        {
            return TakeUntil_<TSource>(source, endTime, SchedulerDefaults.TimeBasedOperations);
        }

        // Overload with a caller-supplied scheduler.
        public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler)
        {
            return TakeUntil_<TSource>(source, endTime, scheduler);
        }

        // Shared implementation behind both time-based TakeUntil overloads.
        private static IObservable<TSource> TakeUntil_<TSource>(IObservable<TSource> source, DateTimeOffset endTime, IScheduler scheduler)
        {
#if !NO_PERF
            // When the source is already a TakeUntil<TSource> on the same scheduler, the
            // existing operator supplies the result through Omega instead of being wrapped.
            var takeUntil = source as TakeUntil<TSource>;
            if (takeUntil != null && takeUntil._scheduler == scheduler)
                return takeUntil.Omega(endTime);

            return new TakeUntil<TSource>(source, endTime, scheduler);
#else
            return new AnonymousObservable<TSource>(observer 
=>
            {
                // One lock object is shared by the timer callback and Synchronize(...).
                var gate = new object();

                // At endTime the observer is completed under the lock.
                var t = scheduler.Schedule(endTime, () =>
                {
                    lock (gate)
                    {
                        observer.OnCompleted();
                    }
                });

                var d = source.Synchronize(gate).Subscribe(observer);

                return new CompositeDisposable(t, d);
            });
#endif
        }

        #endregion

        #region + Throttle +

        // Overload without a scheduler: timing runs on SchedulerDefaults.TimeBasedOperations.
        public virtual IObservable<TSource> Throttle<TSource>(IObservable<TSource> source, TimeSpan dueTime)
        {
            return Throttle_<TSource>(source, dueTime, SchedulerDefaults.TimeBasedOperations);
        }

        // Overload with a caller-supplied scheduler.
        public virtual IObservable<TSource> Throttle<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
            return Throttle_<TSource>(source, dueTime, scheduler);
        }

        // Shared implementation behind the TimeSpan-based Throttle overloads: an element
        // is forwarded only if no newer element arrives before its dueTime timer fires.
        private static IObservable<TSource> Throttle_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
#if !NO_PERF
            return new Throttle<TSource>(source, dueTime, scheduler);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                var gate = new object();
                // Most recent element and whether it is still waiting to be emitted.
                var value = default(TSource);
                var hasValue = false;
                // Holds the timer for the most recent element.
                var cancelable = new SerialDisposable();
                // Generation counter: bumped on every source notification.
                var id = 0UL;

                var subscription = source.Subscribe(x =>
                    {
                        ulong currentid;
                        lock (gate)
                        {
                            hasValue = true;
                            value = x;
                            id = unchecked(id + 1);
                            currentid = id;
                        }
                        // Install the new timer handle before scheduling so the handle is
                        // already registered when the scheduled work is assigned to it.
                        var d = new SingleAssignmentDisposable();
                        cancelable.Disposable = d;
                        d.Disposable = scheduler.Schedule(dueTime, () =>
                            {
                                lock (gate)
                                {
                                    // Emit only if this element is still the latest one.
                                    if (hasValue && id == currentid)
                                        observer.OnNext(value);
                                    hasValue = false;
                                }
                            });
                    },
                    exception =>
                    {
                        cancelable.Dispose();

                        lock (gate)
                        {
                            observer.OnError(exception);
                            hasValue = false;
                            id = unchecked(id + 1);
                        }
                    },
                    () =>
                    {
                        cancelable.Dispose();

                        lock (gate)
                        {
                            // A still-pending element is flushed before completion.
                            if (hasValue)
                                observer.OnNext(value);
                            observer.OnCompleted();
                            hasValue = false;
                            id = unchecked(id + 1);
                        }
                    });

                return new CompositeDisposable(subscription, cancelable);
            });
#endif
        }

        // Throttle variant where the quiet period for each element is an observable
        // produced by throttleDurationSelector; the element is forwarded when that
        // observable produces a value or completes, unless a newer element arrived first.
        public virtual IObservable<TSource> Throttle<TSource, TThrottle>(IObservable<TSource> source, Func<TSource, IObservable<TThrottle>> throttleDurationSelector)
        {
#if !NO_PERF
            return new Throttle<TSource, TThrottle>(source, throttleDurationSelector);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                var gate = new object();
                // Most recent element and whether it is still waiting to be emitted.
                var value = default(TSource);
                var hasValue = false;
                // Holds the subscription to the most recent throttle observable.
                var cancelable = new SerialDisposable();
                // Generation counter: bumped on every accepted source notification.
                var id = 0UL;

                var subscription = source.Subscribe(
                    x =>
                    {
                        var throttle = default(IObservable<TThrottle>);
                        try
                        {
                            throttle = throttleDurationSelector(x);
                        }
                        catch (Exception error)
                        {
                            // A throwing selector is reported to the observer as an error.
                            lock (gate)
                                observer.OnError(error);
                            return;
                        }

                        ulong currentid;
                        lock (gate)
                        {
                            hasValue = true;
                            value = x;
                            id = unchecked(id + 1);
                            currentid = id;
                        }

                        var d = new SingleAssignmentDisposable();
                        cancelable.Disposable = d;
                        d.Disposable = throttle.Subscribe(
                            _ =>
                            {
                                lock (gate)
                                {
                                    // Emit only if this element is still the latest one.
                                    if (hasValue && id == currentid)
                                        observer.OnNext(value);
                                    hasValue = false;
                                    // The first throttle value is enough; stop listening.
                                    d.Dispose();
                                }
                            },
                            exception =>
                            {
                                lock (gate)
                                {
                                    observer.OnError(exception);
                                }
                            },
                            () =>
                            {
                                lock (gate)
                                {
                                    // Completion of the throttle is handled like a value.
                                    if (hasValue && id == currentid)
                                        observer.OnNext(value);
                                    hasValue = false;
                                    d.Dispose();
                                }
                            }
                        );
                    },
                    exception =>
                    {
                        cancelable.Dispose();

                        lock (gate)
                        {
                            observer.OnError(exception);
                            hasValue = false;
                            id = unchecked(id + 1);
                        }
                    },
                    () =>
                    {
                        cancelable.Dispose();

                        lock (gate)
                        {
                            // A still-pending element is flushed before completion.
                            if (hasValue)
                                observer.OnNext(value);
                            observer.OnCompleted();
                            hasValue = false;
                            id = unchecked(id + 1);
                        }
                    });

                return new CompositeDisposable(subscription, cancelable);
            });
#endif
        }

        #endregion

        #region + TimeInterval +

        // Overload without a scheduler: the clock is SchedulerDefaults.TimeBasedOperations.
        public virtual IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval<TSource>(IObservable<TSource> source)
        {
            return TimeInterval_<TSource>(source, SchedulerDefaults.TimeBasedOperations);
        }

        // Overload with a caller-supplied scheduler.
        public virtual IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            return TimeInterval_<TSource>(source, scheduler);
        }

        // Shared implementation; the fallback variant is an instance method because it
        // calls the instance-level Defer.
#if !NO_PERF
        private static IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval_<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            return new TimeInterval<TSource>(source, scheduler);
        }
#else
        private IObservable<System.Reactive.TimeInterval<TSource>> TimeInterval_<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            return Defer(() =>
            {
                // Reference point for the first interval, read inside the Defer factory.
                var last = scheduler.Now;
                return source.Select(x =>
     {
                    // Pair the element with the time elapsed since the previous one.
                    var now = scheduler.Now;
                    var span = now.Subtract(last);
                    last = now;
                    return new System.Reactive.TimeInterval<TSource>(x, span);
                });
            });
        }
#endif

        #endregion

        #region + Timeout +

        #region TimeSpan

        // No fallback, no scheduler: times out with a TimeoutException on the default scheduler.
        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime)
        {
            return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), SchedulerDefaults.TimeBasedOperations);
        }

        // No fallback: times out with a TimeoutException on the supplied scheduler.
        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IScheduler scheduler)
        {
            return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), scheduler);
        }

        // Switches to `other` on timeout, timing on the default scheduler.
        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other)
        {
            return Timeout_<TSource>(source, dueTime, other, SchedulerDefaults.TimeBasedOperations);
        }

        // Switches to `other` on timeout, timing on the supplied scheduler.
        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler)
        {
            return Timeout_<TSource>(source, dueTime, other, scheduler);
        }

        // Shared implementation for the relative-time overloads: a dueTime timer is
        // started at subscription and restarted after every element; if a timer fires
        // before the next source notification, the observer is handed over to `other`.
        private static IObservable<TSource> Timeout_<TSource>(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler)
        {
#if !NO_PERF
            return new Timeout<TSource>(source, dueTime, other, scheduler);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // `subscription` starts out holding the source subscription and is later
                // pointed at the subscription to `other` if the timer wins.
                var subscription = new SerialDisposable();
                var timer = new SerialDisposable();
                var original = new SingleAssignmentDisposable();

                subscription.Disposable = original;

                var gate = new object();
                // Bumped by every source notification that is accepted; a timer only
                // wins if the value it captured is still current when it fires.
                var id = 0UL;
                // Set once a timer has won; from then on source notifications are ignored.
                var switched = false;

                Action createTimer = () =>
                {
                    var myid = id;
                    timer.Disposable = scheduler.Schedule(dueTime, () =>
                    {
                        var timerWins = false;

                        lock (gate)
                        {
                            switched = (id == myid);
                            timerWins = switched;
                        }

                        // The hand-over happens outside the lock.
                        if (timerWins)
                            subscription.Disposable = other.Subscribe(observer);
                    });
                };

                createTimer();

                original.Disposable = source.Subscribe(
                    x =>
                    {
                        var onNextWins = false;

                        lock (gate)
                        {
                            onNextWins = !switched;
                            if (onNextWins)
                            {
                                id = unchecked(id + 1);
                            }
                        }

                        if (onNextWins)
                        {
                            observer.OnNext(x);
                            // Restart the countdown for the next element.
                            createTimer();
                        }
                    },
                    exception =>
                    {
                        var onErrorWins = false;

                        lock (gate)
                        {
                            onErrorWins = !switched;
                            if (onErrorWins)
                            {
                                id = unchecked(id + 1);
                            }
                        }

                        if (onErrorWins)
                            observer.OnError(exception);
                    },
                    () =>
                    {
                        var onCompletedWins = false;

                        lock (gate)
                        {
                            onCompletedWins = !switched;
                            if (onCompletedWins)
                            {
                                id = unchecked(id + 1);
                            }
                        }

                        if (onCompletedWins)
                            observer.OnCompleted();
                    });

                return new CompositeDisposable(subscription, timer);
            });
#endif
        }

        #endregion

        #region DateTimeOffset

        // No fallback, no scheduler: times out with a TimeoutException on the default scheduler.
        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime)
        {
            return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), SchedulerDefaults.TimeBasedOperations);
        }

        // No fallback: times out with a TimeoutException on the supplied scheduler.
        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IScheduler scheduler)
        {
            return Timeout_<TSource>(source, dueTime, Observable.Throw<TSource>(new TimeoutException()), scheduler);
        }

        // Switches to `other` at dueTime, timing on the default scheduler.
        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other)
        {
            return Timeout_<TSource>(source, dueTime, other, SchedulerDefaults.TimeBasedOperations);
        }

        // Switches to `other` at dueTime, timing on the supplied scheduler.
        public virtual IObservable<TSource> Timeout<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler)
        {
            return Timeout_<TSource>(source, dueTime, other, scheduler);
        }

        // Shared implementation for the absolute-time overloads.
        private static IObservable<TSource> Timeout_<TSource>(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler)
        {
#if !NO_PERF
            return new Timeout<TSource>(source, dueTime, other, scheduler);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                var subscription = new SerialDisposable();
                var original = new SingleAssignmentDisposable();
subscription.Disposable = original;

                var gate = new object();
                // Set by whichever of timer / source error / source completion comes first.
                var switched = false;

                // A single timer for the absolute deadline; it is never restarted.
                var timer = scheduler.Schedule(dueTime, () =>
                {
                    var timerWins = false;

                    lock (gate)
                    {
                        timerWins = !switched;
                        switched = true;
                    }

                    // The hand-over to `other` happens outside the lock.
                    if (timerWins)
                        subscription.Disposable = other.Subscribe(observer);
                });

                original.Disposable = source.Subscribe(
                    x =>
                    {
                        // Elements are forwarded under the lock until something has won.
                        lock (gate)
                        {
                            if (!switched)
                                observer.OnNext(x);
                        }
                    },
                    exception =>
                    {
                        var onErrorWins = false;

                        lock (gate)
                        {
                            onErrorWins = !switched;
                            switched = true;
                        }

                        if (onErrorWins)
                            observer.OnError(exception);
                    },
                    () =>
                    {
                        var onCompletedWins = false;

                        lock (gate)
                        {
                            onCompletedWins = !switched;
                            switched = true;
                        }

                        if (onCompletedWins)
                            observer.OnCompleted();
                    });

                return new CompositeDisposable(subscription, timer);
            });
#endif
        }

        #endregion

        #region Duration selector

        // No first timeout (Observable.Never) and no fallback (TimeoutException).
        public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector)
        {
            return Timeout_<TSource, TTimeout>(source, Observable.Never<TTimeout>(), timeoutDurationSelector, Observable.Throw<TSource>(new TimeoutException()));
        }

        // No first timeout (Observable.Never); switches to `other` on timeout.
        public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
        {
            return Timeout_<TSource, TTimeout>(source, Observable.Never<TTimeout>(), timeoutDurationSelector, other);
        }

        // Explicit first timeout; no fallback (TimeoutException).
        public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector)
        {
            return Timeout_<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, Observable.Throw<TSource>(new TimeoutException()));
        }

        // Explicit first timeout and explicit fallback.
        public virtual IObservable<TSource> Timeout<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
        {
            return Timeout_<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, other);
        }

        // Shared implementation for the selector-based overloads: firstTimeout guards the
        // wait for the first element, and timeoutDurationSelector(x) guards the wait after
        // each element x. A timeout observable that fires or completes while still current
        // hands the observer over to `other`.
        private static IObservable<TSource> Timeout_<TSource, TTimeout>(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutDurationSelector, IObservable<TSource> other)
        {
#if !NO_PERF
            return new Timeout<TSource, TTimeout>(source, firstTimeout, timeoutDurationSelector, other);
#else
            return new AnonymousObservable<TSource>(observer =>
            {
                // `subscription` starts out holding the source subscription and is later
                // pointed at the subscription to `other` if a timeout wins.
                var subscription = new SerialDisposable();
                var timer = new SerialDisposable();
                var original = new SingleAssignmentDisposable();

                subscription.Disposable = original;

                var gate = new object();
                // Bumped by every accepted source notification; a timeout only wins
                // if the value it captured is still current when it signals.
                var id = 0UL;
                var switched = false;

                Action<IObservable<TTimeout>> setTimer = timeout =>
                {
                    var myid = id;

                    // True when this timeout is still the current one; records the switch.
                    Func<bool> timerWins = () =>
                    {
                        var res = false;

                        lock (gate)
                        {
                            switched = (id == myid);
                            res = switched;
                        }

                        return res;
                    };

                    var d = new SingleAssignmentDisposable();
                    timer.Disposable = d;
                    d.Disposable = timeout.Subscribe(
                        _ =>
                        {
                            if (timerWins())
                                subscription.Disposable = other.Subscribe(observer);

                            // Only the first value of a timeout observable matters.
                            d.Dispose();
                        },
                        error =>
                        {
                            if (timerWins())
                                observer.OnError(error);
                        },
                        () =>
                        {
                            if (timerWins())
                                subscription.Disposable = other.Subscribe(observer);
                        }
                    );
                };

                setTimer(firstTimeout);

                // True while no timeout has won; accepting a notification bumps `id`
                // so that the timeout currently running can no longer win.
                Func<bool> observerWins = () =>
                {
                    var res = false;

                    lock (gate)
                    {
                        res = !switched;
                        if (res)
                        {
                            id = unchecked(id + 1);
                        }
                    }

                    return res;
                };

                original.Disposable = source.Subscribe(
                    x =>
                    {
                        if (observerWins())
                        {
                            observer.OnNext(x);

                            var timeout = default(IObservable<TTimeout>);

                            try
                            {
                                timeout = timeoutDurationSelector(x);
                            }
                            catch (Exception error)
                            {
                                // A throwing selector is reported to the observer as an error.
                                observer.OnError(error);
                                return;
                            }

                            setTimer(timeout);
                        }
                    },
                    exception =>
                    {
                        if (observerWins())
                            observer.OnError(exception);
                    },
                    () =>
                    {
                        if (observerWins())
                            observer.OnCompleted();
                    }
                );

                return new CompositeDisposable(subscription, timer);
            });
#endif
        }

        #endregion

        #endregion

        #region + Timer +

        // The overloads without a scheduler use SchedulerDefaults.TimeBasedOperations;
        // every overload forwards to the Timer_ helper with the matching signature.
        public virtual IObservable<long> Timer(TimeSpan dueTime)
        {
            return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<long> Timer(DateTimeOffset dueTime)
        {
            return Timer_(dueTime, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<long> Timer(TimeSpan dueTime, TimeSpan period)
        {
            return Timer_(dueTime, period, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period)
        {
            return Timer_(dueTime, period, SchedulerDefaults.TimeBasedOperations);
        }

        public virtual IObservable<long> Timer(TimeSpan dueTime, IScheduler scheduler)
        {
            return Timer_(dueTime, scheduler);
        }

        public virtual IObservable<long> Timer(DateTimeOffset dueTime, IScheduler scheduler)
        {
            return Timer_(dueTime, scheduler);
        }

        public virtual IObservable<long> Timer(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
        {
      return Timer_(dueTime, period, scheduler);
        }

        // Periodic timer with an absolute first due time on a caller-supplied scheduler.
        public virtual IObservable<long> Timer(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler)
        {
            return Timer_(dueTime, period, scheduler);
        }

        // One-shot timer, relative due time: emits 0 and completes.
        private static IObservable<long> Timer_(TimeSpan dueTime, IScheduler scheduler)
        {
#if !NO_PERF
            return new Timer(dueTime, null, scheduler);
#else
            var normalizedDueTime = Normalize(dueTime);

            return new AnonymousObservable<long>(observer =>
            {
                return scheduler.Schedule(normalizedDueTime, () =>
                {
                    observer.OnNext(0L);
                    observer.OnCompleted();
                });
            });
#endif
        }

        // Periodic timer, relative due time. The fallback variant is an instance method
        // because it goes through the instance-level Defer and Timer members.
#if !NO_PERF
        private static IObservable<long> Timer_(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
        {
            return new Timer(dueTime, period, scheduler);
        }
#else
        private IObservable<long> Timer_(TimeSpan dueTime, TimeSpan period, IScheduler scheduler)
        {
            var normalizedPeriod = Normalize(period);

            // The absolute start time is computed inside the Defer factory, from the
            // scheduler's clock at that moment.
            return Defer(() => Timer(scheduler.Now + dueTime, normalizedPeriod, scheduler));
        }
#endif

        // One-shot timer, absolute due time: emits 0 and completes.
        private static IObservable<long> Timer_(DateTimeOffset dueTime, IScheduler scheduler)
        {
#if !NO_PERF
            return new Timer(dueTime, null, scheduler);
#else
            return new AnonymousObservable<long>(observer =>
            {
                return scheduler.Schedule(dueTime, () =>
                {
                    observer.OnNext(0L);
                    observer.OnCompleted();
                });
            });
#endif
        }

        // Periodic timer, absolute first due time: emits 0, 1, 2, ... and reschedules
        // itself after every tick.
        private static IObservable<long> Timer_(DateTimeOffset dueTime, TimeSpan period, IScheduler scheduler)
        {
#if !NO_PERF
            return new Timer(dueTime, period, scheduler);
#else
            var normalizedPeriod = Normalize(period);

            return new AnonymousObservable<long>(observer =>
            {
                var nextDue = dueTime;
                var tick = 0L;

                return scheduler.Schedule(nextDue, reschedule =>
                {
                    if (normalizedPeriod > TimeSpan.Zero)
                    {
                        // Advance by one period; if that moment has already passed,
                        // restart the cadence one period from the current time.
                        var current = scheduler.Now;
                        nextDue += normalizedPeriod;
                        if (nextDue <= current)
                            nextDue = current + normalizedPeriod;
                    }

                    observer.OnNext(tick);
                    tick = unchecked(tick + 1);

                    reschedule(nextDue);
                });
            });
#endif
        }

        #endregion

        #region + Timestamp +

        // Overload without a scheduler: the clock is SchedulerDefaults.TimeBasedOperations.
        public virtual IObservable<Timestamped<TSource>> Timestamp<TSource>(IObservable<TSource> source)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return Timestamp_<TSource>(source, defaultScheduler);
        }

        // Overload with a caller-supplied scheduler.
        public virtual IObservable<Timestamped<TSource>> Timestamp<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
            return Timestamp_<TSource>(source, scheduler);
        }

        // Shared implementation: pairs each element with the scheduler's current time.
        private static IObservable<Timestamped<TSource>> Timestamp_<TSource>(IObservable<TSource> source, IScheduler scheduler)
        {
#if !NO_PERF
            return new Timestamp<TSource>(source, scheduler);
#else
            return source.Select(element => new Timestamped<TSource>(element, scheduler.Now));
#endif
        }

        #endregion

        #region + Window +

        #region TimeSpan only

        // Windows of length timeSpan, one opened every timeSpan, on the default scheduler.
        public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return Window_<TSource>(source, timeSpan, timeSpan, defaultScheduler);
        }

        // Same as above, on a caller-supplied scheduler.
        public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, IScheduler scheduler)
        {
            return Window_<TSource>(source, timeSpan, timeSpan, scheduler);
        }

        // Windows of length timeSpan, one opened every timeShift, on the default scheduler.
        public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift)
        {
            var defaultScheduler = SchedulerDefaults.TimeBasedOperations;
            return Window_<TSource>(source, timeSpan, timeShift, defaultScheduler);
        }

        public virtual 
IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)        {            return Window_<TSource>(source, timeSpan, timeShift, scheduler);        }        private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, TimeSpan timeShift, IScheduler scheduler)        {#if !NO_PERF            return new Window<TSource>(source, timeSpan, timeShift, scheduler);#else            return new AnonymousObservable<IObservable<TSource>>(observer =>            {                var totalTime = TimeSpan.Zero;                var nextShift = timeShift;                var nextSpan = timeSpan;                var gate = new object();                var q = new Queue<ISubject<TSource>>();                var timerD = new SerialDisposable();                var groupDisposable = new CompositeDisposable(2) { timerD };                var refCountDisposable = new RefCountDisposable(groupDisposable);                var createTimer = default(Action);                createTimer = () =>                {                    var m = new SingleAssignmentDisposable();                    timerD.Disposable = m;                    var isSpan = false;                    var isShift = false;                    if (nextSpan == nextShift)                    {                        isSpan = true;                        isShift = true;                    }                    else if (nextSpan < nextShift)                        isSpan = true;                    else                        isShift = true;                    var newTotalTime = isSpan ? 
nextSpan : nextShift;                    var ts = newTotalTime - totalTime;                    totalTime = newTotalTime;                    if (isSpan)                        nextSpan += timeShift;                    if (isShift)                        nextShift += timeShift;                    m.Disposable = scheduler.Schedule(ts, () =>                    {                        lock (gate)                        {                            if (isShift)                            {                                var s = new Subject<TSource>();                                q.Enqueue(s);                                observer.OnNext(s.AddRef(refCountDisposable));                            }                            if (isSpan)                            {                                var s = q.Dequeue();                                s.OnCompleted();                            }                        }                        createTimer();                    });                };                q.Enqueue(new Subject<TSource>());                observer.OnNext(q.Peek().AddRef(refCountDisposable));                createTimer();                groupDisposable.Add(source.Subscribe(                    x =>                    {                        lock (gate)                        {                            foreach (var s in q)                                s.OnNext(x);                        }                    },                    exception =>                    {                        lock (gate)                        {                            foreach (var s in q)                                s.OnError(exception);                            observer.OnError(exception);                        }                    },                    () =>                    {                        lock (gate)                        {                            foreach (var s in q)                                s.OnCompleted();                            
observer.OnCompleted();                        }                    }                ));                return refCountDisposable;            });#endif        }        #endregion        #region TimeSpan + int        public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count)        {            return Window_<TSource>(source, timeSpan, count, SchedulerDefaults.TimeBasedOperations);        }        public virtual IObservable<IObservable<TSource>> Window<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)        {            return Window_<TSource>(source, timeSpan, count, scheduler);        }        private static IObservable<IObservable<TSource>> Window_<TSource>(IObservable<TSource> source, TimeSpan timeSpan, int count, IScheduler scheduler)        {#if !NO_PERF            return new Window<TSource>(source, timeSpan, count, scheduler);#else            return new AnonymousObservable<IObservable<TSource>>(observer =>            {                var gate = new object();                var s = default(ISubject<TSource>);                var n = 0;                var windowId = 0;                var timerD = new SerialDisposable();                var groupDisposable = new CompositeDisposable(2) { timerD };                var refCountDisposable = new RefCountDisposable(groupDisposable);                var createTimer = default(Action<int>);                createTimer = id =>                {                    var m = new SingleAssignmentDisposable();                    timerD.Disposable = m;                    m.Disposable = scheduler.Schedule(timeSpan, () =>                    {                        var newId = 0;                        lock (gate)                        {                            if (id != windowId)                                return;                            n = 0;                            newId = ++windowId;                            
s.OnCompleted();                            s = new Subject<TSource>();                            observer.OnNext(s.AddRef(refCountDisposable));                        }                        createTimer(newId);                    });                };                s = new Subject<TSource>();                observer.OnNext(s.AddRef(refCountDisposable));                createTimer(0);                groupDisposable.Add(source.Subscribe(                    x =>                    {                        var newWindow = false;                        var newId = 0;                        lock (gate)                        {                            s.OnNext(x);                            n++;                            if (n == count)                            {                                newWindow = true;                                n = 0;                                newId = ++windowId;                                s.OnCompleted();                                s = new Subject<TSource>();                                observer.OnNext(s.AddRef(refCountDisposable));                            }                        }                        if (newWindow)                            createTimer(newId);                    },                    exception =>                    {                        lock (gate)                        {                            s.OnError(exception);                            observer.OnError(exception);                        }                    },                    () =>                    {                        lock (gate)                        {                            s.OnCompleted();                            observer.OnCompleted();                        }                    }                ));                return refCountDisposable;            });#endif        }        #endregion        #endregion        #region |> Helpers <|#if NO_PERF        private static TimeSpan Normalize(TimeSpan timeSpan) 
       {            if (timeSpan.CompareTo(TimeSpan.Zero) < 0)                return TimeSpan.Zero;            return timeSpan;        }#endif        #endregion    }}
 |