// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System.Collections.Generic; using System.Reactive.Concurrency; namespace System.Reactive.Linq { /// /// Provides a set of static methods for writing in-memory queries over observable sequences. /// public static class ObservableEx { #pragma warning disable IDE1006 // Naming Styles: 3rd party code is known to reflect for this specific field name private static IQueryLanguageEx s_impl = QueryServices.GetQueryImpl(new QueryLanguageEx()); #pragma warning restore IDE1006 // Naming Styles #region Create /// /// Subscribes to each observable sequence returned by the iteratorMethod in sequence and returns the observable sequence of values sent to the observer given to the iteratorMethod. /// /// The type of the elements in the produced sequence. /// Iterator method that produces elements in the resulting sequence by calling the given observer. /// An observable sequence obtained by running the iterator and returning the elements that were sent to the observer. /// is null. [Experimental] public static IObservable Create(Func, IEnumerable>> iteratorMethod) { if (iteratorMethod == null) { throw new ArgumentNullException(nameof(iteratorMethod)); } return s_impl.Create(iteratorMethod); } /// /// Subscribes to each observable sequence returned by the iteratorMethod in sequence and produces a Unit value on the resulting sequence for each step of the iteration. /// /// Iterator method that drives the resulting observable sequence. /// An observable sequence obtained by running the iterator and returning Unit values for each iteration step. /// is null. [Experimental] public static IObservable Create(Func>> iteratorMethod) { if (iteratorMethod == null) { throw new ArgumentNullException(nameof(iteratorMethod)); } return s_impl.Create(iteratorMethod); } #endregion #region Expand /// /// Expands an observable sequence by recursively invoking selector, using the specified scheduler to enumerate the queue of obtained sequences. /// /// The type of the elements in the source sequence and each of the recursively expanded sources obtained by running the selector function. /// Source sequence with the initial elements. /// Selector function to invoke for each produced element, resulting in another sequence to which the selector will be invoked recursively again. /// Scheduler on which to perform the expansion by enumerating the internal queue of obtained sequences. /// An observable sequence containing all the elements produced by the recursive expansion. /// or or is null. [Experimental] public static IObservable Expand(this IObservable source, Func> selector, IScheduler scheduler) { if (source == null) { throw new ArgumentNullException(nameof(source)); } if (selector == null) { throw new ArgumentNullException(nameof(selector)); } if (scheduler == null) { throw new ArgumentNullException(nameof(scheduler)); } return s_impl.Expand(source, selector, scheduler); } /// /// Expands an observable sequence by recursively invoking selector. /// /// The type of the elements in the source sequence and each of the recursively expanded sources obtained by running the selector function. /// Source sequence with the initial elements. /// Selector function to invoke for each produced element, resulting in another sequence to which the selector will be invoked recursively again. /// An observable sequence containing all the elements produced by the recursive expansion. /// or is null. [Experimental] public static IObservable Expand(this IObservable source, Func> selector) { if (source == null) { throw new ArgumentNullException(nameof(source)); } if (selector == null) { throw new ArgumentNullException(nameof(selector)); } return s_impl.Expand(source, selector); } #endregion #region ForkJoin /// /// Runs two observable sequences in parallel and combines their last elements. /// /// The type of the elements in the first source sequence. /// The type of the elements in the second source sequence. /// The type of the elements in the result sequence, returned by the selector function. /// First observable sequence. /// Second observable sequence. /// Result selector function to invoke with the last elements of both sequences. /// An observable sequence with the result of calling the selector function with the last elements of both input sequences. /// or or is null. [Experimental] public static IObservable ForkJoin(this IObservable first, IObservable second, Func resultSelector) { if (first == null) { throw new ArgumentNullException(nameof(first)); } if (second == null) { throw new ArgumentNullException(nameof(second)); } if (resultSelector == null) { throw new ArgumentNullException(nameof(resultSelector)); } return s_impl.ForkJoin(first, second, resultSelector); } /// /// Runs all specified observable sequences in parallel and collects their last elements. /// /// The type of the elements in the source sequences. /// Observable sequence to collect the last elements for. /// An observable sequence with an array collecting the last elements of all the input sequences. /// is null. [Experimental] public static IObservable ForkJoin(params IObservable[] sources) { if (sources == null) { throw new ArgumentNullException(nameof(sources)); } return s_impl.ForkJoin(sources); } /// /// Runs all observable sequences in the enumerable sources sequence in parallel and collect their last elements. /// /// The type of the elements in the source sequences. /// Observable sequence to collect the last elements for. /// An observable sequence with an array collecting the last elements of all the input sequences. /// is null. [Experimental] public static IObservable ForkJoin(this IEnumerable> sources) { if (sources == null) { throw new ArgumentNullException(nameof(sources)); } return s_impl.ForkJoin(sources); } #endregion #region Let /// /// Returns an observable sequence that is the result of invoking the selector on the source sequence, without sharing subscriptions. /// This operator allows for a fluent style of writing queries that use the same sequence multiple times. /// /// The type of the elements in the source sequence. /// The type of the elements in the result sequence. /// Source sequence that will be shared in the selector function. /// Selector function which can use the source sequence as many times as needed, without sharing subscriptions to the source sequence. /// An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function. /// or is null. [Experimental] public static IObservable Let(this IObservable source, Func, IObservable> selector) { if (source == null) { throw new ArgumentNullException(nameof(source)); } if (selector == null) { throw new ArgumentNullException(nameof(selector)); } return s_impl.Let(source, selector); } #endregion #region ManySelect /// /// Comonadic bind operator. /// [Experimental] public static IObservable ManySelect(this IObservable source, Func, TResult> selector, IScheduler scheduler) { if (source == null) { throw new ArgumentNullException(nameof(source)); } if (selector == null) { throw new ArgumentNullException(nameof(selector)); } if (scheduler == null) { throw new ArgumentNullException(nameof(scheduler)); } return s_impl.ManySelect(source, selector, scheduler); } /// /// Comonadic bind operator. /// [Experimental] public static IObservable ManySelect(this IObservable source, Func, TResult> selector) { if (source == null) { throw new ArgumentNullException(nameof(source)); } if (selector == null) { throw new ArgumentNullException(nameof(selector)); } return s_impl.ManySelect(source, selector); } #endregion #region ToListObservable /// /// Immediately subscribes to source and retains the elements in the observable sequence. /// /// The type of the elements in the source sequence. /// Source sequence. /// Object that's both an observable sequence and a list which can be used to access the source sequence's elements. /// is null. [Experimental] public static ListObservable ToListObservable(this IObservable source) { if (source == null) { throw new ArgumentNullException(nameof(source)); } return s_impl.ToListObservable(source); } #endregion } }