| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the Apache 2.0 License.// See the LICENSE file in the project root for more information. using System.Collections;using System.Collections.Generic;using System.Collections.ObjectModel;using System.Linq;using System.Reactive.Concurrency;using System.Reactive.Disposables;using System.Reactive.Subjects;#if !NO_TPLusing System.Reactive.Threading.Tasks;using System.Threading.Tasks;#endifnamespace System.Reactive.Linq{#if !NO_PERF    using ObservableImpl;#endif    internal partial class QueryLanguage    {        #region + Amb +        public virtual IObservable<TSource> Amb<TSource>(IObservable<TSource> first, IObservable<TSource> second)        {            return Amb_(first, second);        }        public virtual IObservable<TSource> Amb<TSource>(params IObservable<TSource>[] sources)        {            return Amb_(sources);        }        public virtual IObservable<TSource> Amb<TSource>(IEnumerable<IObservable<TSource>> sources)        {            return Amb_(sources);        }        private static IObservable<TSource> Amb_<TSource>(IEnumerable<IObservable<TSource>> sources)        {            return sources.Aggregate(Observable.Never<TSource>(), (previous, current) => previous.Amb(current));        }        private static IObservable<TSource> Amb_<TSource>(IObservable<TSource> leftSource, IObservable<TSource> rightSource)        {#if !NO_PERF            return new Amb<TSource>(leftSource, rightSource);#else            return new AnonymousObservable<TSource>(observer =>            {                var leftSubscription = new SingleAssignmentDisposable();                var rightSubscription = new SingleAssignmentDisposable();                var choice = AmbState.Neither;                var gate = new object();                var left = new AmbObserver<TSource>();                var right = new AmbObserver<TSource>();                left.Observer = Observer.Synchronize(Observer.Create<TSource>(                    x =>                    {                        if (choice == AmbState.Neither)                        {                            choice = AmbState.Left;                            rightSubscription.Dispose();                            left.Observer = observer;                        }                        if (choice == AmbState.Left)                            observer.OnNext(x);                    },                    ex =>                    {                        if (choice == AmbState.Neither)                        {                            choice = AmbState.Left;                            rightSubscription.Dispose();                            left.Observer = observer;                        }                        if (choice == AmbState.Left)                            observer.OnError(ex);                    },                    () =>                    {                        if (choice == AmbState.Neither)                        {                            choice = AmbState.Left;                            rightSubscription.Dispose();                            left.Observer = observer;                        }                        if (choice == AmbState.Left)                            observer.OnCompleted();                    }                ), gate);                right.Observer = Observer.Synchronize(Observer.Create<TSource>(                    x =>                    {                        if (choice == AmbState.Neither)                        {                            choice = AmbState.Right;                            leftSubscription.Dispose();                            right.Observer = observer;                        }                        if (choice == AmbState.Right)                            observer.OnNext(x);                    },                    ex =>                    {                        if (choice == AmbState.Neither)                        {                            choice = AmbState.Right;                            leftSubscription.Dispose();                            right.Observer = observer;                        }                        if (choice == AmbState.Right)                            observer.OnError(ex);                    },                    () =>                    {                        if (choice == AmbState.Neither)                        {                            choice = AmbState.Right;                            leftSubscription.Dispose();                            right.Observer = observer;                        }                        if (choice == AmbState.Right)                            observer.OnCompleted();                    }                ), gate);                leftSubscription.Disposable = leftSource.Subscribe(left);                rightSubscription.Disposable = rightSource.Subscribe(right);                return new CompositeDisposable(leftSubscription, rightSubscription);            });#endif        }#if NO_PERF        class AmbObserver<TSource> : IObserver<TSource>        {            public virtual IObserver<TSource> Observer { get; set; }            public virtual void OnCompleted()            {                Observer.OnCompleted();            }            public virtual void OnError(Exception error)            {                Observer.OnError(error);            }            public virtual void OnNext(TSource value)            {                Observer.OnNext(value);            }        }        enum AmbState        {            Left,            Right,            Neither        }#endif        #endregion        #region + Buffer +        public virtual IObservable<IList<TSource>> Buffer<TSource, TBufferClosing>(IObservable<TSource> source, Func<IObservable<TBufferClosing>> bufferClosingSelector)        {#if !NO_PERF            return new Buffer<TSource, TBufferClosing>(source, bufferClosingSelector);#else            return source.Window(bufferClosingSelector).SelectMany(ToList);#endif        }        public virtual IObservable<IList<TSource>> Buffer<TSource, TBufferOpening, TBufferClosing>(IObservable<TSource> source, IObservable<TBufferOpening> bufferOpenings, Func<TBufferOpening, IObservable<TBufferClosing>> bufferClosingSelector)        {            return source.Window(bufferOpenings, bufferClosingSelector).SelectMany(ToList);        }        public virtual IObservable<IList<TSource>> Buffer<TSource, TBufferBoundary>(IObservable<TSource> source, IObservable<TBufferBoundary> bufferBoundaries)        {#if !NO_PERF            return new Buffer<TSource, TBufferBoundary>(source, bufferBoundaries);#else            return source.Window(bufferBoundaries).SelectMany(ToList);#endif        }        #endregion        #region + Catch +        public virtual IObservable<TSource> Catch<TSource, TException>(IObservable<TSource> source, Func<TException, IObservable<TSource>> handler) where TException : Exception        {#if !NO_PERF            return new Catch<TSource, TException>(source, handler);#else            return new AnonymousObservable<TSource>(observer =>            {                var subscription = new SerialDisposable();                var d1 = new SingleAssignmentDisposable();                subscription.Disposable = d1;                d1.Disposable = source.Subscribe(observer.OnNext,                    exception =>                    {                        var e = exception as TException;                        if (e != null)                        {                            IObservable<TSource> result;                            try                            {                                result = handler(e);                            }                            catch (Exception ex)                            {                                observer.OnError(ex);                                return;                            }                            var d = new SingleAssignmentDisposable();                            subscription.Disposable = d;                            d.Disposable = result.Subscribe(observer);                        }                        else                            observer.OnError(exception);                    }, observer.OnCompleted);                return subscription;            });#endif        }        public virtual IObservable<TSource> Catch<TSource>(IObservable<TSource> first, IObservable<TSource> second)        {            return Catch_<TSource>(new[] { first, second });        }        public virtual IObservable<TSource> Catch<TSource>(params IObservable<TSource>[] sources)        {            return Catch_<TSource>(sources);        }        public virtual IObservable<TSource> Catch<TSource>(IEnumerable<IObservable<TSource>> sources)        {            return Catch_<TSource>(sources);        }        private static IObservable<TSource> Catch_<TSource>(IEnumerable<IObservable<TSource>> sources)        {#if !NO_PERF            return new Catch<TSource>(sources);#else            return new AnonymousObservable<TSource>(observer =>            {                var gate = new AsyncLock();                var isDisposed = false;                var e = sources.GetEnumerator();                var subscription = new SerialDisposable();                var lastException = default(Exception);                var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => gate.Wait(() =>                {                    var current = default(IObservable<TSource>);                    var hasNext = false;                    var ex = default(Exception);                    if (!isDisposed)                    {                        try                        {                            hasNext = e.MoveNext();                            if (hasNext)                                current = e.Current;                            else                                e.Dispose();                        }                        catch (Exception exception)                        {                            ex = exception;                            e.Dispose();                        }                    }                    else                        return;                    if (ex != null)                    {                        observer.OnError(ex);                        return;                    }                    if (!hasNext)                    {                        if (lastException != null)                            observer.OnError(lastException);                        else                            observer.OnCompleted();                        return;                    }                    var d = new SingleAssignmentDisposable();                    subscription.Disposable = d;                    d.Disposable = current.Subscribe(observer.OnNext, exception =>                    {                        lastException = exception;                        self();                    }, observer.OnCompleted);                }));                return new CompositeDisposable(subscription, cancelable, Disposable.Create(() => gate.Wait(() =>                {                    e.Dispose();                    isDisposed = true;                })));            });#endif        }        #endregion        #region + CombineLatest +        public virtual IObservable<TResult> CombineLatest<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)        {#if !NO_PERF            return new CombineLatest<TFirst, TSecond, TResult>(first, second, resultSelector);#else            return new AnonymousObservable<TResult>(observer =>            {                var hasLeft = false;                var hasRight = false;                var left = default(TFirst);                var right = default(TSecond);                var leftDone = false;                var rightDone = false;                var leftSubscription = new SingleAssignmentDisposable();                var rightSubscription = new SingleAssignmentDisposable();                var gate = new object();                leftSubscription.Disposable = first.Synchronize(gate).Subscribe(                    l =>                    {                        hasLeft = true;                        left = l;                        if (hasRight)                        {                            var res = default(TResult);                            try                            {                                res = resultSelector(left, right);                            }                            catch (Exception ex)                            {                                observer.OnError(ex);                                return;                            }                            observer.OnNext(res);                        }                        else if (rightDone)                        {                            observer.OnCompleted();                            return;                        }                    },                    observer.OnError,                    () =>                    {                        leftDone = true;                        if (rightDone)                        {                            observer.OnCompleted();                            return;                        }                    }                );                rightSubscription.Disposable = second.Synchronize(gate).Subscribe(                    r =>                    {                        hasRight = true;                        right = r;                        if (hasLeft)                        {                            var res = default(TResult);                            try                            {                                res = resultSelector(left, right);                            }                            catch (Exception ex)                            {                                observer.OnError(ex);                                return;                            }                            observer.OnNext(res);                        }                        else if (leftDone)                        {                            observer.OnCompleted();                            return;                        }                    },                    observer.OnError,                    () =>                    {                        rightDone = true;                        if (leftDone)                        {                            observer.OnCompleted();                            return;                        }                    }                );                return new CompositeDisposable(leftSubscription, rightSubscription);            });#endif        }#if !NO_PERF        /* The following code is generated by a tool checked in to $/.../Source/Tools/CodeGenerators. */        #region CombineLatest auto-generated code (6/10/2012 7:25:03 PM)        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, Func<TSource1, TSource2, TSource3, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TResult>(source1, source2, source3, resultSelector);        }        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, Func<TSource1, TSource2, TSource3, TSource4, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TResult>(source1, source2, source3, source4, resultSelector);        }#if !NO_LARGEARITY        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(source1, source2, source3, source4, source5, resultSelector);        }        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(source1, source2, source3, source4, source5, source6, resultSelector);        }        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(source1, source2, source3, source4, source5, source6, source7, resultSelector);        }        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, resultSelector);        }        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, resultSelector);        }        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, resultSelector);        }        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, resultSelector);        }        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, resultSelector);        }        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, resultSelector);        }        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, resultSelector);        }        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, resultSelector);        }        public virtual IObservable<TResult> CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15, IObservable<TSource16> source16, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult> resultSelector)        {            return new CombineLatest<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, source16, resultSelector);        }#endif        #endregion#endif        public virtual IObservable<TResult> CombineLatest<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)        {            return CombineLatest_<TSource, TResult>(sources, resultSelector);        }        public virtual IObservable<IList<TSource>> CombineLatest<TSource>(IEnumerable<IObservable<TSource>> sources)        {            return CombineLatest_<TSource, IList<TSource>>(sources, res => res.ToList());        }        public virtual IObservable<IList<TSource>> CombineLatest<TSource>(params IObservable<TSource>[] sources)        {            return CombineLatest_<TSource, IList<TSource>>(sources, res => res.ToList());        }        private static IObservable<TResult> CombineLatest_<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)        {#if !NO_PERF            return new CombineLatest<TSource, TResult>(sources, resultSelector);#else            return new AnonymousObservable<TResult>(observer =>            {                var srcs = sources.ToArray();                var N = srcs.Length;                var hasValue = new bool[N];                var hasValueAll = false;                var values = new List<TSource>(N);                for (int i = 0; i < N; i++)                    values.Add(default(TSource));                var isDone = new bool[N];                var next = new Action<int>(i =>                {                    hasValue[i] = true;                    if (hasValueAll || (hasValueAll = hasValue.All(Stubs<bool>.I)))                    {                        var res = default(TResult);                        try                        {                            res = resultSelector(new ReadOnlyCollection<TSource>(values));                        }                        catch (Exception ex)                        {                            observer.OnError(ex);                            return;                        }                        observer.OnNext(res);                    }                    else if (isDone.Where((x, j) => j != i).All(Stubs<bool>.I))                    {                        observer.OnCompleted();                        return;                    }                });                var done = new Action<int>(i =>                {                    isDone[i] = true;                    if (isDone.All(Stubs<bool>.I))                    {                        observer.OnCompleted();                        return;                    }                });                var subscriptions = new SingleAssignmentDisposable[N];                var gate = new object();                for (int i = 0; i < N; i++)                {                    var j = i;                    subscriptions[j] = new SingleAssignmentDisposable                    {                        Disposable = srcs[j].Synchronize(gate).Subscribe(                            x =>                            {                                values[j] = x;                                next(j);                            },                            observer.OnError,                            () =>                            {                                done(j);                            }                        )                    };                }                return new CompositeDisposable(subscriptions);            });#endif        }        #endregion        #region + Concat +        public virtual IObservable<TSource> Concat<TSource>(IObservable<TSource> first, IObservable<TSource> second)        {            return Concat_<TSource>(new[] { first, second });        }        public virtual IObservable<TSource> Concat<TSource>(params IObservable<TSource>[] sources)        {            return Concat_<TSource>(sources);        }        public virtual IObservable<TSource> Concat<TSource>(IEnumerable<IObservable<TSource>> sources)        {            return Concat_<TSource>(sources);        }        private static IObservable<TSource> Concat_<TSource>(IEnumerable<IObservable<TSource>> sources)        {#if !NO_PERF            return new Concat<TSource>(sources);#else            return new AnonymousObservable<TSource>(observer =>            {                var isDisposed = false;                var e = sources.GetEnumerator();                var subscription = new SerialDisposable();                var gate = new AsyncLock();                var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => gate.Wait(() =>                {                    var current = default(IObservable<TSource>);                    var hasNext = false;                    var ex = default(Exception);                    if (!isDisposed)                    {                        try                        {                            hasNext = e.MoveNext();                            if (hasNext)                                current = e.Current;                            else                                e.Dispose();                        }                        catch (Exception exception)                        {                            ex = exception;                            e.Dispose();                        }                    }                    else                        return;                    if (ex != null)                    {                        observer.OnError(ex);                        return;                    }                    if (!hasNext)                    {                        observer.OnCompleted();                        return;                    }                    var d = new SingleAssignmentDisposable();                    subscription.Disposable = d;                    d.Disposable = current.Subscribe(observer.OnNext, observer.OnError, self);                }));                return new CompositeDisposable(subscription, cancelable, Disposable.Create(() => gate.Wait(() =>                            {                                e.Dispose();                                isDisposed = true;                            })));            });#endif        }        public virtual IObservable<TSource> Concat<TSource>(IObservable<IObservable<TSource>> sources)        {            return Concat_<TSource>(sources);        }#if !NO_TPL        public virtual IObservable<TSource> Concat<TSource>(IObservable<Task<TSource>> sources)        {            return Concat_<TSource>(Select(sources, TaskObservableExtensions.ToObservable));        }#endif        private IObservable<TSource> Concat_<TSource>(IObservable<IObservable<TSource>> sources)        {            return Merge(sources, 1);        }        #endregion        #region + Merge +        public virtual IObservable<TSource> Merge<TSource>(IObservable<IObservable<TSource>> sources)        {            return Merge_<TSource>(sources);        }#if !NO_TPL        public virtual IObservable<TSource> Merge<TSource>(IObservable<Task<TSource>> sources)        {#if !NO_PERF            return new Merge<TSource>(sources);#else            return Merge_<TSource>(Select(sources, TaskObservableExtensions.ToObservable));#endif        }#endif        public virtual IObservable<TSource> Merge<TSource>(IObservable<IObservable<TSource>> sources, int maxConcurrent)        {            return Merge_<TSource>(sources, maxConcurrent);        }        public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources, int maxConcurrent)        {            return Merge_<TSource>(sources.ToObservable(SchedulerDefaults.ConstantTimeOperations), maxConcurrent);        }        public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources, int maxConcurrent, IScheduler scheduler)        {            return Merge_<TSource>(sources.ToObservable(scheduler), maxConcurrent);        }        public virtual IObservable<TSource> Merge<TSource>(IObservable<TSource> first, IObservable<TSource> second)        {            return Merge_<TSource>(new[] { first, second }.ToObservable(SchedulerDefaults.ConstantTimeOperations));        }        public virtual IObservable<TSource> Merge<TSource>(IObservable<TSource> first, IObservable<TSource> second, IScheduler scheduler)        {            return Merge_<TSource>(new[] { first, second }.ToObservable(scheduler));        }        public virtual IObservable<TSource> Merge<TSource>(params IObservable<TSource>[] sources)        {            return Merge_<TSource>(sources.ToObservable(SchedulerDefaults.ConstantTimeOperations));        }        public virtual IObservable<TSource> Merge<TSource>(IScheduler scheduler, params IObservable<TSource>[] sources)        {            return Merge_<TSource>(sources.ToObservable(scheduler));        }        public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources)        {            return Merge_<TSource>(sources.ToObservable(SchedulerDefaults.ConstantTimeOperations));        }        public virtual IObservable<TSource> Merge<TSource>(IEnumerable<IObservable<TSource>> sources, IScheduler scheduler)        {            return Merge_<TSource>(sources.ToObservable(scheduler));        }        private static IObservable<TSource> Merge_<TSource>(IObservable<IObservable<TSource>> sources)        {#if !NO_PERF            return new Merge<TSource>(sources);#else            return new AnonymousObservable<TSource>(observer =>            {                var gate = new object();                var isStopped = false;                var m = new SingleAssignmentDisposable();                var group = new CompositeDisposable() { m };                m.Disposable = sources.Subscribe(                    innerSource =>                    {                        var innerSubscription = new SingleAssignmentDisposable();                        group.Add(innerSubscription);                        innerSubscription.Disposable = innerSource.Subscribe(                            x =>                            {                                lock (gate)                                    observer.OnNext(x);                            },                            exception =>                            {                                lock (gate)                                    observer.OnError(exception);                            },                            () =>                            {                                group.Remove(innerSubscription);   // modification MUST occur before subsequent check                                if (isStopped && group.Count == 1) // isStopped must be checked before group Count to ensure outer is not creating more groups                                    lock (gate)                                        observer.OnCompleted();                            });                    },                    exception =>                    {                        lock (gate)                            observer.OnError(exception);                    },                    () =>                    {                        isStopped = true;     // modification MUST occur before subsequent check                        if (group.Count == 1)                            lock (gate)                                observer.OnCompleted();                    });                return group;            });#endif        }        private static IObservable<TSource> Merge_<TSource>(IObservable<IObservable<TSource>> sources, int maxConcurrent)        {#if !NO_PERF            return new Merge<TSource>(sources, maxConcurrent);#else            return new AnonymousObservable<TSource>(observer =>            {                var gate = new object();                var q = new Queue<IObservable<TSource>>();                var isStopped = false;                var group = new CompositeDisposable();                var activeCount = 0;                var subscribe = default(Action<IObservable<TSource>>);                subscribe = xs =>                {                    var subscription = new SingleAssignmentDisposable();                    group.Add(subscription);                    subscription.Disposable = xs.Subscribe(                        x =>                        {                            lock (gate)                                observer.OnNext(x);                        },                        exception =>                        {                            lock (gate)                                observer.OnError(exception);                        },                        () =>                        {                            group.Remove(subscription);                            lock (gate)                            {                                if (q.Count > 0)                                {                                    var s = q.Dequeue();                                    subscribe(s);                                }                                else                                {                                    activeCount--;                                    if (isStopped && activeCount == 0)                                        observer.OnCompleted();                                }                            }                        });                };                group.Add(sources.Subscribe(                    innerSource =>                    {                        lock (gate)                        {                            if (activeCount < maxConcurrent)                            {                                activeCount++;                                subscribe(innerSource);                            }                            else                                q.Enqueue(innerSource);                        }                    },                    exception =>                    {                        lock (gate)                            observer.OnError(exception);                    },                    () =>                    {                        lock (gate)                        {                            isStopped = true;                            if (activeCount == 0)                                observer.OnCompleted();                        }                    }));                return group;            });#endif        }        #endregion        #region + OnErrorResumeNext +        public virtual IObservable<TSource> OnErrorResumeNext<TSource>(IObservable<TSource> first, IObservable<TSource> second)        {            return OnErrorResumeNext_<TSource>(new[] { first, second });        }        public virtual IObservable<TSource> OnErrorResumeNext<TSource>(params IObservable<TSource>[] sources)        {            return OnErrorResumeNext_<TSource>(sources);        }        public virtual IObservable<TSource> OnErrorResumeNext<TSource>(IEnumerable<IObservable<TSource>> sources)        {            return OnErrorResumeNext_<TSource>(sources);        }        private static IObservable<TSource> OnErrorResumeNext_<TSource>(IEnumerable<IObservable<TSource>> sources)        {#if !NO_PERF            return new OnErrorResumeNext<TSource>(sources);#else            return new AnonymousObservable<TSource>(observer =>            {                var gate = new AsyncLock();                var isDisposed = false;                var e = sources.GetEnumerator();                var subscription = new SerialDisposable();                var cancelable = SchedulerDefaults.TailRecursion.Schedule(self => gate.Wait(() =>                {                    var current = default(IObservable<TSource>);                    var hasNext = false;                    var ex = default(Exception);                    if (!isDisposed)                    {                        try                        {                            hasNext = e.MoveNext();                            if (hasNext)                                current = e.Current;                            else                                e.Dispose();                        }                        catch (Exception exception)                        {                            ex = exception;                            e.Dispose();                        }                    }                    else                        return;                    if (ex != null)                    {                        observer.OnError(ex);                        return;                    }                    if (!hasNext)                    {                        observer.OnCompleted();                        return;                    }                    var d = new SingleAssignmentDisposable();                    subscription.Disposable = d;                    d.Disposable = current.Subscribe(observer.OnNext, exception => self(), self);                }));                return new CompositeDisposable(subscription, cancelable, Disposable.Create(() => gate.Wait(() =>                {                    e.Dispose();                    isDisposed = true;                })));            });#endif        }        #endregion        #region + SkipUntil +        public virtual IObservable<TSource> SkipUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other)        {#if !NO_PERF            return new SkipUntil<TSource, TOther>(source, other);#else            return new AnonymousObservable<TSource>(observer =>            {                var sourceSubscription = new SingleAssignmentDisposable();                var otherSubscription = new SingleAssignmentDisposable();                var open = false;                var gate = new object();                sourceSubscription.Disposable = source.Synchronize(gate).Subscribe(                    x =>                    {                        if (open)                            observer.OnNext(x);                    },                    observer.OnError, // BREAKING CHANGE - Error propagation was guarded by "other" source in v1.0.10621 (due to materialization).                    () =>                    {                        if (open)                            observer.OnCompleted();                    }                );                otherSubscription.Disposable = other.Synchronize(gate).Subscribe(                    x =>                    {                        open = true;                        otherSubscription.Dispose();                    },                    observer.OnError                );                return new CompositeDisposable(sourceSubscription, otherSubscription);            });#endif        }        #endregion        #region + Switch +        public virtual IObservable<TSource> Switch<TSource>(IObservable<IObservable<TSource>> sources)        {            return Switch_<TSource>(sources);        }#if !NO_TPL        public virtual IObservable<TSource> Switch<TSource>(IObservable<Task<TSource>> sources)        {            return Switch_<TSource>(Select(sources, TaskObservableExtensions.ToObservable));        }#endif        private IObservable<TSource> Switch_<TSource>(IObservable<IObservable<TSource>> sources)        {#if !NO_PERF            return new Switch<TSource>(sources);#else            return new AnonymousObservable<TSource>(observer =>            {                var gate = new object();                var innerSubscription = new SerialDisposable();                var isStopped = false;                var latest = 0UL;                var hasLatest = false;                var subscription = sources.Subscribe(                    innerSource =>                    {                        var id = default(ulong);                        lock (gate)                        {                            id = unchecked(++latest);                            hasLatest = true;                        }                        var d = new SingleAssignmentDisposable();                        innerSubscription.Disposable = d;                        d.Disposable = innerSource.Subscribe(                        x =>                        {                            lock (gate)                            {                                if (latest == id)                                    observer.OnNext(x);                            }                        },                        exception =>                        {                            lock (gate)                            {                                if (latest == id)                                    observer.OnError(exception);                            }                        },                        () =>                        {                            lock (gate)                            {                                if (latest == id)                                {                                    hasLatest = false;                                    if (isStopped)                                        observer.OnCompleted();                                }                            }                        });                    },                    exception =>                    {                        lock (gate)                            observer.OnError(exception);                    },                    () =>                    {                        lock (gate)                        {                            isStopped = true;                            if (!hasLatest)                                observer.OnCompleted();                        }                    });                return new CompositeDisposable(subscription, innerSubscription);            });#endif        }        #endregion        #region + TakeUntil +        public virtual IObservable<TSource> TakeUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other)        {#if !NO_PERF            return new TakeUntil<TSource, TOther>(source, other);#else            return new AnonymousObservable<TSource>(observer =>            {                var sourceSubscription = new SingleAssignmentDisposable();                var otherSubscription = new SingleAssignmentDisposable();                var gate = new object();                // COMPAT - Order of Subscribe calls per v1.0.10621                otherSubscription.Disposable = other.Synchronize(gate).Subscribe(                    x =>                    {                        observer.OnCompleted();                    },                    observer.OnError                );                sourceSubscription.Disposable = source.Synchronize(gate).Finally(otherSubscription.Dispose).Subscribe(observer);                return new CompositeDisposable(sourceSubscription, otherSubscription);            });#endif        }        #endregion        #region + Window +        public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector)        {#if !NO_PERF            return new Window<TSource, TWindowClosing>(source, windowClosingSelector);#else            return new AnonymousObservable<IObservable<TSource>>(observer =>            {                var window = new Subject<TSource>();                var gate = new object();                var m = new SerialDisposable();                var d = new CompositeDisposable(2) { m };                var r = new RefCountDisposable(d);                observer.OnNext(window.AddRef(r));                d.Add(source.SubscribeSafe(new AnonymousObserver<TSource>(                    x =>                    {                        lock (gate)                        {                            window.OnNext(x);                        }                    },                    ex =>                    {                        lock (gate)                        {                            window.OnError(ex);                            observer.OnError(ex);                        }                    },                    () =>                    {                        lock (gate)                        {                            window.OnCompleted();                            observer.OnCompleted();                        }                    })));                var l = new AsyncLock();                Action createWindowClose = null;                createWindowClose = () =>                {                    var windowClose = default(IObservable<TWindowClosing>);                    try                    {                        windowClose = windowClosingSelector();                    }                    catch (Exception exception)                    {                        lock (gate)                        {                            observer.OnError(exception);                        }                        return;                    }                    var m1 = new SingleAssignmentDisposable();                    m.Disposable = m1;                    m1.Disposable = windowClose.Take(1).SubscribeSafe(new AnonymousObserver<TWindowClosing>(                        Stubs<TWindowClosing>.Ignore,                        ex =>                        {                            lock (gate)                            {                                window.OnError(ex);                                observer.OnError(ex);                            }                        },                        () =>                        {                            lock (gate)                            {                                window.OnCompleted();                                window = new Subject<TSource>();                                observer.OnNext(window.AddRef(r));                            }                            l.Wait(createWindowClose);                        }));                };                l.Wait(createWindowClose);                return r;            });#endif        }        public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(IObservable<TSource> source, IObservable<TWindowOpening> windowOpenings, Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector)        {            return windowOpenings.GroupJoin(source, windowClosingSelector, _ => Observable.Empty<Unit>(), (_, window) => window);        }        public virtual IObservable<IObservable<TSource>> Window<TSource, TWindowBoundary>(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries)        {#if !NO_PERF            return new Window<TSource, TWindowBoundary>(source, windowBoundaries);#else            return new AnonymousObservable<IObservable<TSource>>(observer =>            {                var window = new Subject<TSource>();                var gate = new object();                var d = new CompositeDisposable(2);                var r = new RefCountDisposable(d);                observer.OnNext(window.AddRef(r));                d.Add(source.SubscribeSafe(new AnonymousObserver<TSource>(                    x =>                    {                        lock (gate)                        {                            window.OnNext(x);                        }                    },                    ex =>                    {                        lock (gate)                        {                            window.OnError(ex);                            observer.OnError(ex);                        }                    },                    () =>                    {                        lock (gate)                        {                            window.OnCompleted();                            observer.OnCompleted();                        }                    }                )));                d.Add(windowBoundaries.SubscribeSafe(new AnonymousObserver<TWindowBoundary>(                    w =>                    {                        lock (gate)                        {                            window.OnCompleted();                            window = new Subject<TSource>();                            observer.OnNext(window.AddRef(r));                        }                    },                    ex =>                    {                        lock (gate)                        {                            window.OnError(ex);                            observer.OnError(ex);                        }                    },                    () =>                    {                        lock (gate)                        {                            window.OnCompleted();                            observer.OnCompleted();                        }                    }                )));                return r;            });#endif        }        #endregion        #region + WithLatestFrom +        public virtual IObservable<TResult> WithLatestFrom<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)        {            return new WithLatestFrom<TFirst, TSecond, TResult>(first, second, resultSelector);        }        #endregion        #region + Zip +        public virtual IObservable<TResult> Zip<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)        {#if !NO_PERF            return new Zip<TFirst, TSecond, TResult>(first, second, resultSelector);#else            return new AnonymousObservable<TResult>(observer =>            {                var queueLeft = new Queue<TFirst>();                var queueRight = new Queue<TSecond>();                var leftDone = false;                var rightDone = false;                var leftSubscription = new SingleAssignmentDisposable();                var rightSubscription = new SingleAssignmentDisposable();                var gate = new object();                leftSubscription.Disposable = first.Synchronize(gate).Subscribe(                    l =>                    {                        if (queueRight.Count > 0)                        {                            var r = queueRight.Dequeue();                            var res = default(TResult);                            try                            {                                res = resultSelector(l, r);                            }                            catch (Exception ex)                            {                                observer.OnError(ex);                                return;                            }                            observer.OnNext(res);                        }                        else                        {                            if (rightDone)                            {                                observer.OnCompleted();                                return;                            }                            queueLeft.Enqueue(l);                        }                    },                    observer.OnError,                    () =>                    {                        leftDone = true;                        if (rightDone)                        {                            observer.OnCompleted();                            return;                        }                    }                );                rightSubscription.Disposable = second.Synchronize(gate).Subscribe(                    r =>                    {                        if (queueLeft.Count > 0)                        {                            var l = queueLeft.Dequeue();                            var res = default(TResult);                            try                            {                                res = resultSelector(l, r);                            }                            catch (Exception ex)                            {                                observer.OnError(ex);                                return;                            }                            observer.OnNext(res);                        }                        else                        {                            if (leftDone)                            {                                observer.OnCompleted();                                return;                            }                            queueRight.Enqueue(r);                        }                    },                    observer.OnError,                    () =>                    {                        rightDone = true;                        if (leftDone)                        {                            observer.OnCompleted();                            return;                        }                    }                );                return new CompositeDisposable(leftSubscription, rightSubscription, Disposable.Create(() => { queueLeft.Clear(); queueRight.Clear(); }));            });#endif        }        public virtual IObservable<TResult> Zip<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)        {            return Zip_<TSource>(sources).Select(resultSelector);        }        public virtual IObservable<IList<TSource>> Zip<TSource>(IEnumerable<IObservable<TSource>> sources)        {            return Zip_<TSource>(sources);        }        public virtual IObservable<IList<TSource>> Zip<TSource>(params IObservable<TSource>[] sources)        {            return Zip_<TSource>(sources);        }        private static IObservable<IList<TSource>> Zip_<TSource>(IEnumerable<IObservable<TSource>> sources)        {#if !NO_PERF            return new Zip<TSource>(sources);#else            return new AnonymousObservable<IList<TSource>>(observer =>            {                var srcs = sources.ToArray();                var N = srcs.Length;                var queues = new Queue<TSource>[N];                for (int i = 0; i < N; i++)                    queues[i] = new Queue<TSource>();                var isDone = new bool[N];                var next = new Action<int>(i =>                {                    if (queues.All(q => q.Count > 0))                    {                        var res = queues.Select(q => q.Dequeue()).ToList();                        observer.OnNext(res);                    }                    else if (isDone.Where((x, j) => j != i).All(Stubs<bool>.I))                    {                        observer.OnCompleted();                        return;                    }                });                var done = new Action<int>(i =>                {                    isDone[i] = true;                    if (isDone.All(Stubs<bool>.I))                    {                        observer.OnCompleted();                        return;                    }                });                var subscriptions = new SingleAssignmentDisposable[N];                var gate = new object();                for (int i = 0; i < N; i++)                {                    var j = i;                    subscriptions[j] = new SingleAssignmentDisposable                    {                        Disposable = srcs[j].Synchronize(gate).Subscribe(                            x =>                            {                                queues[j].Enqueue(x);                                next(j);                            },                            observer.OnError,                            () =>                            {                                done(j);                            }                        )                    };                }                return new CompositeDisposable(subscriptions) { Disposable.Create(() => { foreach (var q in queues) q.Clear(); }) };            });#endif        }#if !NO_PERF        /* The following code is generated by a tool checked in to $/.../Source/Tools/CodeGenerators. */        #region Zip auto-generated code (6/10/2012 8:15:28 PM)        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, Func<TSource1, TSource2, TSource3, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TResult>(source1, source2, source3, resultSelector);        }        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, Func<TSource1, TSource2, TSource3, TSource4, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TSource4, TResult>(source1, source2, source3, source4, resultSelector);        }#if !NO_LARGEARITY        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TResult>(source1, source2, source3, source4, source5, resultSelector);        }        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TResult>(source1, source2, source3, source4, source5, source6, resultSelector);        }        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TResult>(source1, source2, source3, source4, source5, source6, source7, resultSelector);        }        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, resultSelector);        }        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, resultSelector);        }        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, resultSelector);        }        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, resultSelector);        }        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, resultSelector);        }        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, resultSelector);        }        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, resultSelector);        }        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, resultSelector);        }        public virtual IObservable<TResult> Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(IObservable<TSource1> source1, IObservable<TSource2> source2, IObservable<TSource3> source3, IObservable<TSource4> source4, IObservable<TSource5> source5, IObservable<TSource6> source6, IObservable<TSource7> source7, IObservable<TSource8> source8, IObservable<TSource9> source9, IObservable<TSource10> source10, IObservable<TSource11> source11, IObservable<TSource12> source12, IObservable<TSource13> source13, IObservable<TSource14> source14, IObservable<TSource15> source15, IObservable<TSource16> source16, Func<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult> resultSelector)        {            return new Zip<TSource1, TSource2, TSource3, TSource4, TSource5, TSource6, TSource7, TSource8, TSource9, TSource10, TSource11, TSource12, TSource13, TSource14, TSource15, TSource16, TResult>(source1, source2, source3, source4, source5, source6, source7, source8, source9, source10, source11, source12, source13, source14, source15, source16, resultSelector);        }#endif        #endregion#endif        public virtual IObservable<TResult> Zip<TFirst, TSecond, TResult>(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)        {#if !NO_PERF            return new Zip<TFirst, TSecond, TResult>(first, second, resultSelector);#else            return new AnonymousObservable<TResult>(observer =>            {                var rightEnumerator = second.GetEnumerator();                var leftSubscription = first.Subscribe(left =>                    {                        var hasNext = false;                        try                        {                            hasNext = rightEnumerator.MoveNext();                        }                        catch (Exception ex)                        {                            observer.OnError(ex);                            return;                        }                        if (hasNext)                        {                            var right = default(TSecond);                            try                            {                                right = rightEnumerator.Current;                            }                            catch (Exception ex)                            {                                observer.OnError(ex);                                return;                            }                            TResult result;                            try                            {                                result = resultSelector(left, right);                            }                            catch (Exception ex)                            {                                observer.OnError(ex);                                return;                            }                            observer.OnNext(result);                        }                        else                        {                            observer.OnCompleted();                        }                    },                    observer.OnError,                    observer.OnCompleted                );                return new CompositeDisposable(leftSubscription, rightEnumerator);            });#endif        }        #endregion        #region |> Helpers <|#if NO_PERF        private static IObservable<TResult> Combine<TLeft, TRight, TResult>(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector)        {            return new AnonymousObservable<TResult>(observer =>            {                var leftSubscription = new SingleAssignmentDisposable();                var rightSubscription = new SingleAssignmentDisposable();                var combiner = combinerSelector(observer, leftSubscription, rightSubscription);                var gate = new object();                leftSubscription.Disposable = leftSource.Materialize().Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateLeft(x)).Synchronize(gate).Subscribe(combiner);                rightSubscription.Disposable = rightSource.Materialize().Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateRight(x)).Synchronize(gate).Subscribe(combiner);                return new CompositeDisposable(leftSubscription, rightSubscription);            });        }#endif        #endregion    }}
 |