| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- using System.ComponentModel;
- using System.Reactive;
- using System.Reactive.Disposables;
- namespace System
- {
- /// <summary>
- /// Provides a set of static methods for subscribing delegates to observables.
- /// </summary>
- public static class ObservableExtensions
- {
- /// <summary>
- /// Subscribes to the observable sequence without specifying any handlers.
- /// This method can be used to evaluate the observable sequence for its side-effects only.
- /// </summary>
- /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
- /// <param name="source">Observable sequence to subscribe to.</param>
- /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
- public static IDisposable Subscribe<T>(this IObservable<T> source)
- {
- if (source == null)
- throw new ArgumentNullException("source");
- //
- // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
- //
- return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(Stubs<T>.Ignore, Stubs.Throw, Stubs.Nop));
- }
- /// <summary>
- /// Subscribes an element handler to an observable sequence.
- /// </summary>
- /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
- /// <param name="source">Observable sequence to subscribe to.</param>
- /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
- /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
- public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
- {
- if (source == null)
- throw new ArgumentNullException("source");
- if (onNext == null)
- throw new ArgumentNullException("onNext");
- //
- // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
- //
- return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(onNext, Stubs.Throw, Stubs.Nop));
- }
- /// <summary>
- /// Subscribes an element handler and an exception handler to an observable sequence.
- /// </summary>
- /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
- /// <param name="source">Observable sequence to subscribe to.</param>
- /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
- /// <param name="onError">Action to invoke upon exceptional termination of the observable sequence.</param>
- /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
- public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError)
- {
- if (source == null)
- throw new ArgumentNullException("source");
- if (onNext == null)
- throw new ArgumentNullException("onNext");
- if (onError == null)
- throw new ArgumentNullException("onError");
- //
- // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
- //
- return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(onNext, onError, Stubs.Nop));
- }
- /// <summary>
- /// Subscribes an element handler and a completion handler to an observable sequence.
- /// </summary>
- /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
- /// <param name="source">Observable sequence to subscribe to.</param>
- /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
- /// <param name="onCompleted">Action to invoke upon graceful termination of the observable sequence.</param>
- /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
- public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted)
- {
- if (source == null)
- throw new ArgumentNullException("source");
- if (onNext == null)
- throw new ArgumentNullException("onNext");
- if (onCompleted == null)
- throw new ArgumentNullException("onCompleted");
- //
- // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
- //
- return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(onNext, Stubs.Throw, onCompleted));
- }
- /// <summary>
- /// Subscribes an element handler, an exception handler, and a completion handler to an observable sequence.
- /// </summary>
- /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
- /// <param name="source">Observable sequence to subscribe to.</param>
- /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
- /// <param name="onError">Action to invoke upon exceptional termination of the observable sequence.</param>
- /// <param name="onCompleted">Action to invoke upon graceful termination of the observable sequence.</param>
- /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
- public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
- {
- if (source == null)
- throw new ArgumentNullException("source");
- if (onNext == null)
- throw new ArgumentNullException("onNext");
- if (onError == null)
- throw new ArgumentNullException("onError");
- if (onCompleted == null)
- throw new ArgumentNullException("onCompleted");
- //
- // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
- //
- return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(onNext, onError, onCompleted));
- }
- /// <summary>
- /// Subscribes to the specified source, re-routing synchronous exceptions during invocation of the Subscribe method to the observer's OnError channel.
- /// This method is typically used when writing query operators.
- /// </summary>
- /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
- /// <param name="source">Observable sequence to subscribe to.</param>
- /// <param name="observer">Observer that will be passed to the observable sequence, and that will be used for exception propagation.</param>
- /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
- /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="observer"/> is null.</exception>
- [EditorBrowsable(EditorBrowsableState.Advanced)]
- public static IDisposable SubscribeSafe<T>(this IObservable<T> source, IObserver<T> observer)
- {
- if (source == null)
- throw new ArgumentNullException("source");
- if (observer == null)
- throw new ArgumentNullException("observer");
- //
- // The following types are white-listed and should not exhibit exceptional behavior
- // for regular operation circumstances.
- //
- if (source is ObservableBase<T>)
- return source.Subscribe(observer);
- #if !NO_PERF
- var producer = source as Producer<T>;
- if (producer != null)
- return producer.SubscribeRaw(observer, false);
- #endif
- var d = Disposable.Empty;
- try
- {
- d = source.Subscribe(observer);
- }
- catch (Exception exception)
- {
- //
- // The effect of redirecting the exception to the OnError channel is automatic
- // clean-up of query operator state for a large number of cases. For example,
- // consider a binary and temporal query operator with the following Subscribe
- // behavior (implemented using the Producer pattern with a Run method):
- //
- // public IDisposable Run(...)
- // {
- // var tm = _scheduler.Schedule(_due, Tick);
- //
- // var df = _fst.SubscribeSafe(new FstObserver(this, ...));
- // var ds = _snd.SubscribeSafe(new SndObserver(this, ...)); // <-- fails
- //
- // return new CompositeDisposable(tm, df, ds);
- // }
- //
- // If the second subscription fails, we're not leaving the first subscription
- // or the scheduled job hanging around. Instead, the OnError propagation to
- // the SndObserver should take care of a Dispose call to the observer's parent
- // object. The handshake between Producer and Sink objects will ultimately
- // cause disposal of the CompositeDisposable that's returned from the method.
- //
- observer.OnError(exception);
- }
- return d;
- }
- }
- }
|