// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System.ComponentModel; using System.Reactive; using System.Reactive.Disposables; using System.Threading; namespace System { /// /// Provides a set of static methods for subscribing delegates to observables. /// public static class ObservableExtensions { #region Subscribe delegate-based overloads /// /// Subscribes to the observable sequence without specifying any handlers. /// This method can be used to evaluate the observable sequence for its side-effects only. /// /// The type of the elements in the source sequence. /// Observable sequence to subscribe to. /// IDisposable object used to unsubscribe from the observable sequence. /// is null. public static IDisposable Subscribe(this IObservable source) { if (source == null) throw new ArgumentNullException("source"); // // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally. // return source.Subscribe/*Unsafe*/(new AnonymousObserver(Stubs.Ignore, Stubs.Throw, Stubs.Nop)); } /// /// Subscribes an element handler to an observable sequence. /// /// The type of the elements in the source sequence. /// Observable sequence to subscribe to. /// Action to invoke for each element in the observable sequence. /// IDisposable object used to unsubscribe from the observable sequence. /// or is null. public static IDisposable Subscribe(this IObservable source, Action onNext) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); // // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally. // return source.Subscribe/*Unsafe*/(new AnonymousObserver(onNext, Stubs.Throw, Stubs.Nop)); } /// /// Subscribes an element handler and an exception handler to an observable sequence. /// /// The type of the elements in the source sequence. /// Observable sequence to subscribe to. /// Action to invoke for each element in the observable sequence. /// Action to invoke upon exceptional termination of the observable sequence. /// IDisposable object used to unsubscribe from the observable sequence. /// or or is null. public static IDisposable Subscribe(this IObservable source, Action onNext, Action onError) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); // // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally. // return source.Subscribe/*Unsafe*/(new AnonymousObserver(onNext, onError, Stubs.Nop)); } /// /// Subscribes an element handler and a completion handler to an observable sequence. /// /// The type of the elements in the source sequence. /// Observable sequence to subscribe to. /// Action to invoke for each element in the observable sequence. /// Action to invoke upon graceful termination of the observable sequence. /// IDisposable object used to unsubscribe from the observable sequence. /// or or is null. public static IDisposable Subscribe(this IObservable source, Action onNext, Action onCompleted) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); // // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally. // return source.Subscribe/*Unsafe*/(new AnonymousObserver(onNext, Stubs.Throw, onCompleted)); } /// /// Subscribes an element handler, an exception handler, and a completion handler to an observable sequence. /// /// The type of the elements in the source sequence. /// Observable sequence to subscribe to. /// Action to invoke for each element in the observable sequence. /// Action to invoke upon exceptional termination of the observable sequence. /// Action to invoke upon graceful termination of the observable sequence. /// IDisposable object used to unsubscribe from the observable sequence. /// or or or is null. public static IDisposable Subscribe(this IObservable source, Action onNext, Action onError, Action onCompleted) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); // // [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally. // return source.Subscribe/*Unsafe*/(new AnonymousObserver(onNext, onError, onCompleted)); } #endregion #region Subscribe overloads with CancellationToken #if !NO_TPL /// /// Subscribes an observer to an observable sequence, using a CancellationToken to support unsubscription. /// /// The type of the elements in the source sequence. /// Observable sequence to subscribe to. /// Observer to subscribe to the sequence. /// CancellationToken that can be signaled to unsubscribe from the source sequence. /// or is null. public static void Subscribe(this IObservable source, IObserver observer, CancellationToken token) { if (source == null) throw new ArgumentNullException("source"); if (observer == null) throw new ArgumentNullException("observer"); source.Subscribe_(observer, token); } /// /// Subscribes to the observable sequence without specifying any handlers, using a CancellationToken to support unsubscription. /// This method can be used to evaluate the observable sequence for its side-effects only. /// /// The type of the elements in the source sequence. /// Observable sequence to subscribe to. /// CancellationToken that can be signaled to unsubscribe from the source sequence. /// is null. public static void Subscribe(this IObservable source, CancellationToken token) { if (source == null) throw new ArgumentNullException("source"); source.Subscribe_(new AnonymousObserver(Stubs.Ignore, Stubs.Throw, Stubs.Nop), token); } /// /// Subscribes an element handler to an observable sequence, using a CancellationToken to support unsubscription. /// /// The type of the elements in the source sequence. /// Observable sequence to subscribe to. /// Action to invoke for each element in the observable sequence. /// CancellationToken that can be signaled to unsubscribe from the source sequence. /// or is null. public static void Subscribe(this IObservable source, Action onNext, CancellationToken token) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); source.Subscribe_(new AnonymousObserver(onNext, Stubs.Throw, Stubs.Nop), token); } /// /// Subscribes an element handler and an exception handler to an observable sequence, using a CancellationToken to support unsubscription. /// /// The type of the elements in the source sequence. /// Observable sequence to subscribe to. /// Action to invoke for each element in the observable sequence. /// Action to invoke upon exceptional termination of the observable sequence. /// CancellationToken that can be signaled to unsubscribe from the source sequence. /// or or is null. public static void Subscribe(this IObservable source, Action onNext, Action onError, CancellationToken token) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); source.Subscribe_(new AnonymousObserver(onNext, onError, Stubs.Nop), token); } /// /// Subscribes an element handler and a completion handler to an observable sequence, using a CancellationToken to support unsubscription. /// /// The type of the elements in the source sequence. /// Observable sequence to subscribe to. /// Action to invoke for each element in the observable sequence. /// Action to invoke upon graceful termination of the observable sequence. /// CancellationToken that can be signaled to unsubscribe from the source sequence. /// or or is null. public static void Subscribe(this IObservable source, Action onNext, Action onCompleted, CancellationToken token) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); source.Subscribe_(new AnonymousObserver(onNext, Stubs.Throw, onCompleted), token); } /// /// Subscribes an element handler, an exception handler, and a completion handler to an observable sequence, using a CancellationToken to support unsubscription. /// /// The type of the elements in the source sequence. /// Observable sequence to subscribe to. /// Action to invoke for each element in the observable sequence. /// Action to invoke upon exceptional termination of the observable sequence. /// Action to invoke upon graceful termination of the observable sequence. /// CancellationToken that can be signaled to unsubscribe from the source sequence. /// or or or is null. public static void Subscribe(this IObservable source, Action onNext, Action onError, Action onCompleted, CancellationToken token) { if (source == null) throw new ArgumentNullException("source"); if (onNext == null) throw new ArgumentNullException("onNext"); if (onError == null) throw new ArgumentNullException("onError"); if (onCompleted == null) throw new ArgumentNullException("onCompleted"); source.Subscribe_(new AnonymousObserver(onNext, onError, onCompleted), token); } private static void Subscribe_(this IObservable source, IObserver observer, CancellationToken token) { if (token.CanBeCanceled) { if (!token.IsCancellationRequested) { var r = new SingleAssignmentDisposable(); // // [OK] Use of unsafe Subscribe: exception during Subscribe doesn't orphan CancellationTokenRegistration. // var d = source.Subscribe/*Unsafe*/( observer.OnNext, ex => { using (r) observer.OnError(ex); }, () => { using (r) observer.OnCompleted(); } ); r.Disposable = token.Register(d.Dispose); } } else { source.Subscribe(observer); } } #endif #endregion #region SubscribeSafe /// /// Subscribes to the specified source, re-routing synchronous exceptions during invocation of the Subscribe method to the observer's OnError channel. /// This method is typically used when writing query operators. /// /// The type of the elements in the source sequence. /// Observable sequence to subscribe to. /// Observer that will be passed to the observable sequence, and that will be used for exception propagation. /// IDisposable object used to unsubscribe from the observable sequence. /// or is null. [EditorBrowsable(EditorBrowsableState.Advanced)] public static IDisposable SubscribeSafe(this IObservable source, IObserver observer) { if (source == null) throw new ArgumentNullException("source"); if (observer == null) throw new ArgumentNullException("observer"); // // The following types are white-listed and should not exhibit exceptional behavior // for regular operation circumstances. // if (source is ObservableBase) return source.Subscribe(observer); #if !NO_PERF var producer = source as IProducer; if (producer != null) return producer.SubscribeRaw(observer, false); #endif var d = Disposable.Empty; try { d = source.Subscribe(observer); } catch (Exception exception) { // // The effect of redirecting the exception to the OnError channel is automatic // clean-up of query operator state for a large number of cases. For example, // consider a binary and temporal query operator with the following Subscribe // behavior (implemented using the Producer pattern with a Run method): // // public IDisposable Run(...) // { // var tm = _scheduler.Schedule(_due, Tick); // // var df = _fst.SubscribeSafe(new FstObserver(this, ...)); // var ds = _snd.SubscribeSafe(new SndObserver(this, ...)); // <-- fails // // return new CompositeDisposable(tm, df, ds); // } // // If the second subscription fails, we're not leaving the first subscription // or the scheduled job hanging around. Instead, the OnError propagation to // the SndObserver should take care of a Dispose call to the observer's parent // object. The handshake between Producer and Sink objects will ultimately // cause disposal of the CompositeDisposable that's returned from the method. // observer.OnError(exception); } return d; } #endregion } }