// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.ComponentModel;
using System.Reactive;
using System.Reactive.Disposables;

namespace System
{
    /// <summary>
    /// Provides a set of static methods for subscribing delegates to observables.
    /// </summary>
    public static class ObservableExtensions
    {
        /// <summary>
        /// Subscribes to the observable sequence without specifying any handlers.
        /// This method can be used to evaluate the observable sequence for its side-effects only.
        /// </summary>
        /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Observable sequence to subscribe to.</param>
        /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
        public static IDisposable Subscribe<T>(this IObservable<T> source)
        {
            if (source == null)
                throw new ArgumentNullException("source");

            //
            // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
            //
            // No handlers were supplied, so the stub delegates fill all three observer channels.
            //
            return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(Stubs<T>.Ignore, Stubs.Throw, Stubs.Nop));
        }

        /// <summary>
        /// Subscribes an element handler to an observable sequence.
        /// </summary>
        /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Observable sequence to subscribe to.</param>
        /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
        /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
        public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");

            //
            // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
            //
            return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(onNext, Stubs.Throw, Stubs.Nop));
        }

        /// <summary>
        /// Subscribes an element handler and an exception handler to an observable sequence.
        /// </summary>
        /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Observable sequence to subscribe to.</param>
        /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
        /// <param name="onError">Action to invoke upon exceptional termination of the observable sequence.</param>
        /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> is null.</exception>
        public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");
            if (onError == null)
                throw new ArgumentNullException("onError");

            //
            // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
            //
            return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(onNext, onError, Stubs.Nop));
        }

        /// <summary>
        /// Subscribes an element handler and a completion handler to an observable sequence.
        /// </summary>
        /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Observable sequence to subscribe to.</param>
        /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
        /// <param name="onCompleted">Action to invoke upon graceful termination of the observable sequence.</param>
        /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onCompleted"/> is null.</exception>
        public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");
            if (onCompleted == null)
                throw new ArgumentNullException("onCompleted");

            //
            // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
            //
            return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(onNext, Stubs.Throw, onCompleted));
        }

        /// <summary>
        /// Subscribes an element handler, an exception handler, and a completion handler to an observable sequence.
        /// </summary>
        /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Observable sequence to subscribe to.</param>
        /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
        /// <param name="onError">Action to invoke upon exceptional termination of the observable sequence.</param>
        /// <param name="onCompleted">Action to invoke upon graceful termination of the observable sequence.</param>
        /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> or <paramref name="onError"/> or <paramref name="onCompleted"/> is null.</exception>
        public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");
            if (onError == null)
                throw new ArgumentNullException("onError");
            if (onCompleted == null)
                throw new ArgumentNullException("onCompleted");

            //
            // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
            //
            return source.Subscribe/*Unsafe*/(new AnonymousObserver<T>(onNext, onError, onCompleted));
        }

        /// <summary>
        /// Subscribes to the specified source, re-routing synchronous exceptions during invocation of the Subscribe method to the observer's OnError channel.
        /// This method is typically used when writing query operators.
        /// </summary>
        /// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Observable sequence to subscribe to.</param>
        /// <param name="observer">Observer that will be passed to the observable sequence, and that will be used for exception propagation.</param>
        /// <returns>IDisposable object used to unsubscribe from the observable sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="observer"/> is null.</exception>
        [EditorBrowsable(EditorBrowsableState.Advanced)]
        public static IDisposable SubscribeSafe<T>(this IObservable<T> source, IObserver<T> observer)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (observer == null)
                throw new ArgumentNullException("observer");

            //
            // The following types are white-listed and should not exhibit exceptional behavior
            // for regular operation circumstances.
            //
            if (source is ObservableBase<T>)
                return source.Subscribe(observer);

#if !NO_PERF
            var producer = source as Producer<T>;
            if (producer != null)
                return producer.SubscribeRaw(observer, false);
#endif

            // Fallback result when Subscribe throws: the caller still gets a non-null disposable.
            var d = Disposable.Empty;

            try
            {
                d = source.Subscribe(observer);
            }
            catch (Exception exception)
            {
                //
                // The effect of redirecting the exception to the OnError channel is automatic
                // clean-up of query operator state for a large number of cases. For example,
                // consider a binary and temporal query operator with the following Subscribe
                // behavior (implemented using the Producer pattern with a Run method):
                //
                //   public IDisposable Run(...)
                //   {
                //       var tm = _scheduler.Schedule(_due, Tick);
                //
                //       var df = _fst.SubscribeSafe(new FstObserver(this, ...));
                //       var ds = _snd.SubscribeSafe(new SndObserver(this, ...)); // <-- fails
                //
                //       return new CompositeDisposable(tm, df, ds);
                //   }
                //
                // If the second subscription fails, we're not leaving the first subscription
                // or the scheduled job hanging around. Instead, the OnError propagation to
                // the SndObserver should take care of a Dispose call to the observer's parent
                // object. The handshake between Producer and Sink objects will ultimately
                // cause disposal of the CompositeDisposable that's returned from the method.
                //
                observer.OnError(exception);
            }

            return d;
        }
    }
}