// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive.Linq
{
#if !NO_PERF
    using ObservableImpl;
#endif

    internal partial class QueryLanguage
    {
        #region + Subscribe +

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the elements of <paramref name="source"/>,
        /// iterating on <c>SchedulerDefaults.Iteration</c>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Enumerable sequence to push to the observer.</param>
        /// <param name="observer">Observer that receives the elements and the terminal notification.</param>
        /// <returns>The disposable produced by <c>Subscribe_</c> for this subscription.</returns>
        public virtual IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer)
        {
            return Subscribe_<TSource>(source, observer, SchedulerDefaults.Iteration);
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the elements of <paramref name="source"/>,
        /// iterating on the supplied <paramref name="scheduler"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Enumerable sequence to push to the observer.</param>
        /// <param name="observer">Observer that receives the elements and the terminal notification.</param>
        /// <param name="scheduler">Scheduler handed to the enumeration logic.</param>
        /// <returns>The disposable produced by <c>Subscribe_</c> for this subscription.</returns>
        public virtual IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
        {
            return Subscribe_<TSource>(source, observer, scheduler);
        }

        /// <summary>
        /// Shared implementation behind both <c>Subscribe</c> overloads.
        /// </summary>
        /// <remarks>
        /// When <c>NO_PERF</c> is not defined, the work is delegated to <c>ToObservable&lt;TSource&gt;</c>.
        /// Otherwise the enumerator is advanced one element per recursive scheduler invocation:
        /// an exception from <c>MoveNext</c>/<c>Current</c> is forwarded through <c>OnError</c>, the end of
        /// the sequence through <c>OnCompleted</c>, and every other element through <c>OnNext</c>.
        /// </remarks>
        private static IDisposable Subscribe_<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
        {
#if !NO_PERF
            //
            // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
            //
            return new ToObservable<TSource>(source, scheduler).Subscribe/*Unsafe*/(observer);
#else
            var e = source.GetEnumerator();
            var flag = new BooleanDisposable();

            // NOTE(review): the IDisposable returned by Schedule is discarded; stopping the
            // iteration relies solely on 'flag' being checked at the start of the next
            // invocation - confirm this is intended.
            scheduler.Schedule(self =>
            {
                var hasNext = false;
                var ex = default(Exception);
                var current = default(TSource);

                // The subscription was disposed between iterations: release the enumerator and stop.
                if (flag.IsDisposed)
                {
                    e.Dispose();
                    return;
                }

                // Only the enumerator calls are guarded; observer callbacks run outside the try
                // block so exceptions thrown by the observer are not routed to OnError.
                try
                {
                    hasNext = e.MoveNext();
                    if (hasNext)
                    {
                        current = e.Current;
                    }
                }
                catch (Exception exception)
                {
                    ex = exception;
                }

                // Release the enumerator before sending the terminal notification.
                if (!hasNext || ex != null)
                {
                    e.Dispose();
                }

                if (ex != null)
                {
                    observer.OnError(ex);
                    return;
                }

                if (!hasNext)
                {
                    observer.OnCompleted();
                    return;
                }

                observer.OnNext(current);

                // Ask the scheduler to run this action again for the next element.
                self();
            });

            return flag;
#endif
        }

        #endregion

        #region + ToEnumerable +

        /// <summary>
        /// Wraps <paramref name="source"/> in an enumerable whose enumerator is obtained by calling
        /// <c>source.GetEnumerator()</c> each time enumeration starts.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Observable sequence to expose as an enumerable.</param>
        /// <returns>An <c>AnonymousEnumerable&lt;TSource&gt;</c> over <paramref name="source"/>.</returns>
        public virtual IEnumerable<TSource> ToEnumerable<TSource>(IObservable<TSource> source)
        {
            return new AnonymousEnumerable<TSource>(() => source.GetEnumerator());
        }

        #endregion

        #region ToEvent

        /// <summary>
        /// Exposes an observable sequence of <c>Unit</c> as an event source; the handler is always
        /// invoked with <c>Unit.Default</c>.
        /// </summary>
        /// <param name="source">Observable sequence to expose as an event.</param>
        /// <returns>An <c>EventSource&lt;Unit&gt;</c> over <paramref name="source"/>.</returns>
        public virtual IEventSource<Unit> ToEvent(IObservable<Unit> source)
        {
            return new EventSource<Unit>(source, (h, _) => h(Unit.Default));
        }

        /// <summary>
        /// Exposes an observable sequence as an event source; the handler is invoked with each element.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Observable sequence to expose as an event.</param>
        /// <returns>An <c>EventSource&lt;TSource&gt;</c> over <paramref name="source"/>.</returns>
        public virtual IEventSource<TSource> ToEvent<TSource>(IObservable<TSource> source)
        {
            return new EventSource<TSource>(source, (h, value) => h(value));
        }

        #endregion

        #region ToEventPattern

        /// <summary>
        /// Exposes an observable sequence of event patterns as an event-pattern source; the handler is
        /// invoked with the <c>Sender</c> and <c>EventArgs</c> of each element.
        /// </summary>
        /// <typeparam name="TEventArgs">Type of the event data carried by each element.</typeparam>
        /// <param name="source">Observable sequence of event patterns.</param>
        /// <returns>An <c>EventPatternSource&lt;TEventArgs&gt;</c> over <paramref name="source"/>.</returns>
        public virtual IEventPatternSource<TEventArgs> ToEventPattern<TEventArgs>(IObservable<EventPattern<TEventArgs>> source)
#if !NO_EVENTARGS_CONSTRAINT
            where TEventArgs : EventArgs
#endif
        {
            return new EventPatternSource<TEventArgs>(
#if !NO_VARIANCE
                source,
#else
                // Without variance support each element is cast explicitly to the base pattern type.
                source.Select(x => (EventPattern<object, TEventArgs>)x),
#endif
                (h, evt) => h(evt.Sender, evt.EventArgs)
            );
        }

        #endregion

        #region + ToObservable +

        /// <summary>
        /// Converts an enumerable sequence to an observable sequence, using
        /// <c>SchedulerDefaults.Iteration</c> for the enumeration.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Enumerable sequence to convert.</param>
        /// <returns>An observable sequence over <paramref name="source"/>.</returns>
        public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source)
        {
#if !NO_PERF
            return new ToObservable<TSource>(source, SchedulerDefaults.Iteration);
#else
            return ToObservable_(source, SchedulerDefaults.Iteration);
#endif
        }

        /// <summary>
        /// Converts an enumerable sequence to an observable sequence, using the supplied
        /// <paramref name="scheduler"/> for the enumeration.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Enumerable sequence to convert.</param>
        /// <param name="scheduler">Scheduler handed to the enumeration logic.</param>
        /// <returns>An observable sequence over <paramref name="source"/>.</returns>
        public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
        {
#if !NO_PERF
            return new ToObservable<TSource>(source, scheduler);
#else
            return ToObservable_(source, scheduler);
#endif
        }

#if NO_PERF
        /// <summary>
        /// Fallback used when <c>NO_PERF</c> is defined: every subscription to the returned observable
        /// calls <c>source.Subscribe(observer, scheduler)</c>.
        /// </summary>
        private static IObservable<TSource> ToObservable_<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
        {
            return new AnonymousObservable<TSource>(observer => source.Subscribe(observer, scheduler));
        }
#endif

        #endregion
    }
}