TailRecursiveSink.cs 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System.Collections.Generic;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive
  7. {
  /// <summary>
  /// Base class for sinks that subscribe, one source at a time, to the observable
  /// sequences yielded by an enumerable of sources. Enumerators are kept on an
  /// explicit stack so that nested sequences reported by <see cref="Extract"/> are
  /// flattened instead of growing a chain of observers.
  /// </summary>
  /// <typeparam name="TSource">Element type of the source sequences.</typeparam>
  abstract class TailRecursiveSink<TSource> : Sink<TSource>, IObserver<TSource>
  {
      /// <summary>
      /// Creates the sink; both arguments are forwarded unchanged to the base sink.
      /// </summary>
      public TailRecursiveSink(IObserver<TSource> observer, IDisposable cancel)
          : base(observer, cancel)
      {
      }

      // Set by Dispose() once every enumerator on the stack has been released.
      private bool _isDisposed;

      // Holds the subscription to the source that is currently being observed.
      private SerialDisposable _subscription;

      // Every call to MoveNext and Dispose is funneled through this lock.
      private AsyncLock _gate;

      // Enumerators of the (possibly nested) source sequences; the top is the active one.
      private Stack<IEnumerator<IObservable<TSource>>> _stack;

      // Parallel to _stack: the value of Helpers.GetLength for each frame, decremented
      // per element taken. NOTE(review): presumably null when the length is unknown -
      // confirm against Helpers.GetLength.
      private Stack<int?> _length;

      // Continuation handed out by the scheduler in Run.
      // NOTE(review): presumably invoked by derived sinks to advance to the next source.
      protected Action _recurse;

      /// <summary>
      /// Initializes the sink's state, obtains the enumerator of <paramref name="sources"/>
      /// and schedules the first move to the next source.
      /// </summary>
      /// <param name="sources">Sequence of observable sources to subscribe to in turn.</param>
      /// <returns>
      /// A disposable covering the current subscription, the scheduled work and the
      /// enumerator cleanup; <c>Disposable.Empty</c> when the enumerator could not be
      /// obtained (the error has then already been sent to the observer).
      /// </returns>
      public IDisposable Run(IEnumerable<IObservable<TSource>> sources)
      {
          _isDisposed = false;
          _subscription = new SerialDisposable();
          _gate = new AsyncLock();
          _stack = new Stack<IEnumerator<IObservable<TSource>>>();
          _length = new Stack<int?>();

          var e = default(IEnumerator<IObservable<TSource>>);
          if (!TryGetEnumerator(sources, out e))
          {
              return Disposable.Empty;
          }

          _stack.Push(e);
          _length.Push(Helpers.GetLength(sources));

          var cancelable = SchedulerDefaults.TailRecursion.Schedule(self =>
          {
              _recurse = self;
              _gate.Wait(MoveNext);
          });

          // Cleanup of the enumerator stack goes through the same gate as MoveNext.
          return StableCompositeDisposable.Create(_subscription, cancelable, Disposable.Create(() => _gate.Wait(Dispose)));
      }

      /// <summary>
      /// Returns the inner sources of <paramref name="source"/> when it is a nested
      /// sequence that should be flattened onto the enumerator stack; MoveNext treats
      /// a <c>null</c> result as "subscribe to <paramref name="source"/> directly".
      /// </summary>
      protected abstract IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source);

      /// <summary>
      /// Advances to the next source: walks the enumerator stack until an observable
      /// to subscribe to is found, or calls <see cref="Done"/> when the stack is empty.
      /// </summary>
      private void MoveNext()
      {
          var hasNext = false;
          var next = default(IObservable<TSource>);

          do
          {
              //
              // The disposed check has to come before the empty-stack check: Dispose
              // empties the stack before raising the flag, so testing the stack first
              // would always win and signal Done() on a sink that was already disposed.
              //
              if (_isDisposed)
              {
                  return;
              }

              if (_stack.Count == 0)
              {
                  break;
              }

              var e = _stack.Peek();
              var l = _length.Peek();

              var current = default(IObservable<TSource>);
              try
              {
                  hasNext = e.MoveNext();
                  if (hasNext)
                  {
                      current = e.Current;
                  }
              }
              catch (Exception ex)
              {
                  e.Dispose();

                  //
                  // Failure to enumerate the sequence cannot be handled, even by
                  // operators like Catch, because it'd lead to another attempt at
                  // enumerating to find the next observable sequence. Therefore,
                  // we feed those errors directly to the observer.
                  //
                  base._observer.OnError(ex);
                  base.Dispose();
                  return;
              }

              if (!hasNext)
              {
                  // Current frame is exhausted; drop it and continue with the one below.
                  e.Dispose();

                  _stack.Pop();
                  _length.Pop();
              }
              else
              {
                  // One element was taken from this frame; record the remaining count.
                  // For an unknown (null) length the lifted subtraction yields null.
                  var r = l - 1;
                  _length.Pop();
                  _length.Push(r);

                  try
                  {
                      next = Helpers.Unpack(current);
                  }
                  catch (Exception exception)
                  {
                      //
                      // Errors from unpacking may produce side-effects that normally
                      // would occur during a SubscribeSafe operation. Those would feed
                      // back into the observer and be subject to the operator's error
                      // handling behavior. For example, Catch would allow to handle
                      // the error using a handler function.
                      //
                      // The enumerator is only released when Fail reports the error
                      // as not handled (returns false).
                      //
                      if (!Fail(exception))
                      {
                          e.Dispose();
                      }

                      return;
                  }

                  //
                  // Tail recursive case; drop the current frame.
                  //
                  if (r == 0)
                  {
                      e.Dispose();

                      _stack.Pop();
                      _length.Pop();
                  }

                  //
                  // Flattening of nested sequences. Prevents stack overflow in observers.
                  //
                  var nextSeq = Extract(next);
                  if (nextSeq != null)
                  {
                      var nextEnumerator = default(IEnumerator<IObservable<TSource>>);
                      if (!TryGetEnumerator(nextSeq, out nextEnumerator))
                      {
                          return;
                      }

                      _stack.Push(nextEnumerator);
                      _length.Push(Helpers.GetLength(nextSeq));

                      // Keep looping: the source to subscribe to comes from the new frame.
                      hasNext = false;
                  }
              }
          } while (!hasNext);

          // Only reached with hasNext == false through the empty-stack break above.
          if (!hasNext)
          {
              Done();
              return;
          }

          // Publish the placeholder first so the subscription is already tracked by
          // _subscription while SubscribeSafe is running.
          var d = new SingleAssignmentDisposable();
          _subscription.Disposable = d;
          d.Disposable = next.SubscribeSafe(this);
      }

      /// <summary>
      /// Disposes every enumerator still on the stack and marks the sink as disposed.
      /// Declared <c>new</c>: it hides the base sink's Dispose, which this method does
      /// not call.
      /// </summary>
      private new void Dispose()
      {
          while (_stack.Count > 0)
          {
              var e = _stack.Pop();
              _length.Pop();

              e.Dispose();
          }

          _isDisposed = true;
      }

      /// <summary>
      /// Tries to obtain an enumerator for <paramref name="sources"/>. On failure the
      /// exception is sent to the observer, the base sink is disposed and
      /// <paramref name="result"/> is set to <c>null</c>.
      /// </summary>
      /// <returns><c>true</c> when the enumerator was obtained; otherwise <c>false</c>.</returns>
      private bool TryGetEnumerator(IEnumerable<IObservable<TSource>> sources, out IEnumerator<IObservable<TSource>> result)
      {
          try
          {
              result = sources.GetEnumerator();
              return true;
          }
          catch (Exception exception)
          {
              //
              // Failure to enumerate the sequence cannot be handled, even by
              // operators like Catch, because it'd lead to another attempt at
              // enumerating to find the next observable sequence. Therefore,
              // we feed those errors directly to the observer.
              //
              base._observer.OnError(exception);
              base.Dispose();

              result = null;
              return false;
          }
      }

      // Observer callbacks for the source currently subscribed to; supplied by derived sinks.
      public abstract void OnCompleted();
      public abstract void OnError(Exception error);
      public abstract void OnNext(TSource value);

      /// <summary>
      /// Called by MoveNext when no sources are left. The default implementation
      /// completes the observer and disposes the base sink.
      /// </summary>
      protected virtual void Done()
      {
          base._observer.OnCompleted();
          base.Dispose();
      }

      /// <summary>
      /// Called by MoveNext when unpacking a source throws. The default implementation
      /// forwards the error to the observer, disposes the base sink and returns
      /// <c>false</c>.
      /// </summary>
      /// <returns>
      /// <c>false</c> when the error was not handled, in which case MoveNext disposes
      /// the current enumerator.
      /// </returns>
      protected virtual bool Fail(Exception error)
      {
          base._observer.OnError(error);
          base.Dispose();

          return false;
      }
  }
  181. }
  182. #endif