1
0

Generate.cs 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292
  1. // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
  2. #if !NO_PERF
  3. using System;
  4. using System.Diagnostics;
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. using System.Threading;
  8. namespace System.Reactive.Linq.ObservableImpl
  9. {
  10. class Generate<TState, TResult> : Producer<TResult>
  11. {
  12. private readonly TState _initialState;
  13. private readonly Func<TState, bool> _condition;
  14. private readonly Func<TState, TState> _iterate;
  15. private readonly Func<TState, TResult> _resultSelector;
  16. private readonly Func<TState, DateTimeOffset> _timeSelectorA;
  17. private readonly Func<TState, TimeSpan> _timeSelectorR;
  18. private readonly IScheduler _scheduler;
  19. public Generate(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
  20. {
  21. _initialState = initialState;
  22. _condition = condition;
  23. _iterate = iterate;
  24. _resultSelector = resultSelector;
  25. _scheduler = scheduler;
  26. }
  27. public Generate(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
  28. {
  29. _initialState = initialState;
  30. _condition = condition;
  31. _iterate = iterate;
  32. _resultSelector = resultSelector;
  33. _timeSelectorA = timeSelector;
  34. _scheduler = scheduler;
  35. }
  36. public Generate(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
  37. {
  38. _initialState = initialState;
  39. _condition = condition;
  40. _iterate = iterate;
  41. _resultSelector = resultSelector;
  42. _timeSelectorR = timeSelector;
  43. _scheduler = scheduler;
  44. }
  45. protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
  46. {
  47. if (_timeSelectorA != null)
  48. {
  49. var sink = new SelectorA(this, observer, cancel);
  50. setSink(sink);
  51. return sink.Run();
  52. }
  53. else if (_timeSelectorR != null)
  54. {
  55. var sink = new Delta(this, observer, cancel);
  56. setSink(sink);
  57. return sink.Run();
  58. }
  59. else
  60. {
  61. var sink = new _(this, observer, cancel);
  62. setSink(sink);
  63. return sink.Run();
  64. }
  65. }
  66. class SelectorA : Sink<TResult>
  67. {
  68. private readonly Generate<TState, TResult> _parent;
  69. public SelectorA(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  70. : base(observer, cancel)
  71. {
  72. _parent = parent;
  73. }
  74. private bool _first;
  75. private bool _hasResult;
  76. private TResult _result;
  77. public IDisposable Run()
  78. {
  79. _first = true;
  80. _hasResult = false;
  81. _result = default(TResult);
  82. return _parent._scheduler.Schedule(_parent._initialState, InvokeRec);
  83. }
  84. private IDisposable InvokeRec(IScheduler self, TState state)
  85. {
  86. var time = default(DateTimeOffset);
  87. if (_hasResult)
  88. base._observer.OnNext(_result);
  89. try
  90. {
  91. if (_first)
  92. _first = false;
  93. else
  94. state = _parent._iterate(state);
  95. _hasResult = _parent._condition(state);
  96. if (_hasResult)
  97. {
  98. _result = _parent._resultSelector(state);
  99. time = _parent._timeSelectorA(state);
  100. }
  101. }
  102. catch (Exception exception)
  103. {
  104. base._observer.OnError(exception);
  105. base.Dispose();
  106. return Disposable.Empty;
  107. }
  108. if (!_hasResult)
  109. {
  110. base._observer.OnCompleted();
  111. base.Dispose();
  112. return Disposable.Empty;
  113. }
  114. return self.Schedule(state, time, InvokeRec);
  115. }
  116. }
  117. class Delta : Sink<TResult>
  118. {
  119. private readonly Generate<TState, TResult> _parent;
  120. public Delta(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  121. : base(observer, cancel)
  122. {
  123. _parent = parent;
  124. }
  125. private bool _first;
  126. private bool _hasResult;
  127. private TResult _result;
  128. public IDisposable Run()
  129. {
  130. _first = true;
  131. _hasResult = false;
  132. _result = default(TResult);
  133. return _parent._scheduler.Schedule(_parent._initialState, InvokeRec);
  134. }
  135. private IDisposable InvokeRec(IScheduler self, TState state)
  136. {
  137. var time = default(TimeSpan);
  138. if (_hasResult)
  139. base._observer.OnNext(_result);
  140. try
  141. {
  142. if (_first)
  143. _first = false;
  144. else
  145. state = _parent._iterate(state);
  146. _hasResult = _parent._condition(state);
  147. if (_hasResult)
  148. {
  149. _result = _parent._resultSelector(state);
  150. time = _parent._timeSelectorR(state);
  151. }
  152. }
  153. catch (Exception exception)
  154. {
  155. base._observer.OnError(exception);
  156. base.Dispose();
  157. return Disposable.Empty;
  158. }
  159. if (!_hasResult)
  160. {
  161. base._observer.OnCompleted();
  162. base.Dispose();
  163. return Disposable.Empty;
  164. }
  165. return self.Schedule(state, time, InvokeRec);
  166. }
  167. }
  168. class _ : Sink<TResult>
  169. {
  170. private readonly Generate<TState, TResult> _parent;
  171. public _(Generate<TState, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
  172. : base(observer, cancel)
  173. {
  174. _parent = parent;
  175. }
  176. private TState _state;
  177. private bool _first;
  178. public IDisposable Run()
  179. {
  180. _state = _parent._initialState;
  181. _first = true;
  182. var longRunning = _parent._scheduler.AsLongRunning();
  183. if (longRunning != null)
  184. {
  185. return longRunning.ScheduleLongRunning(Loop);
  186. }
  187. else
  188. {
  189. return _parent._scheduler.Schedule(LoopRec);
  190. }
  191. }
  192. private void Loop(ICancelable cancel)
  193. {
  194. while (!cancel.IsDisposed)
  195. {
  196. var hasResult = false;
  197. var result = default(TResult);
  198. try
  199. {
  200. if (_first)
  201. _first = false;
  202. else
  203. _state = _parent._iterate(_state);
  204. hasResult = _parent._condition(_state);
  205. if (hasResult)
  206. result = _parent._resultSelector(_state);
  207. }
  208. catch (Exception exception)
  209. {
  210. base._observer.OnError(exception);
  211. base.Dispose();
  212. return;
  213. }
  214. if (hasResult)
  215. base._observer.OnNext(result);
  216. else
  217. break;
  218. }
  219. if (!cancel.IsDisposed)
  220. base._observer.OnCompleted();
  221. base.Dispose();
  222. }
  223. private void LoopRec(Action recurse)
  224. {
  225. var hasResult = false;
  226. var result = default(TResult);
  227. try
  228. {
  229. if (_first)
  230. _first = false;
  231. else
  232. _state = _parent._iterate(_state);
  233. hasResult = _parent._condition(_state);
  234. if (hasResult)
  235. result = _parent._resultSelector(_state);
  236. }
  237. catch (Exception exception)
  238. {
  239. base._observer.OnError(exception);
  240. base.Dispose();
  241. return;
  242. }
  243. if (hasResult)
  244. {
  245. base._observer.OnNext(result);
  246. recurse();
  247. }
  248. else
  249. {
  250. base._observer.OnCompleted();
  251. base.Dispose();
  252. }
  253. }
  254. }
  255. }
  256. }
  257. #endif