Generate.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  8. internal static class Generate<TState, TResult>
  9. {
  10. internal sealed class NoTime : Producer<TResult, NoTime._>
  11. {
  12. private readonly TState _initialState;
  13. private readonly Func<TState, bool> _condition;
  14. private readonly Func<TState, TState> _iterate;
  15. private readonly Func<TState, TResult> _resultSelector;
  16. private readonly IScheduler _scheduler;
  17. public NoTime(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, IScheduler scheduler)
  18. {
  19. _initialState = initialState;
  20. _condition = condition;
  21. _iterate = iterate;
  22. _resultSelector = resultSelector;
  23. _scheduler = scheduler;
  24. }
  25. protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
  26. protected override IDisposable Run(_ sink) => sink.Run();
  27. internal sealed class _ : Sink<TResult>
  28. {
  29. // CONSIDER: This sink has a parent reference that can be considered for removal.
  30. private readonly NoTime _parent;
  31. public _(NoTime parent, IObserver<TResult> observer, IDisposable cancel)
  32. : base(observer, cancel)
  33. {
  34. _parent = parent;
  35. }
  36. private TState _state;
  37. private bool _first;
  38. public IDisposable Run()
  39. {
  40. _state = _parent._initialState;
  41. _first = true;
  42. var longRunning = _parent._scheduler.AsLongRunning();
  43. if (longRunning != null)
  44. {
  45. return longRunning.ScheduleLongRunning(Loop);
  46. }
  47. else
  48. {
  49. return _parent._scheduler.Schedule(LoopRec);
  50. }
  51. }
  52. private void Loop(ICancelable cancel)
  53. {
  54. while (!cancel.IsDisposed)
  55. {
  56. var hasResult = false;
  57. var result = default(TResult);
  58. try
  59. {
  60. if (_first)
  61. {
  62. _first = false;
  63. }
  64. else
  65. {
  66. _state = _parent._iterate(_state);
  67. }
  68. hasResult = _parent._condition(_state);
  69. if (hasResult)
  70. {
  71. result = _parent._resultSelector(_state);
  72. }
  73. }
  74. catch (Exception exception)
  75. {
  76. base._observer.OnError(exception);
  77. base.Dispose();
  78. return;
  79. }
  80. if (hasResult)
  81. {
  82. base._observer.OnNext(result);
  83. }
  84. else
  85. {
  86. break;
  87. }
  88. }
  89. if (!cancel.IsDisposed)
  90. {
  91. base._observer.OnCompleted();
  92. }
  93. base.Dispose();
  94. }
  95. private void LoopRec(Action recurse)
  96. {
  97. var hasResult = false;
  98. var result = default(TResult);
  99. try
  100. {
  101. if (_first)
  102. {
  103. _first = false;
  104. }
  105. else
  106. {
  107. _state = _parent._iterate(_state);
  108. }
  109. hasResult = _parent._condition(_state);
  110. if (hasResult)
  111. {
  112. result = _parent._resultSelector(_state);
  113. }
  114. }
  115. catch (Exception exception)
  116. {
  117. base._observer.OnError(exception);
  118. base.Dispose();
  119. return;
  120. }
  121. if (hasResult)
  122. {
  123. base._observer.OnNext(result);
  124. recurse();
  125. }
  126. else
  127. {
  128. base._observer.OnCompleted();
  129. base.Dispose();
  130. }
  131. }
  132. }
  133. }
  134. internal sealed class Absolute : Producer<TResult, Absolute._>
  135. {
  136. private readonly TState _initialState;
  137. private readonly Func<TState, bool> _condition;
  138. private readonly Func<TState, TState> _iterate;
  139. private readonly Func<TState, TResult> _resultSelector;
  140. private readonly Func<TState, DateTimeOffset> _timeSelector;
  141. private readonly IScheduler _scheduler;
  142. public Absolute(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, DateTimeOffset> timeSelector, IScheduler scheduler)
  143. {
  144. _initialState = initialState;
  145. _condition = condition;
  146. _iterate = iterate;
  147. _resultSelector = resultSelector;
  148. _timeSelector = timeSelector;
  149. _scheduler = scheduler;
  150. }
  151. protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
  152. protected override IDisposable Run(_ sink) => sink.Run();
  153. internal sealed class _ : Sink<TResult>
  154. {
  155. // CONSIDER: This sink has a parent reference that can be considered for removal.
  156. private readonly Absolute _parent;
  157. public _(Absolute parent, IObserver<TResult> observer, IDisposable cancel)
  158. : base(observer, cancel)
  159. {
  160. _parent = parent;
  161. }
  162. private bool _first;
  163. private bool _hasResult;
  164. private TResult _result;
  165. public IDisposable Run()
  166. {
  167. _first = true;
  168. _hasResult = false;
  169. _result = default(TResult);
  170. return _parent._scheduler.Schedule(_parent._initialState, InvokeRec);
  171. }
  172. private IDisposable InvokeRec(IScheduler self, TState state)
  173. {
  174. var time = default(DateTimeOffset);
  175. if (_hasResult)
  176. {
  177. base._observer.OnNext(_result);
  178. }
  179. try
  180. {
  181. if (_first)
  182. {
  183. _first = false;
  184. }
  185. else
  186. {
  187. state = _parent._iterate(state);
  188. }
  189. _hasResult = _parent._condition(state);
  190. if (_hasResult)
  191. {
  192. _result = _parent._resultSelector(state);
  193. time = _parent._timeSelector(state);
  194. }
  195. }
  196. catch (Exception exception)
  197. {
  198. base._observer.OnError(exception);
  199. base.Dispose();
  200. return Disposable.Empty;
  201. }
  202. if (!_hasResult)
  203. {
  204. base._observer.OnCompleted();
  205. base.Dispose();
  206. return Disposable.Empty;
  207. }
  208. return self.Schedule(state, time, InvokeRec);
  209. }
  210. }
  211. }
  212. internal sealed class Relative : Producer<TResult, Relative._>
  213. {
  214. private readonly TState _initialState;
  215. private readonly Func<TState, bool> _condition;
  216. private readonly Func<TState, TState> _iterate;
  217. private readonly Func<TState, TResult> _resultSelector;
  218. private readonly Func<TState, TimeSpan> _timeSelector;
  219. private readonly IScheduler _scheduler;
  220. public Relative(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, Func<TState, TimeSpan> timeSelector, IScheduler scheduler)
  221. {
  222. _initialState = initialState;
  223. _condition = condition;
  224. _iterate = iterate;
  225. _resultSelector = resultSelector;
  226. _timeSelector = timeSelector;
  227. _scheduler = scheduler;
  228. }
  229. protected override _ CreateSink(IObserver<TResult> observer, IDisposable cancel) => new _(this, observer, cancel);
  230. protected override IDisposable Run(_ sink) => sink.Run();
  231. internal sealed class _ : Sink<TResult>
  232. {
  233. // CONSIDER: This sink has a parent reference that can be considered for removal.
  234. private readonly Relative _parent;
  235. public _(Relative parent, IObserver<TResult> observer, IDisposable cancel)
  236. : base(observer, cancel)
  237. {
  238. _parent = parent;
  239. }
  240. private bool _first;
  241. private bool _hasResult;
  242. private TResult _result;
  243. public IDisposable Run()
  244. {
  245. _first = true;
  246. _hasResult = false;
  247. _result = default(TResult);
  248. return _parent._scheduler.Schedule(_parent._initialState, InvokeRec);
  249. }
  250. private IDisposable InvokeRec(IScheduler self, TState state)
  251. {
  252. var time = default(TimeSpan);
  253. if (_hasResult)
  254. {
  255. base._observer.OnNext(_result);
  256. }
  257. try
  258. {
  259. if (_first)
  260. {
  261. _first = false;
  262. }
  263. else
  264. {
  265. state = _parent._iterate(state);
  266. }
  267. _hasResult = _parent._condition(state);
  268. if (_hasResult)
  269. {
  270. _result = _parent._resultSelector(state);
  271. time = _parent._timeSelector(state);
  272. }
  273. }
  274. catch (Exception exception)
  275. {
  276. base._observer.OnError(exception);
  277. base.Dispose();
  278. return Disposable.Empty;
  279. }
  280. if (!_hasResult)
  281. {
  282. base._observer.OnCompleted();
  283. base.Dispose();
  284. return Disposable.Empty;
  285. }
  286. return self.Schedule(state, time, InvokeRec);
  287. }
  288. }
  289. }
  290. }
  291. }