// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT License.
// See the LICENSE file in the project root for more information.
  4. #nullable disable
  5. using System.Reactive.Concurrency;
  6. using System.Reactive.Disposables;
  7. namespace System.Reactive.Linq.ObservableImpl
  8. {
  9. internal static class AppendPrepend<TSource>
  10. {
  /// <summary>
  /// An observable sequence to which further single values can be appended or
  /// prepended. The implementations in this file answer <see cref="Append"/> and
  /// <see cref="Prepend"/> by returning a new operator instance rather than
  /// mutating the receiver.
  /// </summary>
  internal interface IAppendPrepend : IObservable<TSource>
  {
      /// <summary>
      /// Gets the scheduler the operator uses to emit the added values.
      /// </summary>
      IScheduler Scheduler { get; }

      /// <summary>
      /// Returns an operator that additionally emits <paramref name="value"/>
      /// after the elements of the sequence.
      /// </summary>
      /// <param name="value">The value to append.</param>
      IAppendPrepend Append(TSource value);

      /// <summary>
      /// Returns an operator that additionally emits <paramref name="value"/>
      /// before the elements of the sequence.
      /// </summary>
      /// <param name="value">The value to prepend.</param>
      IAppendPrepend Prepend(TSource value);
  }
  /// <summary>
  /// Base class for operators that add exactly one value either before or after
  /// the elements of a source sequence. Adding a second value through
  /// <see cref="Append"/> or <see cref="Prepend"/> switches to one of the
  /// list-based operators (<c>LongRunning</c> or <c>Recursive</c>).
  /// </summary>
  /// <typeparam name="TSink">Type of the sink created for each subscription.</typeparam>
  internal abstract class SingleBase<TSink> : Producer<TSource, TSink>, IAppendPrepend
      where TSink : IDisposable
  {
      /// <summary>The sequence the value is added to.</summary>
      protected readonly IObservable<TSource> _source;

      /// <summary>The single value added by this operator.</summary>
      protected readonly TSource _value;

      /// <summary>
      /// <c>true</c> when <see cref="_value"/> is appended; <c>false</c> when it is prepended.
      /// </summary>
      protected readonly bool _append;

      /// <summary>Gets the scheduler used to emit the added value.</summary>
      public abstract IScheduler Scheduler { get; }

      /// <summary>
      /// Stores the source, the value and the direction in which the value is added.
      /// </summary>
      /// <param name="source">The sequence the value is added to.</param>
      /// <param name="value">The value to add.</param>
      /// <param name="append"><c>true</c> to append, <c>false</c> to prepend.</param>
      // Protected rather than public: an abstract type can only be constructed
      // through a derived class, so a public constructor is misleading.
      protected SingleBase(IObservable<TSource> source, TSource value, bool append)
      {
          _source = source;
          _value = value;
          _append = append;
      }

      /// <summary>
      /// Returns a multi-value operator that emits this operator's value and,
      /// in addition, <paramref name="value"/> after the source.
      /// </summary>
      /// <param name="value">The value to append.</param>
      public IAppendPrepend Append(TSource value)
      {
          var existing = new Node<TSource>(_value);

          // Already appending: both values go into one append chain with the new
          // value as its head. Otherwise the existing value stays the only
          // prepend and the new value becomes the only append.
          return _append
              ? CreateAppendPrepend(null, new Node<TSource>(existing, value))
              : CreateAppendPrepend(existing, new Node<TSource>(value));
      }

      /// <summary>
      /// Returns a multi-value operator that emits this operator's value and,
      /// in addition, <paramref name="value"/> before the source.
      /// </summary>
      /// <param name="value">The value to prepend.</param>
      public IAppendPrepend Prepend(TSource value)
      {
          var existing = new Node<TSource>(_value);

          // Already appending: the existing value stays the only append and the
          // new value becomes the only prepend. Otherwise both values go into one
          // prepend chain with the new value as its head.
          return _append
              ? CreateAppendPrepend(new Node<TSource>(value), existing)
              : CreateAppendPrepend(new Node<TSource>(existing, value), null);
      }

      /// <summary>
      /// Creates the list-based operator for the given prepend/append chains:
      /// <c>LongRunning</c> when the scheduler implements
      /// <see cref="ISchedulerLongRunning"/>, <c>Recursive</c> otherwise.
      /// </summary>
      private IAppendPrepend CreateAppendPrepend(Node<TSource> prepend, Node<TSource> append)
      {
          // Read the property once; it is used for the type test and passed on.
          var scheduler = Scheduler;

          if (scheduler is ISchedulerLongRunning longRunning)
          {
              return new LongRunning(_source, prepend, append, scheduler, longRunning);
          }

          return new Recursive(_source, prepend, append, scheduler);
      }
  }
  /// <summary>
  /// Adds a single value before or after a source sequence and emits that value
  /// through the scheduler supplied to the constructor.
  /// </summary>
  internal sealed class SingleValue : SingleBase<SingleValue._>
  {
      /// <summary>
      /// Creates the operator.
      /// </summary>
      /// <param name="source">The sequence the value is added to.</param>
      /// <param name="value">The value to add.</param>
      /// <param name="scheduler">The scheduler used to emit the value.</param>
      /// <param name="append"><c>true</c> to append, <c>false</c> to prepend.</param>
      public SingleValue(IObservable<TSource> source, TSource value, IScheduler scheduler, bool append)
          : base(source, value, append)
      {
          Scheduler = scheduler;
      }

      /// <summary>Gets the scheduler passed to the constructor.</summary>
      public override IScheduler Scheduler { get; }

      protected override _ CreateSink(IObserver<TSource> observer)
      {
          return new _(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run();
      }

      /// <summary>
      /// Per-subscription sink: forwards the source and emits the extra value
      /// either before subscribing to it or once it has completed.
      /// </summary>
      internal sealed class _ : IdentitySink<TSource>
      {
          private readonly bool _append;
          private readonly TSource _value;
          private readonly IObservable<TSource> _source;
          private readonly IScheduler _scheduler;

          // Handle of the scheduled append work; released in Dispose.
          private IDisposable _schedulerDisposable;

          public _(SingleValue parent, IObserver<TSource> observer)
              : base(observer)
          {
              _append = parent._append;
              _value = parent._value;
              _source = parent._source;
              _scheduler = parent.Scheduler;
          }

          /// <summary>
          /// Starts the subscription. When appending, the source is subscribed
          /// directly; when prepending, <see cref="PrependValue"/> is scheduled
          /// and performs the subscription after emitting the value.
          /// </summary>
          public void Run()
          {
              IDisposable upstream;

              if (_append)
              {
                  upstream = _source.SubscribeSafe(this);
              }
              else
              {
                  upstream = _scheduler.ScheduleAction(this, PrependValue);
              }

              SetUpstream(upstream);
          }

          /// <summary>
          /// Emits the value and then subscribes the sink to the source,
          /// returning that subscription.
          /// </summary>
          private static IDisposable PrependValue(_ sink)
          {
              sink.ForwardOnNext(sink._value);

              return sink._source.SubscribeSafe(sink);
          }

          /// <summary>
          /// When prepending, completion is forwarded as is. When appending,
          /// <see cref="AppendValue"/> is scheduled to emit the value and complete.
          /// </summary>
          public override void OnCompleted()
          {
              if (!_append)
              {
                  ForwardOnCompleted();
                  return;
              }

              var pending = _scheduler.ScheduleAction(this, AppendValue);
              Disposable.TrySetSingle(ref _schedulerDisposable, pending);
          }

          /// <summary>Emits the value followed by completion.</summary>
          private static void AppendValue(_ sink)
          {
              sink.ForwardOnNext(sink._value);
              sink.ForwardOnCompleted();
          }

          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  // Release the scheduled append work, if any.
                  Disposable.Dispose(ref _schedulerDisposable);
              }

              base.Dispose(disposing);
          }
      }
  }
  /// <summary>
  /// Emits a chain of prepended values, then the source, then a chain of
  /// appended values, scheduling one unit of work per added value on an
  /// <see cref="IScheduler"/>.
  /// </summary>
  private sealed class Recursive : Producer<TSource, Recursive._>, IAppendPrepend
  {
      private readonly IObservable<TSource> _source;
      private readonly Node<TSource> _prepends;
      private readonly Node<TSource> _appends;

      /// <summary>
      /// Creates the operator from a source and the two value chains; either
      /// chain may be <c>null</c> when nothing was added on that side.
      /// </summary>
      public Recursive(IObservable<TSource> source, Node<TSource> prepends, Node<TSource> appends, IScheduler scheduler)
      {
          _source = source;
          _prepends = prepends;
          _appends = appends;
          Scheduler = scheduler;
      }

      /// <summary>Gets the scheduler used to emit the added values.</summary>
      public IScheduler Scheduler { get; }

      /// <summary>
      /// Returns a new operator whose append chain has <paramref name="value"/> as its head.
      /// </summary>
      public IAppendPrepend Append(TSource value)
      {
          var appends = new Node<TSource>(_appends, value);

          return new Recursive(_source, _prepends, appends, Scheduler);
      }

      /// <summary>
      /// Returns a new operator whose prepend chain has <paramref name="value"/> as its head.
      /// </summary>
      public IAppendPrepend Prepend(TSource value)
      {
          var prepends = new Node<TSource>(_prepends, value);

          return new Recursive(_source, prepends, _appends, Scheduler);
      }

      protected override _ CreateSink(IObserver<TSource> observer)
      {
          return new _(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run();
      }

      // The sink is based on the sink of the ToObservable class and does basically
      // the same thing twice: once for the prepend list and once for the append
      // list. In between, it forwards the values of the source.
      internal sealed class _ : IdentitySink<TSource>
      {
          private readonly IObservable<TSource> _source;
          private readonly IScheduler _scheduler;
          private readonly Node<TSource> _appends;

          // Next prepend node to emit; null once all prepends have been emitted.
          private Node<TSource> _currentPrependNode;

          // Appended values in emission order; filled in OnCompleted.
          private TSource[] _appendArray;
          private int _currentAppendIndex;

          // Set on disposal so that already scheduled work bails out.
          private volatile bool _disposed;

          public _(Recursive parent, IObserver<TSource> observer)
              : base(observer)
          {
              _source = parent._source;
              _scheduler = parent.Scheduler;
              _appends = parent._appends;
              _currentPrependNode = parent._prepends;
          }

          /// <summary>
          /// Starts the subscription: schedules the first prepend step, or
          /// subscribes to the source right away when nothing is prepended.
          /// </summary>
          public void Run()
          {
              if (_currentPrependNode != null)
              {
                  // The disposable returned by Schedule is deliberately ignored:
                  // scheduled work is never cancelled; PrependValues checks the
                  // _disposed flag instead.
                  _scheduler.Schedule(this, (inner, self) => self.PrependValues(inner));
                  return;
              }

              SetUpstream(_source.SubscribeSafe(this));
          }

          /// <summary>
          /// Forwards completion directly when nothing is appended; otherwise
          /// materializes the append chain and schedules the first append step.
          /// </summary>
          public override void OnCompleted()
          {
              if (_appends == null)
              {
                  ForwardOnCompleted();
                  return;
              }

              _appendArray = _appends.ToReverseArray();

              // The disposable returned by Schedule is deliberately ignored:
              // scheduled work is never cancelled; AppendValues checks the
              // _disposed flag instead.
              _scheduler.Schedule(this, (inner, self) => self.AppendValues(inner));
          }

          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  _disposed = true;
              }

              base.Dispose(disposing);
          }

          /// <summary>
          /// Emits one prepended value, then either reschedules itself for the
          /// next one or subscribes to the source when the chain is exhausted.
          /// </summary>
          private IDisposable PrependValues(IScheduler scheduler)
          {
              if (_disposed)
              {
                  return Disposable.Empty;
              }

              ForwardOnNext(_currentPrependNode.Value);

              var remaining = _currentPrependNode.Parent;
              _currentPrependNode = remaining;

              if (remaining != null)
              {
                  // Not cancellable on purpose; see the _disposed check above.
                  scheduler.Schedule(this, (inner, self) => self.PrependValues(inner));
              }
              else
              {
                  SetUpstream(_source.SubscribeSafe(this));
              }

              return Disposable.Empty;
          }

          /// <summary>
          /// Emits one appended value, then either reschedules itself for the
          /// next one or completes when the array is exhausted.
          /// </summary>
          private IDisposable AppendValues(IScheduler scheduler)
          {
              if (_disposed)
              {
                  return Disposable.Empty;
              }

              ForwardOnNext(_appendArray[_currentAppendIndex]);
              _currentAppendIndex++;

              if (_currentAppendIndex != _appendArray.Length)
              {
                  // Not cancellable on purpose; see the _disposed check above.
                  scheduler.Schedule(this, (inner, self) => self.AppendValues(inner));
              }
              else
              {
                  ForwardOnCompleted();
              }

              return Disposable.Empty;
          }
      }
  }
  /// <summary>
  /// Emits a chain of prepended values, then the source, then a chain of
  /// appended values, using an <see cref="ISchedulerLongRunning"/> so that each
  /// chain is emitted by a single loop.
  /// </summary>
  private sealed class LongRunning : Producer<TSource, LongRunning._>, IAppendPrepend
  {
      private readonly IObservable<TSource> _source;
      private readonly Node<TSource> _prepends;
      private readonly Node<TSource> _appends;
      private readonly ISchedulerLongRunning _longRunningScheduler;

      /// <summary>
      /// Creates the operator from a source and the two value chains; either
      /// chain may be <c>null</c> when nothing was added on that side.
      /// </summary>
      public LongRunning(IObservable<TSource> source, Node<TSource> prepends, Node<TSource> appends, IScheduler scheduler, ISchedulerLongRunning longRunningScheduler)
      {
          _source = source;
          _prepends = prepends;
          _appends = appends;
          _longRunningScheduler = longRunningScheduler;
          Scheduler = scheduler;
      }

      /// <summary>Gets the scheduler this operator was created with.</summary>
      public IScheduler Scheduler { get; }

      /// <summary>
      /// Returns a new operator whose append chain has <paramref name="value"/> as its head.
      /// </summary>
      public IAppendPrepend Append(TSource value)
      {
          var appends = new Node<TSource>(_appends, value);

          return new LongRunning(_source, _prepends, appends, Scheduler, _longRunningScheduler);
      }

      /// <summary>
      /// Returns a new operator whose prepend chain has <paramref name="value"/> as its head.
      /// </summary>
      public IAppendPrepend Prepend(TSource value)
      {
          var prepends = new Node<TSource>(_prepends, value);

          return new LongRunning(_source, prepends, _appends, Scheduler, _longRunningScheduler);
      }

      protected override _ CreateSink(IObserver<TSource> observer)
      {
          return new _(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run();
      }

      // The sink is based on the sink of the ToObservable class and does basically
      // the same thing twice: once for the prepend list and once for the append
      // list. In between, it forwards the values of the source.
      internal sealed class _ : IdentitySink<TSource>
      {
          private readonly IObservable<TSource> _source;
          private readonly ISchedulerLongRunning _scheduler;
          private readonly Node<TSource> _prepends;
          private readonly Node<TSource> _appends;

          // Handle of the currently scheduled long-running work; released in Dispose.
          private IDisposable _schedulerDisposable;

          public _(LongRunning parent, IObserver<TSource> observer)
              : base(observer)
          {
              _source = parent._source;
              _scheduler = parent._longRunningScheduler;
              _prepends = parent._prepends;
              _appends = parent._appends;
          }

          /// <summary>
          /// Starts the subscription: schedules the prepend loop, or subscribes
          /// to the source right away when nothing is prepended.
          /// </summary>
          public void Run()
          {
              if (_prepends != null)
              {
                  var work = _scheduler.ScheduleLongRunning(this, (self, cancel) => self.PrependValues(cancel));
                  Disposable.TrySetSingle(ref _schedulerDisposable, work);
                  return;
              }

              SetUpstream(_source.SubscribeSafe(this));
          }

          /// <summary>
          /// Forwards completion directly when nothing is appended; otherwise
          /// schedules the append loop.
          /// </summary>
          public override void OnCompleted()
          {
              if (_appends == null)
              {
                  ForwardOnCompleted();
                  return;
              }

              var work = _scheduler.ScheduleLongRunning(this, (self, cancel) => self.AppendValues(cancel));
              Disposable.TrySetSerial(ref _schedulerDisposable, work);
          }

          protected override void Dispose(bool disposing)
          {
              if (disposing)
              {
                  Disposable.Dispose(ref _schedulerDisposable);
              }

              base.Dispose(disposing);
          }

          /// <summary>
          /// Emits the prepend chain from head to root, checking
          /// <paramref name="cancel"/> before each value, and subscribes to the
          /// source once the chain is exhausted.
          /// </summary>
          private void PrependValues(ICancelable cancel)
          {
              for (var node = _prepends; !cancel.IsDisposed;)
              {
                  ForwardOnNext(node.Value);
                  node = node.Parent;

                  if (node == null)
                  {
                      SetUpstream(_source.SubscribeSafe(this));
                      return;
                  }
              }
          }

          /// <summary>
          /// Emits the appended values in the order produced by
          /// <c>ToReverseArray</c>, checking <paramref name="cancel"/> before
          /// each value, and completes after the last one.
          /// </summary>
          private void AppendValues(ICancelable cancel)
          {
              var values = _appends.ToReverseArray();

              for (var index = 0; !cancel.IsDisposed;)
              {
                  ForwardOnNext(values[index]);
                  index++;

                  if (index == values.Length)
                  {
                      ForwardOnCompleted();
                      return;
                  }
              }
          }
      }
  }
  /// <summary>
  /// Immutable node of a singly linked chain of values. Each node points to the
  /// node that was added before it and records the length of the chain ending
  /// at this node.
  /// </summary>
  /// <typeparam name="T">Type of the stored value.</typeparam>
  private sealed class Node<T>
  {
      /// <summary>The value stored in this node.</summary>
      public readonly T Value;

      /// <summary>The previously added node, or <c>null</c> for the first node.</summary>
      public readonly Node<T> Parent;

      /// <summary>Number of nodes in the chain from this node back to the first node.</summary>
      public readonly int Count;

      /// <summary>Creates the first node of a chain.</summary>
      public Node(T value)
          : this(null, value)
      {
      }

      /// <summary>
      /// Creates a node that follows <paramref name="parent"/>.
      /// </summary>
      /// <exception cref="NotSupportedException">
      /// The parent chain already holds <see cref="int.MaxValue"/> nodes.
      /// </exception>
      public Node(Node<T> parent, T value)
      {
          if (parent != null && parent.Count == int.MaxValue)
          {
              throw new NotSupportedException($"Consecutive appends or prepends with a count of more than int.MaxValue ({int.MaxValue}) are not supported.");
          }

          Parent = parent;
          Value = value;
          Count = parent == null ? 1 : parent.Count + 1;
      }

      /// <summary>
      /// Copies the chain into an array in which the first node comes first and
      /// this node comes last.
      /// </summary>
      public T[] ToReverseArray()
      {
          var result = new T[Count];
          var index = Count;

          // Walk from this node back to the first one, filling the array from the end.
          for (var node = this; index > 0; node = node.Parent)
          {
              result[--index] = node.Value;
          }

          return result;
      }
  }
  /// <summary>
  /// Adds a single value before or after a source sequence and emits that value
  /// directly from the sink, without scheduling. <see cref="Scheduler"/> reports
  /// <see cref="ImmediateScheduler.Instance"/>.
  /// </summary>
  internal sealed class SingleImmediate : SingleBase<SingleImmediate._>
  {
      /// <summary>
      /// Creates the operator.
      /// </summary>
      /// <param name="source">The sequence the value is added to.</param>
      /// <param name="value">The value to add.</param>
      /// <param name="append"><c>true</c> to append, <c>false</c> to prepend.</param>
      public SingleImmediate(IObservable<TSource> source, TSource value, bool append)
          : base(source, value, append)
      {
      }

      /// <summary>Gets the immediate scheduler instance.</summary>
      public override IScheduler Scheduler
      {
          get { return ImmediateScheduler.Instance; }
      }

      protected override _ CreateSink(IObserver<TSource> observer)
      {
          return new _(this, observer);
      }

      protected override void Run(_ sink)
      {
          sink.Run();
      }

      /// <summary>
      /// Per-subscription sink: emits the value before running the source when
      /// prepending, or just before completion when appending.
      /// </summary>
      internal sealed class _ : IdentitySink<TSource>
      {
          private readonly bool _append;
          private readonly TSource _value;
          private readonly IObservable<TSource> _source;

          public _(SingleImmediate parent, IObserver<TSource> observer)
              : base(observer)
          {
              _append = parent._append;
              _value = parent._value;
              _source = parent._source;
          }

          /// <summary>
          /// Emits the value first when prepending, then runs the source.
          /// </summary>
          public void Run()
          {
              var prepending = !_append;

              if (prepending)
              {
                  ForwardOnNext(_value);
              }

              Run(_source);
          }

          /// <summary>
          /// Emits the value first when appending, then forwards completion.
          /// </summary>
          public override void OnCompleted()
          {
              var appending = _append;

              if (appending)
              {
                  ForwardOnNext(_value);
              }

              ForwardOnCompleted();
          }
      }
  }
  454. }
  455. }