| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 | // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.using System.Reactive.Concurrency;using System.Threading;namespace System.Reactive.Linq{    public static partial class Observable    {        #region + ObserveOn +        /// <summary>        /// Wraps the source sequence in order to run its observer callbacks on the specified scheduler.        /// </summary>        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>        /// <param name="source">Source sequence.</param>        /// <param name="scheduler">Scheduler to notify observers on.</param>        /// <returns>The source sequence whose observations happen on the specified scheduler.</returns>        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>        /// <remarks>        /// This only invokes observer callbacks on a scheduler. In case the subscription and/or unsubscription actions have side-effects        /// that require to be run on a scheduler, use <see cref="Observable.SubscribeOn{TSource}(IObservable{TSource}, IScheduler)"/>.        /// </remarks>        public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, IScheduler scheduler)        {            if (source == null)                throw new ArgumentNullException("source");            if (scheduler == null)                throw new ArgumentNullException("scheduler");            return s_impl.ObserveOn<TSource>(source, scheduler);        }#if !NO_SYNCCTX        /// <summary>        /// Wraps the source sequence in order to run its observer callbacks on the specified synchronization context.        /// </summary>        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>        /// <param name="source">Source sequence.</param>        /// <param name="context">Synchronization context to notify observers on.</param>        /// <returns>The source sequence whose observations happen on the specified synchronization context.</returns>        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>        /// <remarks>        /// This only invokes observer callbacks on a synchronization context. In case the subscription and/or unsubscription actions have side-effects        /// that require to be run on a synchronization context, use <see cref="Observable.SubscribeOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.        /// </remarks>        public static IObservable<TSource> ObserveOn<TSource>(this IObservable<TSource> source, SynchronizationContext context)        {            if (source == null)                throw new ArgumentNullException("source");            if (context == null)                throw new ArgumentNullException("context");            return s_impl.ObserveOn<TSource>(source, context);        }#endif        #endregion        #region + SubscribeOn +        /// <summary>        /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler. This operation is not commonly used;        /// see the remarks section for more information on the distinction between SubscribeOn and ObserveOn.        /// </summary>        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>        /// <param name="source">Source sequence.</param>        /// <param name="scheduler">Scheduler to perform subscription and unsubscription actions on.</param>        /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.</returns>        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="scheduler"/> is null.</exception>        /// <remarks>        /// This only performs the side-effects of subscription and unsubscription on the specified scheduler. In order to invoke observer        /// callbacks on a scheduler, use <see cref="Observable.ObserveOn{TSource}(IObservable{TSource}, IScheduler)"/>.        /// </remarks>        public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, IScheduler scheduler)        {            if (source == null)                throw new ArgumentNullException("source");            if (scheduler == null)                throw new ArgumentNullException("scheduler");            return s_impl.SubscribeOn<TSource>(source, scheduler);        }#if !NO_SYNCCTX        /// <summary>        /// Wraps the source sequence in order to run its subscription and unsubscription logic on the specified synchronization context. This operation is not commonly used;        /// see the remarks section for more information on the distinction between SubscribeOn and ObserveOn.        /// </summary>        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>        /// <param name="source">Source sequence.</param>        /// <param name="context">Synchronization context to perform subscription and unsubscription actions on.</param>        /// <returns>The source sequence whose subscriptions and unsubscriptions happen on the specified synchronization context.</returns>        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="context"/> is null.</exception>        /// <remarks>        /// This only performs the side-effects of subscription and unsubscription on the specified synchronization context. In order to invoke observer        /// callbacks on a synchronization context, use <see cref="Observable.ObserveOn{TSource}(IObservable{TSource}, SynchronizationContext)"/>.        /// </remarks>        public static IObservable<TSource> SubscribeOn<TSource>(this IObservable<TSource> source, SynchronizationContext context)        {            if (source == null)                throw new ArgumentNullException("source");            if (context == null)                throw new ArgumentNullException("context");            return s_impl.SubscribeOn<TSource>(source, context);        }#endif        #endregion        #region + Synchronize +        /// <summary>        /// Synchronizes the observable sequence such that observer notifications cannot be delivered concurrently.        /// This overload is useful to "fix" an observable sequence that exhibits concurrent callbacks on individual observers, which is invalid behavior for the query processor.        /// </summary>        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>        /// <param name="source">Source sequence.</param>        /// <returns>The source sequence whose outgoing calls to observers are synchronized.</returns>        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>        /// <remarks>        /// It's invalid behavior - according to the observer grammar - for a sequence to exhibit concurrent callbacks on a given observer.        /// This operator can be used to "fix" a source that doesn't conform to this rule.        /// </remarks>        public static IObservable<TSource> Synchronize<TSource>(this IObservable<TSource> source)        {            if (source == null)                throw new ArgumentNullException("source");            return s_impl.Synchronize<TSource>(source);        }        /// <summary>        /// Synchronizes the observable sequence such that observer notifications cannot be delivered concurrently, using the specified gate object.        /// This overload is useful when writing n-ary query operators, in order to prevent concurrent callbacks from different sources by synchronizing on a common gate object.        /// </summary>        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>        /// <param name="source">Source sequence.</param>        /// <param name="gate">Gate object to synchronize each observer call on.</param>        /// <returns>The source sequence whose outgoing calls to observers are synchronized on the given gate object.</returns>        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="gate"/> is null.</exception>        public static IObservable<TSource> Synchronize<TSource>(this IObservable<TSource> source, object gate)        {            if (source == null)                throw new ArgumentNullException("source");            if (gate == null)                throw new ArgumentNullException("gate");            return s_impl.Synchronize<TSource>(source, gate);        }        #endregion    }}
 |