TailRecursiveSink.cs 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System.Collections.Generic;
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive
  7. {
  8. abstract class TailRecursiveSink<TSource> : Sink<TSource>, IObserver<TSource>
  9. {
  10. public TailRecursiveSink(IObserver<TSource> observer, IDisposable cancel)
  11. : base(observer, cancel)
  12. {
  13. }
  14. private bool _isDisposed;
  15. private SerialDisposable _subscription;
  16. private AsyncLock _gate;
  17. private Stack<IEnumerator<IObservable<TSource>>> _stack;
  18. private Stack<int?> _length;
  19. protected Action _recurse;
  20. public IDisposable Run(IEnumerable<IObservable<TSource>> sources)
  21. {
  22. _isDisposed = false;
  23. _subscription = new SerialDisposable();
  24. _gate = new AsyncLock();
  25. _stack = new Stack<IEnumerator<IObservable<TSource>>>();
  26. _length = new Stack<int?>();
  27. var e = default(IEnumerator<IObservable<TSource>>);
  28. if (!TryGetEnumerator(sources, out e))
  29. return Disposable.Empty;
  30. _stack.Push(e);
  31. _length.Push(Helpers.GetLength(sources));
  32. var cancelable = SchedulerDefaults.TailRecursion.Schedule(self =>
  33. {
  34. _recurse = self;
  35. _gate.Wait(MoveNext);
  36. });
  37. return new CompositeDisposable(_subscription, cancelable, Disposable.Create(() => _gate.Wait(Dispose)));
  38. }
  39. protected abstract IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source);
  40. private void MoveNext()
  41. {
  42. var hasNext = false;
  43. var next = default(IObservable<TSource>);
  44. do
  45. {
  46. if (_stack.Count == 0)
  47. break;
  48. if (_isDisposed)
  49. return;
  50. var e = _stack.Peek();
  51. var l = _length.Peek();
  52. var current = default(IObservable<TSource>);
  53. try
  54. {
  55. hasNext = e.MoveNext();
  56. if (hasNext)
  57. current = e.Current;
  58. }
  59. catch (Exception ex)
  60. {
  61. e.Dispose();
  62. base._observer.OnError(ex);
  63. base.Dispose();
  64. return;
  65. }
  66. if (!hasNext)
  67. {
  68. e.Dispose();
  69. _stack.Pop();
  70. _length.Pop();
  71. }
  72. else
  73. {
  74. var r = l - 1;
  75. _length.Pop();
  76. _length.Push(r);
  77. try
  78. {
  79. next = Helpers.Unpack(current);
  80. }
  81. catch (Exception exception)
  82. {
  83. e.Dispose();
  84. base._observer.OnError(exception);
  85. base.Dispose();
  86. return;
  87. }
  88. //
  89. // Tail recursive case; drop the current frame.
  90. //
  91. if (r == 0)
  92. {
  93. e.Dispose();
  94. _stack.Pop();
  95. _length.Pop();
  96. }
  97. //
  98. // Flattening of nested sequences. Prevents stack overflow in observers.
  99. //
  100. var nextSeq = Extract(next);
  101. if (nextSeq != null)
  102. {
  103. var nextEnumerator = default(IEnumerator<IObservable<TSource>>);
  104. if (!TryGetEnumerator(nextSeq, out nextEnumerator))
  105. return;
  106. _stack.Push(nextEnumerator);
  107. _length.Push(Helpers.GetLength(nextSeq));
  108. hasNext = false;
  109. }
  110. }
  111. } while (!hasNext);
  112. if (!hasNext)
  113. {
  114. Done();
  115. return;
  116. }
  117. var d = new SingleAssignmentDisposable();
  118. _subscription.Disposable = d;
  119. d.Disposable = next.SubscribeSafe(this);
  120. }
  121. private new void Dispose()
  122. {
  123. while (_stack.Count > 0)
  124. {
  125. var e = _stack.Pop();
  126. _length.Pop();
  127. e.Dispose();
  128. }
  129. _isDisposed = true;
  130. }
  131. private bool TryGetEnumerator(IEnumerable<IObservable<TSource>> sources, out IEnumerator<IObservable<TSource>> result)
  132. {
  133. try
  134. {
  135. result = sources.GetEnumerator();
  136. return true;
  137. }
  138. catch (Exception exception)
  139. {
  140. base._observer.OnError(exception);
  141. base.Dispose();
  142. result = null;
  143. return false;
  144. }
  145. }
  146. public abstract void OnCompleted();
  147. public abstract void OnError(Exception error);
  148. public abstract void OnNext(TSource value);
  149. protected virtual void Done()
  150. {
  151. base._observer.OnCompleted();
  152. base.Dispose();
  153. }
  154. }
  155. }
  156. #endif