// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Reactive.Concurrency; using System.Reactive.Disposables; namespace System.Reactive { /// /// Abstract base class for implementations of the IObservable<T> interface. /// /// /// If you don't need a named type to create an observable sequence (i.e. you rather need /// an instance rather than a reusable type), use the Observable.Create method to create /// an observable sequence with specified subscription behavior. /// /// The type of the elements in the sequence. public abstract class ObservableBase : IObservable { /// /// Subscribes the given observer to the observable sequence. /// /// Observer that will receive notifications from the observable sequence. /// Disposable object representing an observer's subscription to the observable sequence. /// is null. public IDisposable Subscribe(IObserver observer) { if (observer == null) throw new ArgumentNullException("observer"); var autoDetachObserver = new AutoDetachObserver(observer); if (CurrentThreadScheduler.IsScheduleRequired) { // // Notice we don't protect this piece of code using an exception handler to // redirect errors to the OnError channel. This call to Schedule will run the // trampoline, so we'd be catching all exceptions, including those from user // callbacks that happen to run there. For example, consider: // // Observable.Return(42, Scheduler.CurrentThread) // .Subscribe(x => { throw new Exception(); }); // // Here, the OnNext(42) call would be scheduled on the trampoline, so when we // return from the scheduled Subscribe call, the CurrentThreadScheduler moves // on to invoking this work item. Too much of protection here would cause the // exception thrown in OnNext to circle back to OnError, which looks like the // sequence can't make up its mind. // CurrentThreadScheduler.Instance.Schedule(autoDetachObserver, ScheduledSubscribe); } else { try { autoDetachObserver.Disposable = SubscribeCore(autoDetachObserver); } catch (Exception exception) { // // This can happen when there's a synchronous callback to OnError in the // implementation of SubscribeCore, which also throws. So, we're seeing // an exception being thrown from a handler. // // For compat with v1.x, we rethrow the exception in this case, keeping // in mind this should be rare but if it happens, something's totally // screwed up. // if (!autoDetachObserver.Fail(exception)) throw; } } return autoDetachObserver; } private IDisposable ScheduledSubscribe(IScheduler _, AutoDetachObserver autoDetachObserver) { try { autoDetachObserver.Disposable = SubscribeCore(autoDetachObserver); } catch (Exception exception) { // // This can happen when there's a synchronous callback to OnError in the // implementation of SubscribeCore, which also throws. So, we're seeing // an exception being thrown from a handler. // // For compat with v1.x, we rethrow the exception in this case, keeping // in mind this should be rare but if it happens, something's totally // screwed up. // if (!autoDetachObserver.Fail(exception)) throw; } return Disposable.Empty; } /// /// Implement this method with the core subscription logic for the observable sequence. /// /// Observer to send notifications to. /// Disposable object representing an observer's subscription to the observable sequence. protected abstract IDisposable SubscribeCore(IObserver observer); } }