// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer over an enumerable of observable sequences. The attached sink
    /// remembers the most recent error and only reports it from <c>Done</c>.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequences.</typeparam>
    class Catch<TSource> : Producer<TSource>
    {
        private readonly IEnumerable<IObservable<TSource>> _sources;

        /// <summary>Stores the sequences handed to the sink when the producer runs.</summary>
        /// <param name="sources">Candidate sequences; not validated here.</param>
        public Catch(IEnumerable<IObservable<TSource>> sources)
        {
            _sources = sources;
        }

        /// <summary>
        /// Creates the sink for <paramref name="observer"/>, publishes it through
        /// <paramref name="setSink"/>, and starts it on the stored sources.
        /// </summary>
        /// <returns>The disposable returned by the sink's <c>Run</c>.</returns>
        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(observer, cancel);
            setSink(sink);
            return sink.Run(_sources);
        }

        /// <summary>
        /// Sink that forwards elements and completion, and defers error reporting.
        /// </summary>
        class _ : TailRecursiveSink<TSource>
        {
            public _(IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
            }

            /// <summary>
            /// Exposes the inner source list when <paramref name="source"/> is itself a
            /// <see cref="Catch{TSource}"/>; returns <c>null</c> for any other observable.
            /// </summary>
            protected override IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source)
            {
                var @catch = source as Catch<TSource>;
                if (@catch != null)
                {
                    return @catch._sources;
                }

                return null;
            }

            /// <summary>Passes the element straight through to the downstream observer.</summary>
            public override void OnNext(TSource value)
            {
                base._observer.OnNext(value);
            }

            // Most recent error seen from a source; reported by Done() if still set.
            private Exception _lastException;

            /// <summary>
            /// Records the error instead of forwarding it, then invokes the base
            /// class's <c>_recurse</c> callback.
            /// </summary>
            public override void OnError(Exception error)
            {
                _lastException = error;

                // NOTE(review): _recurse is defined by TailRecursiveSink (not visible here);
                // presumably it moves on to the next source - confirm against the base class.
                _recurse();
            }

            /// <summary>Forwards completion downstream and disposes the sink.</summary>
            public override void OnCompleted()
            {
                base._observer.OnCompleted();
                base.Dispose();
            }

            /// <summary>
            /// Terminates the downstream observer: with the last recorded error when there
            /// is one, otherwise with completion. The sink is disposed in both cases.
            /// </summary>
            protected override void Done()
            {
                if (_lastException != null)
                {
                    base._observer.OnError(_lastException);
                }
                else
                {
                    base._observer.OnCompleted();
                }

                base.Dispose();
            }
        }
    }

    /// <summary>
    /// Producer that subscribes to a source and, when the source fails with an error
    /// assignable to <typeparamref name="TException"/>, switches to the sequence
    /// returned by a handler function.
    /// </summary>
    /// <typeparam name="TSource">Element type of the sequences.</typeparam>
    /// <typeparam name="TException">Exception type that triggers the handler.</typeparam>
    class Catch<TSource, TException> : Producer<TSource> where TException : Exception
    {
        private readonly IObservable<TSource> _source;
        private readonly Func<TException, IObservable<TSource>> _handler;

        /// <summary>Stores the source and the handler; neither is validated here.</summary>
        /// <param name="source">Sequence subscribed to first.</param>
        /// <param name="handler">Maps a matching error to the continuation sequence.</param>
        public Catch(IObservable<TSource> source, Func<TException, IObservable<TSource>> handler)
        {
            _source = source;
            _handler = handler;
        }

        /// <summary>
        /// Creates the sink for <paramref name="observer"/>, publishes it through
        /// <paramref name="setSink"/>, and starts it.
        /// </summary>
        /// <returns>The disposable returned by the sink's <c>Run</c>.</returns>
        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(this, observer, cancel);
            setSink(sink);
            return sink.Run();
        }

        /// <summary>
        /// Observer of the primary source. Matching errors are routed to the handler;
        /// everything else is forwarded downstream.
        /// </summary>
        class _ : Sink<TSource>, IObserver<TSource>
        {
            private readonly Catch<TSource, TException> _parent;

            public _(Catch<TSource, TException> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            // Holds the currently active subscription: first the source, later the handler's sequence.
            private SerialDisposable _subscription;

            /// <summary>Subscribes this sink to the parent's source.</summary>
            /// <returns>The serial disposable that tracks the active subscription.</returns>
            public IDisposable Run()
            {
                _subscription = new SerialDisposable();

                // The placeholder is installed before subscribing so that _subscription
                // already has a slot assigned if OnError runs during SubscribeSafe.
                var d1 = new SingleAssignmentDisposable();
                _subscription.Disposable = d1;
                d1.Disposable = _parent._source.SubscribeSafe(this);

                return _subscription;
            }

            /// <summary>Passes the element straight through to the downstream observer.</summary>
            public void OnNext(TSource value)
            {
                base._observer.OnNext(value);
            }

            /// <summary>
            /// If <paramref name="error"/> is a <typeparamref name="TException"/>, asks the
            /// handler for a continuation sequence and subscribes to it; if the handler
            /// itself throws, that exception is forwarded downstream. Any other error is
            /// forwarded as-is. The sink is disposed whenever an error is forwarded.
            /// </summary>
            public void OnError(Exception error)
            {
                var e = error as TException;
                if (e != null)
                {
                    var result = default(IObservable<TSource>);
                    try
                    {
                        result = _parent._handler(e);
                    }
                    catch (Exception ex)
                    {
                        base._observer.OnError(ex);
                        base.Dispose();
                        return;
                    }

                    // Assigning the new placeholder replaces the previous subscription
                    // held by the serial disposable.
                    var d = new SingleAssignmentDisposable();
                    _subscription.Disposable = d;

                    // The continuation is observed through Impl, which forwards errors
                    // directly, so the handler is not invoked again for this subscription.
                    d.Disposable = result.SubscribeSafe(new Impl(this));
                }
                else
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            /// <summary>Forwards completion downstream and disposes the sink.</summary>
            public void OnCompleted()
            {
                base._observer.OnCompleted();
                base.Dispose();
            }

            /// <summary>
            /// Observer attached to the handler's sequence; relays every notification to
            /// the owning sink's downstream observer and disposes the sink on termination.
            /// </summary>
            class Impl : IObserver<TSource>
            {
                private readonly _ _parent;

                public Impl(_ parent)
                {
                    _parent = parent;
                }

                public void OnNext(TSource value)
                {
                    _parent._observer.OnNext(value);
                }

                public void OnError(Exception error)
                {
                    _parent._observer.OnError(error);
                    _parent.Dispose();
                }

                public void OnCompleted()
                {
                    _parent._observer.OnCompleted();
                    _parent.Dispose();
                }
            }
        }
    }
}
#endif