TailRecursiveSink.cs 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. #if !NO_PERF
  5. using System.Collections.Generic;
  6. using System.Reactive.Concurrency;
  7. using System.Reactive.Disposables;
  8. namespace System.Reactive
  9. {
  10. abstract class TailRecursiveSink<TSource> : Sink<TSource>, IObserver<TSource>
  11. {
  12. public TailRecursiveSink(IObserver<TSource> observer, IDisposable cancel)
  13. : base(observer, cancel)
  14. {
  15. }
  16. private bool _isDisposed;
  17. private SerialDisposable _subscription;
  18. private AsyncLock _gate;
  19. private Stack<IEnumerator<IObservable<TSource>>> _stack;
  20. private Stack<int?> _length;
  21. protected Action _recurse;
  22. public IDisposable Run(IEnumerable<IObservable<TSource>> sources)
  23. {
  24. _isDisposed = false;
  25. _subscription = new SerialDisposable();
  26. _gate = new AsyncLock();
  27. _stack = new Stack<IEnumerator<IObservable<TSource>>>();
  28. _length = new Stack<int?>();
  29. var e = default(IEnumerator<IObservable<TSource>>);
  30. if (!TryGetEnumerator(sources, out e))
  31. return Disposable.Empty;
  32. _stack.Push(e);
  33. _length.Push(Helpers.GetLength(sources));
  34. var cancelable = SchedulerDefaults.TailRecursion.Schedule(self =>
  35. {
  36. _recurse = self;
  37. _gate.Wait(MoveNext);
  38. });
  39. return StableCompositeDisposable.Create(_subscription, cancelable, Disposable.Create(() => _gate.Wait(Dispose)));
  40. }
  41. protected abstract IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source);
  42. private void MoveNext()
  43. {
  44. var hasNext = false;
  45. var next = default(IObservable<TSource>);
  46. do
  47. {
  48. if (_stack.Count == 0)
  49. break;
  50. if (_isDisposed)
  51. return;
  52. var e = _stack.Peek();
  53. var l = _length.Peek();
  54. var current = default(IObservable<TSource>);
  55. try
  56. {
  57. hasNext = e.MoveNext();
  58. if (hasNext)
  59. current = e.Current;
  60. }
  61. catch (Exception ex)
  62. {
  63. e.Dispose();
  64. //
  65. // Failure to enumerate the sequence cannot be handled, even by
  66. // operators like Catch, because it'd lead to another attempt at
  67. // enumerating to find the next observable sequence. Therefore,
  68. // we feed those errors directly to the observer.
  69. //
  70. base._observer.OnError(ex);
  71. base.Dispose();
  72. return;
  73. }
  74. if (!hasNext)
  75. {
  76. e.Dispose();
  77. _stack.Pop();
  78. _length.Pop();
  79. }
  80. else
  81. {
  82. var r = l - 1;
  83. _length.Pop();
  84. _length.Push(r);
  85. try
  86. {
  87. next = Helpers.Unpack(current);
  88. }
  89. catch (Exception exception)
  90. {
  91. //
  92. // Errors from unpacking may produce side-effects that normally
  93. // would occur during a SubscribeSafe operation. Those would feed
  94. // back into the observer and be subject to the operator's error
  95. // handling behavior. For example, Catch would allow to handle
  96. // the error using a handler function.
  97. //
  98. if (!Fail(exception))
  99. {
  100. e.Dispose();
  101. }
  102. return;
  103. }
  104. //
  105. // Tail recursive case; drop the current frame.
  106. //
  107. if (r == 0)
  108. {
  109. e.Dispose();
  110. _stack.Pop();
  111. _length.Pop();
  112. }
  113. //
  114. // Flattening of nested sequences. Prevents stack overflow in observers.
  115. //
  116. var nextSeq = Extract(next);
  117. if (nextSeq != null)
  118. {
  119. var nextEnumerator = default(IEnumerator<IObservable<TSource>>);
  120. if (!TryGetEnumerator(nextSeq, out nextEnumerator))
  121. return;
  122. _stack.Push(nextEnumerator);
  123. _length.Push(Helpers.GetLength(nextSeq));
  124. hasNext = false;
  125. }
  126. }
  127. } while (!hasNext);
  128. if (!hasNext)
  129. {
  130. Done();
  131. return;
  132. }
  133. var d = new SingleAssignmentDisposable();
  134. _subscription.Disposable = d;
  135. d.Disposable = next.SubscribeSafe(this);
  136. }
  137. private new void Dispose()
  138. {
  139. while (_stack.Count > 0)
  140. {
  141. var e = _stack.Pop();
  142. _length.Pop();
  143. e.Dispose();
  144. }
  145. _isDisposed = true;
  146. }
  147. private bool TryGetEnumerator(IEnumerable<IObservable<TSource>> sources, out IEnumerator<IObservable<TSource>> result)
  148. {
  149. try
  150. {
  151. result = sources.GetEnumerator();
  152. return true;
  153. }
  154. catch (Exception exception)
  155. {
  156. //
  157. // Failure to enumerate the sequence cannot be handled, even by
  158. // operators like Catch, because it'd lead to another attempt at
  159. // enumerating to find the next observable sequence. Therefore,
  160. // we feed those errors directly to the observer.
  161. //
  162. base._observer.OnError(exception);
  163. base.Dispose();
  164. result = null;
  165. return false;
  166. }
  167. }
  168. public abstract void OnCompleted();
  169. public abstract void OnError(Exception error);
  170. public abstract void OnNext(TSource value);
  171. protected virtual void Done()
  172. {
  173. base._observer.OnCompleted();
  174. base.Dispose();
  175. }
  176. protected virtual bool Fail(Exception error)
  177. {
  178. base._observer.OnError(error);
  179. base.Dispose();
  180. return false;
  181. }
  182. }
  183. }
  184. #endif