// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.ComponentModel;
using System.Reactive;
using System.Reactive.Disposables;
using System.Threading;
namespace System
{
///
/// Provides a set of static methods for subscribing delegates to observables.
///
public static class ObservableExtensions
{
#region Subscribe delegate-based overloads
///
/// Subscribes to the observable sequence without specifying any handlers.
/// This method can be used to evaluate the observable sequence for its side-effects only.
///
/// The type of the elements in the source sequence.
/// Observable sequence to subscribe to.
/// IDisposable object used to unsubscribe from the observable sequence.
/// is null.
public static IDisposable Subscribe(this IObservable source)
{
if (source == null)
throw new ArgumentNullException("source");
//
// [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
//
return source.Subscribe/*Unsafe*/(new AnonymousObserver(Stubs.Ignore, Stubs.Throw, Stubs.Nop));
}
///
/// Subscribes an element handler to an observable sequence.
///
/// The type of the elements in the source sequence.
/// Observable sequence to subscribe to.
/// Action to invoke for each element in the observable sequence.
/// IDisposable object used to unsubscribe from the observable sequence.
/// or is null.
public static IDisposable Subscribe(this IObservable source, Action onNext)
{
if (source == null)
throw new ArgumentNullException("source");
if (onNext == null)
throw new ArgumentNullException("onNext");
//
// [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
//
return source.Subscribe/*Unsafe*/(new AnonymousObserver(onNext, Stubs.Throw, Stubs.Nop));
}
///
/// Subscribes an element handler and an exception handler to an observable sequence.
///
/// The type of the elements in the source sequence.
/// Observable sequence to subscribe to.
/// Action to invoke for each element in the observable sequence.
/// Action to invoke upon exceptional termination of the observable sequence.
/// IDisposable object used to unsubscribe from the observable sequence.
/// or or is null.
public static IDisposable Subscribe(this IObservable source, Action onNext, Action onError)
{
if (source == null)
throw new ArgumentNullException("source");
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onError == null)
throw new ArgumentNullException("onError");
//
// [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
//
return source.Subscribe/*Unsafe*/(new AnonymousObserver(onNext, onError, Stubs.Nop));
}
///
/// Subscribes an element handler and a completion handler to an observable sequence.
///
/// The type of the elements in the source sequence.
/// Observable sequence to subscribe to.
/// Action to invoke for each element in the observable sequence.
/// Action to invoke upon graceful termination of the observable sequence.
/// IDisposable object used to unsubscribe from the observable sequence.
/// or or is null.
public static IDisposable Subscribe(this IObservable source, Action onNext, Action onCompleted)
{
if (source == null)
throw new ArgumentNullException("source");
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onCompleted == null)
throw new ArgumentNullException("onCompleted");
//
// [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
//
return source.Subscribe/*Unsafe*/(new AnonymousObserver(onNext, Stubs.Throw, onCompleted));
}
///
/// Subscribes an element handler, an exception handler, and a completion handler to an observable sequence.
///
/// The type of the elements in the source sequence.
/// Observable sequence to subscribe to.
/// Action to invoke for each element in the observable sequence.
/// Action to invoke upon exceptional termination of the observable sequence.
/// Action to invoke upon graceful termination of the observable sequence.
/// IDisposable object used to unsubscribe from the observable sequence.
/// or or or is null.
public static IDisposable Subscribe(this IObservable source, Action onNext, Action onError, Action onCompleted)
{
if (source == null)
throw new ArgumentNullException("source");
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onError == null)
throw new ArgumentNullException("onError");
if (onCompleted == null)
throw new ArgumentNullException("onCompleted");
//
// [OK] Use of unsafe Subscribe: non-pretentious constructor for an observer; this overload is not to be used internally.
//
return source.Subscribe/*Unsafe*/(new AnonymousObserver(onNext, onError, onCompleted));
}
#endregion
#region Subscribe overloads with CancellationToken
#if !NO_TPL
///
/// Subscribes an observer to an observable sequence, using a CancellationToken to support unsubscription.
///
/// The type of the elements in the source sequence.
/// Observable sequence to subscribe to.
/// Observer to subscribe to the sequence.
/// CancellationToken that can be signaled to unsubscribe from the source sequence.
/// or is null.
public static void Subscribe(this IObservable source, IObserver observer, CancellationToken token)
{
if (source == null)
throw new ArgumentNullException("source");
if (observer == null)
throw new ArgumentNullException("observer");
source.Subscribe_(observer, token);
}
///
/// Subscribes to the observable sequence without specifying any handlers, using a CancellationToken to support unsubscription.
/// This method can be used to evaluate the observable sequence for its side-effects only.
///
/// The type of the elements in the source sequence.
/// Observable sequence to subscribe to.
/// CancellationToken that can be signaled to unsubscribe from the source sequence.
/// is null.
public static void Subscribe(this IObservable source, CancellationToken token)
{
if (source == null)
throw new ArgumentNullException("source");
source.Subscribe_(new AnonymousObserver(Stubs.Ignore, Stubs.Throw, Stubs.Nop), token);
}
///
/// Subscribes an element handler to an observable sequence, using a CancellationToken to support unsubscription.
///
/// The type of the elements in the source sequence.
/// Observable sequence to subscribe to.
/// Action to invoke for each element in the observable sequence.
/// CancellationToken that can be signaled to unsubscribe from the source sequence.
/// or is null.
public static void Subscribe(this IObservable source, Action onNext, CancellationToken token)
{
if (source == null)
throw new ArgumentNullException("source");
if (onNext == null)
throw new ArgumentNullException("onNext");
source.Subscribe_(new AnonymousObserver(onNext, Stubs.Throw, Stubs.Nop), token);
}
///
/// Subscribes an element handler and an exception handler to an observable sequence, using a CancellationToken to support unsubscription.
///
/// The type of the elements in the source sequence.
/// Observable sequence to subscribe to.
/// Action to invoke for each element in the observable sequence.
/// Action to invoke upon exceptional termination of the observable sequence.
/// CancellationToken that can be signaled to unsubscribe from the source sequence.
/// or or is null.
public static void Subscribe(this IObservable source, Action onNext, Action onError, CancellationToken token)
{
if (source == null)
throw new ArgumentNullException("source");
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onError == null)
throw new ArgumentNullException("onError");
source.Subscribe_(new AnonymousObserver(onNext, onError, Stubs.Nop), token);
}
///
/// Subscribes an element handler and a completion handler to an observable sequence, using a CancellationToken to support unsubscription.
///
/// The type of the elements in the source sequence.
/// Observable sequence to subscribe to.
/// Action to invoke for each element in the observable sequence.
/// Action to invoke upon graceful termination of the observable sequence.
/// CancellationToken that can be signaled to unsubscribe from the source sequence.
/// or or is null.
public static void Subscribe(this IObservable source, Action onNext, Action onCompleted, CancellationToken token)
{
if (source == null)
throw new ArgumentNullException("source");
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onCompleted == null)
throw new ArgumentNullException("onCompleted");
source.Subscribe_(new AnonymousObserver(onNext, Stubs.Throw, onCompleted), token);
}
///
/// Subscribes an element handler, an exception handler, and a completion handler to an observable sequence, using a CancellationToken to support unsubscription.
///
/// The type of the elements in the source sequence.
/// Observable sequence to subscribe to.
/// Action to invoke for each element in the observable sequence.
/// Action to invoke upon exceptional termination of the observable sequence.
/// Action to invoke upon graceful termination of the observable sequence.
/// CancellationToken that can be signaled to unsubscribe from the source sequence.
/// or or or is null.
public static void Subscribe(this IObservable source, Action onNext, Action onError, Action onCompleted, CancellationToken token)
{
if (source == null)
throw new ArgumentNullException("source");
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onError == null)
throw new ArgumentNullException("onError");
if (onCompleted == null)
throw new ArgumentNullException("onCompleted");
source.Subscribe_(new AnonymousObserver(onNext, onError, onCompleted), token);
}
private static void Subscribe_(this IObservable source, IObserver observer, CancellationToken token)
{
if (token.CanBeCanceled)
{
if (!token.IsCancellationRequested)
{
var r = new SingleAssignmentDisposable();
//
// [OK] Use of unsafe Subscribe: exception during Subscribe doesn't orphan CancellationTokenRegistration.
//
var d = source.Subscribe/*Unsafe*/(
observer.OnNext,
ex =>
{
using (r)
observer.OnError(ex);
},
() =>
{
using (r)
observer.OnCompleted();
}
);
r.Disposable = token.Register(d.Dispose);
}
}
else
{
source.Subscribe(observer);
}
}
#endif
#endregion
#region SubscribeSafe
///
/// Subscribes to the specified source, re-routing synchronous exceptions during invocation of the Subscribe method to the observer's OnError channel.
/// This method is typically used when writing query operators.
///
/// The type of the elements in the source sequence.
/// Observable sequence to subscribe to.
/// Observer that will be passed to the observable sequence, and that will be used for exception propagation.
/// IDisposable object used to unsubscribe from the observable sequence.
/// or is null.
[EditorBrowsable(EditorBrowsableState.Advanced)]
public static IDisposable SubscribeSafe(this IObservable source, IObserver observer)
{
if (source == null)
throw new ArgumentNullException("source");
if (observer == null)
throw new ArgumentNullException("observer");
//
// The following types are white-listed and should not exhibit exceptional behavior
// for regular operation circumstances.
//
if (source is ObservableBase)
return source.Subscribe(observer);
#if !NO_PERF
var producer = source as IProducer;
if (producer != null)
return producer.SubscribeRaw(observer, false);
#endif
var d = Disposable.Empty;
try
{
d = source.Subscribe(observer);
}
catch (Exception exception)
{
//
// The effect of redirecting the exception to the OnError channel is automatic
// clean-up of query operator state for a large number of cases. For example,
// consider a binary and temporal query operator with the following Subscribe
// behavior (implemented using the Producer pattern with a Run method):
//
// public IDisposable Run(...)
// {
// var tm = _scheduler.Schedule(_due, Tick);
//
// var df = _fst.SubscribeSafe(new FstObserver(this, ...));
// var ds = _snd.SubscribeSafe(new SndObserver(this, ...)); // <-- fails
//
// return new CompositeDisposable(tm, df, ds);
// }
//
// If the second subscription fails, we're not leaving the first subscription
// or the scheduled job hanging around. Instead, the OnError propagation to
// the SndObserver should take care of a Dispose call to the observer's parent
// object. The handshake between Producer and Sink objects will ultimately
// cause disposal of the CompositeDisposable that's returned from the method.
//
observer.OnError(exception);
}
return d;
}
#endregion
}
}