// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information.

using System.Collections.Generic;
using System.Reactive.Concurrency;

namespace System.Reactive.Linq
{
    using ObservableImpl;

    // NOTE(review): the generic type parameters/arguments in this file were restored from
    // context (e.g. Unit.Default, evt.Sender/evt.EventArgs) - confirm them against the
    // matching IQueryLanguage / Observable facade declarations.
    internal partial class QueryLanguage
    {
        #region + Subscribe +

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the enumerable <paramref name="source"/>,
        /// using <c>SchedulerDefaults.Iteration</c> as the scheduler.
        /// </summary>
        /// <typeparam name="TSource">Element type of the enumerable sequence.</typeparam>
        /// <param name="source">Enumerable sequence to push to the observer.</param>
        /// <param name="observer">Observer that is subscribed to the sequence.</param>
        /// <returns>The disposable returned by the underlying producer's subscription.</returns>
        public virtual IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer)
        {
            return Subscribe_(source, observer, SchedulerDefaults.Iteration);
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/> to the enumerable <paramref name="source"/>,
        /// using the specified <paramref name="scheduler"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the enumerable sequence.</typeparam>
        /// <param name="source">Enumerable sequence to push to the observer.</param>
        /// <param name="observer">Observer that is subscribed to the sequence.</param>
        /// <param name="scheduler">Scheduler handed to the underlying producer.</param>
        /// <returns>The disposable returned by the underlying producer's subscription.</returns>
        public virtual IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
        {
            return Subscribe_(source, observer, scheduler);
        }

        /// <summary>
        /// Shared implementation of the <c>Subscribe</c> overloads: picks a producer based on
        /// whether <paramref name="scheduler"/> exposes a long-running variant and subscribes
        /// <paramref name="observer"/> to it directly.
        /// </summary>
        private static IDisposable Subscribe_<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler)
        {
            var longRunning = scheduler.AsLongRunning();
            if (longRunning != null)
            {
                //
                // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
                //
                return new ToObservableLongRunning<TSource>(source, longRunning).Subscribe/*Unsafe*/(observer);
            }

            //
            // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
            //
            return new ToObservableRecursive<TSource>(source, scheduler).Subscribe/*Unsafe*/(observer);
        }

        #endregion

        #region + ToEnumerable +

        /// <summary>
        /// Exposes the observable <paramref name="source"/> as an enumerable sequence.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Observable sequence to convert.</param>
        /// <returns>
        /// An <c>AnonymousEnumerable</c> constructed with a factory delegate that calls
        /// <c>source.GetEnumerator()</c>; nothing is enumerated by this method itself.
        /// </returns>
        public virtual IEnumerable<TSource> ToEnumerable<TSource>(IObservable<TSource> source)
        {
            return new AnonymousEnumerable<TSource>(() => source.GetEnumerator());
        }

        #endregion

        #region ToEvent

        /// <summary>
        /// Exposes an observable sequence of <see cref="Unit"/> as an event source.
        /// </summary>
        /// <param name="source">Observable sequence to convert.</param>
        /// <returns>
        /// An event source whose invoke delegate ignores the received element and calls the
        /// handler with <c>Unit.Default</c>.
        /// </returns>
        public virtual IEventSource<Unit> ToEvent(IObservable<Unit> source)
        {
            return new EventSource<Unit>(source, (h, _) => h(Unit.Default));
        }

        /// <summary>
        /// Exposes an observable sequence as an event source.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Observable sequence to convert.</param>
        /// <returns>
        /// An event source whose invoke delegate passes each received element to the handler.
        /// </returns>
        public virtual IEventSource<TSource> ToEvent<TSource>(IObservable<TSource> source)
        {
            return new EventSource<TSource>(source, (h, value) => h(value));
        }

        #endregion

        #region ToEventPattern

        /// <summary>
        /// Exposes an observable sequence of event patterns as an event-pattern source.
        /// </summary>
        /// <typeparam name="TEventArgs">Type of the event data carried by each element.</typeparam>
        /// <param name="source">Observable sequence of event patterns to convert.</param>
        /// <returns>
        /// An event-pattern source whose invoke delegate calls the handler with each element's
        /// <c>Sender</c> and <c>EventArgs</c>.
        /// </returns>
        public virtual IEventPatternSource<TEventArgs> ToEventPattern<TEventArgs>(IObservable<EventPattern<TEventArgs>> source)
        {
            return new EventPatternSource<TEventArgs>(
                source,
                (h, evt) => h(evt.Sender, evt.EventArgs)
            );
        }

        #endregion

        #region + ToObservable +

        /// <summary>
        /// Converts the enumerable <paramref name="source"/> to an observable sequence,
        /// using <c>SchedulerDefaults.Iteration</c> as the scheduler.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Enumerable sequence to convert.</param>
        /// <returns>An observable producer wrapping <paramref name="source"/>.</returns>
        public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source)
        {
            return ToObservable_(source, SchedulerDefaults.Iteration);
        }

        /// <summary>
        /// Converts the enumerable <paramref name="source"/> to an observable sequence,
        /// using the specified <paramref name="scheduler"/>.
        /// </summary>
        /// <typeparam name="TSource">Element type of the sequence.</typeparam>
        /// <param name="source">Enumerable sequence to convert.</param>
        /// <param name="scheduler">Scheduler handed to the underlying producer.</param>
        /// <returns>An observable producer wrapping <paramref name="source"/>.</returns>
        public virtual IObservable<TSource> ToObservable<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
        {
            return ToObservable_(source, scheduler);
        }

        /// <summary>
        /// Shared implementation of the <c>ToObservable</c> overloads: picks a producer based on
        /// whether <paramref name="scheduler"/> exposes a long-running variant. No subscription
        /// is made here; the producer is only constructed and returned.
        /// </summary>
        private static IObservable<TSource> ToObservable_<TSource>(IEnumerable<TSource> source, IScheduler scheduler)
        {
            var longRunning = scheduler.AsLongRunning();
            if (longRunning != null)
            {
                //
                // The scheduler exposes a long-running variant: use the producer that takes it.
                //
                return new ToObservableLongRunning<TSource>(source, longRunning);
            }

            //
            // No long-running variant available: use the producer that takes the scheduler as given.
            //
            return new ToObservableRecursive<TSource>(source, scheduler);
        }

        #endregion
    }
}