Catch.cs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Collections.Generic;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  /// <summary>
  /// Producer for the Catch overload that operates on an enumerable of candidate sources.
  /// Stepping from one source to the next is delegated to the TailRecursiveSink base class,
  /// which is defined outside this file.
  /// </summary>
  /// <typeparam name="TSource">Element type of the observable sequences.</typeparam>
  class Catch<TSource> : Producer<TSource>
  {
      private readonly IEnumerable<IObservable<TSource>> _sources;

      /// <summary>
      /// Stores the candidate sources; nothing is enumerated or subscribed here.
      /// </summary>
      /// <param name="sources">Sequences handed to the sink when Run is invoked.</param>
      public Catch(IEnumerable<IObservable<TSource>> sources)
      {
          _sources = sources;
      }

      /// <summary>
      /// Creates the sink, publishes it through <paramref name="setSink"/>, and starts it on the stored sources.
      /// </summary>
      /// <returns>Whatever the sink's Run method returns.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var catchSink = new _(observer, cancel);

          // The sink is published before it is started.
          setSink(catchSink);

          return catchSink.Run(_sources);
      }

      /// <summary>
      /// Sink that forwards values, remembers the most recent error, and asks the base
      /// class to recurse whenever the current source fails.
      /// </summary>
      private class _ : TailRecursiveSink<TSource>
      {
          // Most recent error reported through OnError; stays null until a source fails.
          private Exception _lastException;

          public _(IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
          }

          /// <summary>
          /// Exposes the inner source list when <paramref name="source"/> is itself a
          /// <see cref="Catch{TSource}"/>; yields null for any other observable.
          /// </summary>
          // NOTE(review): presumably used by TailRecursiveSink to flatten nested Catch chains - confirm.
          protected override IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source)
          {
              var nested = source as Catch<TSource>;
              return nested == null ? null : nested._sources;
          }

          /// <summary>Passes the value straight through to the downstream observer.</summary>
          public override void OnNext(TSource value)
          {
              base._observer.OnNext(value);
          }

          /// <summary>
          /// Records the error instead of forwarding it, then triggers recursion.
          /// </summary>
          public override void OnError(Exception error)
          {
              // Remember the failure first; Done() reports it if it is still set when Done runs.
              _lastException = error;
              _recurse();
          }

          /// <summary>
          /// Forwards completion downstream and disposes the sink.
          /// </summary>
          public override void OnCompleted()
          {
              base._observer.OnCompleted();
              base.Dispose();
          }

          /// <summary>
          /// Terminates the downstream observer: with the recorded error when there is one,
          /// otherwise with a plain completion. The sink is disposed afterwards in both cases.
          /// </summary>
          // NOTE(review): presumably invoked by TailRecursiveSink once no sources remain - confirm.
          protected override void Done()
          {
              if (_lastException == null)
              {
                  base._observer.OnCompleted();
              }
              else
              {
                  base._observer.OnError(_lastException);
              }

              base.Dispose();
          }

          /// <summary>
          /// Routes <paramref name="error"/> through <see cref="OnError"/> and always returns true.
          /// </summary>
          protected override bool Fail(Exception error)
          {
              //
              // OnError calls _recurse, which causes the next MoveNext operation
              // to be enqueued, so control still returns to the caller immediately.
              //
              OnError(error);
              return true;
          }
      }
  }
  /// <summary>
  /// Producer for the Catch overload that takes a single source and a handler which maps
  /// an exception of type <typeparamref name="TException"/> to a replacement sequence.
  /// </summary>
  /// <typeparam name="TSource">Element type of the observable sequences.</typeparam>
  /// <typeparam name="TException">Exception type that is routed to the handler.</typeparam>
  class Catch<TSource, TException> : Producer<TSource> where TException : Exception
  {
      private readonly IObservable<TSource> _source;
      private readonly Func<TException, IObservable<TSource>> _handler;

      /// <summary>
      /// Stores the source and the handler; no subscription is made here.
      /// </summary>
      /// <param name="source">Sequence subscribed to when the sink runs.</param>
      /// <param name="handler">Function invoked with a matching exception to obtain the replacement sequence.</param>
      public Catch(IObservable<TSource> source, Func<TException, IObservable<TSource>> handler)
      {
          _source = source;
          _handler = handler;
      }

      /// <summary>
      /// Creates the sink, publishes it through <paramref name="setSink"/>, and starts it.
      /// </summary>
      /// <returns>Whatever the sink's Run method returns.</returns>
      protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
      {
          var catchSink = new _(this, observer, cancel);

          // The sink is published before it is started.
          setSink(catchSink);

          return catchSink.Run();
      }

      /// <summary>
      /// Observer attached to the original source. Matching errors are turned into a
      /// subscription to the handler's result; everything else is forwarded downstream.
      /// </summary>
      private class _ : Sink<TSource>, IObserver<TSource>
      {
          private readonly Catch<TSource, TException> _parent;

          // Holds the currently active subscription: first the source, later the handler's result.
          private SerialDisposable _subscription;

          public _(Catch<TSource, TException> parent, IObserver<TSource> observer, IDisposable cancel)
              : base(observer, cancel)
          {
              _parent = parent;
          }

          /// <summary>
          /// Subscribes this sink to the parent's source.
          /// </summary>
          /// <returns>The serial disposable that tracks the active subscription.</returns>
          public IDisposable Run()
          {
              _subscription = new SerialDisposable();

              // The slot is installed before subscribing.
              // NOTE(review): presumably so a synchronous OnError during subscription can already swap it - confirm.
              var sourceSubscription = new SingleAssignmentDisposable();
              _subscription.Disposable = sourceSubscription;
              sourceSubscription.Disposable = _parent._source.SubscribeSafe(this);

              return _subscription;
          }

          /// <summary>Passes the value straight through to the downstream observer.</summary>
          public void OnNext(TSource value)
          {
              base._observer.OnNext(value);
          }

          /// <summary>
          /// Handles a failure of the original source. Errors that are not a
          /// <typeparamref name="TException"/> are forwarded as-is; matching errors are passed to
          /// the handler, and the sequence it returns is subscribed to in place of the source.
          /// </summary>
          public void OnError(Exception error)
          {
              var matched = error as TException;
              if (matched == null)
              {
                  // Not the exception type this operator handles: terminate downstream.
                  base._observer.OnError(error);
                  base.Dispose();
                  return;
              }

              IObservable<TSource> fallback;
              try
              {
                  fallback = _parent._handler(matched);
              }
              catch (Exception handlerFailure)
              {
                  // The handler itself threw: report that exception downstream instead.
                  base._observer.OnError(handlerFailure);
                  base.Dispose();
                  return;
              }

              // Install a fresh slot in the serial disposable, then subscribe to the fallback.
              var fallbackSubscription = new SingleAssignmentDisposable();
              _subscription.Disposable = fallbackSubscription;
              fallbackSubscription.Disposable = fallback.SubscribeSafe(new Impl(this));
          }

          /// <summary>
          /// Forwards completion downstream and disposes the sink.
          /// </summary>
          public void OnCompleted()
          {
              base._observer.OnCompleted();
              base.Dispose();
          }

          /// <summary>
          /// Observer attached to the handler's sequence. It forwards every notification to the
          /// downstream observer and does not invoke the handler again on error.
          /// </summary>
          private class Impl : IObserver<TSource>
          {
              private readonly _ _parent;

              public Impl(_ parent)
              {
                  _parent = parent;
              }

              /// <summary>Passes the value to the downstream observer of the owning sink.</summary>
              public void OnNext(TSource value)
              {
                  _parent._observer.OnNext(value);
              }

              /// <summary>Forwards the error downstream and disposes the owning sink.</summary>
              public void OnError(Exception error)
              {
                  _parent._observer.OnError(error);
                  _parent.Dispose();
              }

              /// <summary>Forwards completion downstream and disposes the owning sink.</summary>
              public void OnCompleted()
              {
                  _parent._observer.OnCompleted();
                  _parent.Dispose();
              }
          }
      }
  }
  160. }
  161. #endif