AppendPrepend.cs 16 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the Apache 2.0 License.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Reactive.Concurrency;
  5. using System.Reactive.Disposables;
  6. namespace System.Reactive.Linq.ObservableImpl
  7. {
  8. internal static class AppendPrepend
  9. {
  10. internal interface IAppendPrepend<TSource> : IObservable<TSource>
  11. {
  12. IAppendPrepend<TSource> Append(TSource value);
  13. IAppendPrepend<TSource> Prepend(TSource value);
  14. IScheduler Scheduler { get; }
  15. }
  16. internal sealed class AppendPrependSingle<TSource> : Producer<TSource, AppendPrependSingle<TSource>._>, IAppendPrepend<TSource>
  17. {
  18. private readonly IObservable<TSource> _source;
  19. private readonly TSource _value;
  20. private readonly bool _append;
  21. public IScheduler Scheduler { get; }
  22. public AppendPrependSingle(IObservable<TSource> source, TSource value, IScheduler scheduler, bool append)
  23. {
  24. _source = source;
  25. _value = value;
  26. _append = append;
  27. Scheduler = scheduler;
  28. }
  29. public IAppendPrepend<TSource> Append(TSource value)
  30. {
  31. var prev = new Node<TSource>(_value);
  32. if (_append)
  33. {
  34. return new AppendPrependMultiple<TSource>(_source,
  35. null, new Node<TSource>(prev, value), Scheduler);
  36. }
  37. return new AppendPrependMultiple<TSource>(_source,
  38. prev, new Node<TSource>(value), Scheduler);
  39. }
  40. public IAppendPrepend<TSource> Prepend(TSource value)
  41. {
  42. var prev = new Node<TSource>(_value);
  43. if (_append)
  44. {
  45. return new AppendPrependMultiple<TSource>(_source,
  46. new Node<TSource>(value), prev, Scheduler);
  47. }
  48. return new AppendPrependMultiple<TSource>(_source,
  49. new Node<TSource>(prev, value), null, Scheduler);
  50. }
  51. protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
  52. protected override void Run(_ sink) => sink.Run();
  53. internal sealed class _ : IdentitySink<TSource>
  54. {
  55. private readonly IObservable<TSource> _source;
  56. private readonly TSource _value;
  57. private readonly IScheduler _scheduler;
  58. private readonly bool _append;
  59. private IDisposable _schedulerDisposable;
  60. public _(AppendPrependSingle<TSource> parent, IObserver<TSource> observer)
  61. : base(observer)
  62. {
  63. _source = parent._source;
  64. _value = parent._value;
  65. _scheduler = parent.Scheduler;
  66. _append = parent._append;
  67. }
  68. public void Run()
  69. {
  70. var disp = _append
  71. ? _source.SubscribeSafe(this)
  72. : _scheduler.ScheduleAction(this, PrependValue);
  73. SetUpstream(disp);
  74. }
  75. private static IDisposable PrependValue(_ sink)
  76. {
  77. sink.ForwardOnNext(sink._value);
  78. return sink._source.SubscribeSafe(sink);
  79. }
  80. public override void OnCompleted()
  81. {
  82. if (_append)
  83. {
  84. var disposable = _scheduler.ScheduleAction(this, AppendValue);
  85. Disposable.TrySetSingle(ref _schedulerDisposable, disposable);
  86. }
  87. else
  88. {
  89. ForwardOnCompleted();
  90. }
  91. }
  92. private static void AppendValue(_ sink)
  93. {
  94. sink.ForwardOnNext(sink._value);
  95. sink.ForwardOnCompleted();
  96. }
  97. protected override void Dispose(bool disposing)
  98. {
  99. if (disposing)
  100. {
  101. Disposable.TryDispose(ref _schedulerDisposable);
  102. }
  103. base.Dispose(disposing);
  104. }
  105. }
  106. }
  107. private sealed class AppendPrependMultiple<TSource> : Producer<TSource, AppendPrependMultiple<TSource>._>, IAppendPrepend<TSource>
  108. {
  109. private readonly IObservable<TSource> _source;
  110. private readonly Node<TSource> _appends;
  111. private readonly Node<TSource> _prepends;
  112. public IScheduler Scheduler { get; }
  113. public AppendPrependMultiple(IObservable<TSource> source, Node<TSource> prepends, Node<TSource> appends, IScheduler scheduler)
  114. {
  115. _source = source;
  116. _appends = appends;
  117. _prepends = prepends;
  118. Scheduler = scheduler;
  119. }
  120. public IAppendPrepend<TSource> Append(TSource value)
  121. {
  122. return new AppendPrependMultiple<TSource>(_source,
  123. _prepends, new Node<TSource>(_appends, value), Scheduler);
  124. }
  125. public IAppendPrepend<TSource> Prepend(TSource value)
  126. {
  127. return new AppendPrependMultiple<TSource>(_source,
  128. new Node<TSource>(_prepends, value), _appends, Scheduler);
  129. }
  130. protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
  131. protected override void Run(_ sink) => sink.Run();
  132. // The sink is based on the sink of the ToObervalbe class and does basically
  133. // the same twice, once for the append list and once for the prepend list.
  134. // Inbetween it forwards the values of the source class.
  135. //
  136. internal sealed class _ : IdentitySink<TSource>
  137. {
  138. private readonly IObservable<TSource> _source;
  139. private readonly TSource[] _prepends;
  140. private readonly TSource[] _appends;
  141. private readonly IScheduler _scheduler;
  142. private IDisposable _schedulerDisposable;
  143. public _(AppendPrependMultiple<TSource> parent, IObserver<TSource> observer)
  144. : base(observer)
  145. {
  146. _source = parent._source;
  147. _scheduler = parent.Scheduler;
  148. if (parent._prepends != null)
  149. {
  150. _prepends = parent._prepends.ToArray();
  151. }
  152. if (parent._appends != null)
  153. {
  154. _appends = parent._appends.ToReverseArray();
  155. }
  156. }
  157. public void Run()
  158. {
  159. if (_prepends != null)
  160. {
  161. var disposable = Schedule(_prepends, s => s.SetUpstream(s._source.SubscribeSafe(s)));
  162. Disposable.TrySetSingle(ref _schedulerDisposable, disposable);
  163. }
  164. else
  165. {
  166. SetUpstream(_source.SubscribeSafe(this));
  167. }
  168. }
  169. public override void OnCompleted()
  170. {
  171. if (_appends != null)
  172. {
  173. var disposable = Schedule(_appends, s => s.ForwardOnCompleted());
  174. Disposable.TrySetSerial(ref _schedulerDisposable, disposable);
  175. }
  176. else
  177. {
  178. ForwardOnCompleted();
  179. }
  180. }
  181. protected override void Dispose(bool disposing)
  182. {
  183. if (disposing)
  184. {
  185. Disposable.TryDispose(ref _schedulerDisposable);
  186. }
  187. base.Dispose(disposing);
  188. }
  189. private IDisposable Schedule(TSource[] array, Action<_> continueWith)
  190. {
  191. var longRunning = _scheduler.AsLongRunning();
  192. if (longRunning != null)
  193. {
  194. //
  195. // Long-running schedulers have the contract they should *never* prevent
  196. // the work from starting, such that the scheduled work has the chance
  197. // to observe the cancellation and perform proper clean-up. In this case,
  198. // we're sure Loop will be entered, allowing us to dispose the enumerator.
  199. //
  200. return longRunning.ScheduleLongRunning(new State(null, this, array, continueWith), Loop);
  201. }
  202. //
  203. // We never allow the scheduled work to be cancelled. Instead, the flag
  204. // is used to have LoopRec bail out and perform proper clean-up of the
  205. // enumerator.
  206. //
  207. var flag = new BooleanDisposable();
  208. _scheduler.Schedule(new State(flag, this, array, continueWith), LoopRec);
  209. return flag;
  210. }
  211. private struct State
  212. {
  213. public readonly _ _sink;
  214. public readonly ICancelable _flag;
  215. public readonly TSource[] _array;
  216. public readonly Action<_> _continue;
  217. public int _current;
  218. public State(ICancelable flag, _ sink, TSource[] array, Action<_> c)
  219. {
  220. _sink = sink;
  221. _flag = flag;
  222. _continue = c;
  223. _array = array;
  224. _current = 0;
  225. }
  226. }
  227. private void LoopRec(State state, Action<State> recurse)
  228. {
  229. if (state._flag.IsDisposed)
  230. {
  231. return;
  232. }
  233. var current = state._array[state._current];
  234. ForwardOnNext(current);
  235. state._current++;
  236. if (state._current == state._array.Length)
  237. {
  238. state._continue(state._sink);
  239. return;
  240. }
  241. recurse(state);
  242. }
  243. private void Loop(State state, ICancelable cancel)
  244. {
  245. var array = state._array;
  246. var i = 0;
  247. while (!cancel.IsDisposed)
  248. {
  249. ForwardOnNext(array[i]);
  250. i++;
  251. if (i == array.Length)
  252. {
  253. state._continue(state._sink);
  254. break;
  255. }
  256. }
  257. }
  258. }
  259. }
  260. private sealed class Node<T>
  261. {
  262. private readonly Node<T> _parent;
  263. private readonly T _value;
  264. private readonly int _count;
  265. public Node(T value)
  266. : this(null, value)
  267. {
  268. }
  269. public Node(Node<T> parent, T value)
  270. {
  271. _parent = parent;
  272. _value = value;
  273. if (parent == null)
  274. {
  275. _count = 1;
  276. }
  277. else
  278. {
  279. if (parent._count == int.MaxValue)
  280. {
  281. throw new NotSupportedException($"Consecutive appends or prepends with a count of more than int.MaxValue ({int.MaxValue}) are not supported.");
  282. }
  283. _count = parent._count + 1;
  284. }
  285. }
  286. public T[] ToArray()
  287. {
  288. var array = new T[_count];
  289. var current = this;
  290. for (var i = 0; i < _count; i++)
  291. {
  292. array[i] = current._value;
  293. current = current._parent;
  294. }
  295. return array;
  296. }
  297. public T[] ToReverseArray()
  298. {
  299. var array = new T[_count];
  300. var current = this;
  301. for (var i = _count - 1; i >= 0; i--)
  302. {
  303. array[i] = current._value;
  304. current = current._parent;
  305. }
  306. return array;
  307. }
  308. }
  309. internal sealed class AppendPrependSingleImmediate<TSource> : Producer<TSource, AppendPrependSingleImmediate<TSource>._>, IAppendPrepend<TSource>
  310. {
  311. private readonly IObservable<TSource> _source;
  312. private readonly TSource _value;
  313. private readonly bool _append;
  314. public IScheduler Scheduler { get { return ImmediateScheduler.Instance; } }
  315. public AppendPrependSingleImmediate(IObservable<TSource> source, TSource value, bool append)
  316. {
  317. _source = source;
  318. _value = value;
  319. _append = append;
  320. }
  321. public IAppendPrepend<TSource> Append(TSource value)
  322. {
  323. var prev = new Node<TSource>(_value);
  324. if (_append)
  325. {
  326. return new AppendPrependMultiple<TSource>(_source,
  327. null, new Node<TSource>(prev, value), Scheduler);
  328. }
  329. return new AppendPrependMultiple<TSource>(_source,
  330. prev, new Node<TSource>(value), Scheduler);
  331. }
  332. public IAppendPrepend<TSource> Prepend(TSource value)
  333. {
  334. var prev = new Node<TSource>(_value);
  335. if (_append)
  336. {
  337. return new AppendPrependMultiple<TSource>(_source,
  338. new Node<TSource>(value), prev, Scheduler);
  339. }
  340. return new AppendPrependMultiple<TSource>(_source,
  341. new Node<TSource>(prev, value), null, Scheduler);
  342. }
  343. protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
  344. protected override void Run(_ sink) => sink.Run();
  345. internal sealed class _ : IdentitySink<TSource>
  346. {
  347. private readonly IObservable<TSource> _source;
  348. private readonly TSource _value;
  349. private readonly bool _append;
  350. public _(AppendPrependSingleImmediate<TSource> parent, IObserver<TSource> observer)
  351. : base(observer)
  352. {
  353. _source = parent._source;
  354. _value = parent._value;
  355. _append = parent._append;
  356. }
  357. public void Run()
  358. {
  359. if (!_append)
  360. {
  361. ForwardOnNext(_value);
  362. }
  363. Run(_source);
  364. }
  365. public override void OnCompleted()
  366. {
  367. if (_append)
  368. {
  369. ForwardOnNext(_value);
  370. }
  371. ForwardOnCompleted();
  372. }
  373. }
  374. }
  375. }
  376. }