| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the Apache 2.0 License.
- // See the LICENSE file in the project root for more information.
- using System.Reactive.Concurrency;
- using System.Reactive.Disposables;
- namespace System.Reactive.Linq.ObservableImpl
- {
- internal static class AppendPrepend
- {
- internal interface IAppendPrepend<TSource> : IObservable<TSource>
- {
- IAppendPrepend<TSource> Append(TSource value);
- IAppendPrepend<TSource> Prepend(TSource value);
- IScheduler Scheduler { get; }
- }
- internal sealed class AppendPrependSingle<TSource> : Producer<TSource, AppendPrependSingle<TSource>._>, IAppendPrepend<TSource>
- {
- private readonly IObservable<TSource> _source;
- private readonly TSource _value;
- private readonly bool _append;
- public IScheduler Scheduler { get; }
- public AppendPrependSingle(IObservable<TSource> source, TSource value, IScheduler scheduler, bool append)
- {
- _source = source;
- _value = value;
- _append = append;
- Scheduler = scheduler;
- }
- public IAppendPrepend<TSource> Append(TSource value)
- {
- var prev = new Node<TSource>(_value);
- if (_append)
- {
- return new AppendPrependMultiple<TSource>(_source,
- null, new Node<TSource>(prev, value), Scheduler);
- }
- return new AppendPrependMultiple<TSource>(_source,
- prev, new Node<TSource>(value), Scheduler);
- }
- public IAppendPrepend<TSource> Prepend(TSource value)
- {
- var prev = new Node<TSource>(_value);
- if (_append)
- {
- return new AppendPrependMultiple<TSource>(_source,
- new Node<TSource>(value), prev, Scheduler);
- }
- return new AppendPrependMultiple<TSource>(_source,
- new Node<TSource>(prev, value), null, Scheduler);
- }
- protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
- protected override void Run(_ sink) => sink.Run();
- internal sealed class _ : IdentitySink<TSource>
- {
- private readonly IObservable<TSource> _source;
- private readonly TSource _value;
- private readonly IScheduler _scheduler;
- private readonly bool _append;
- private IDisposable _schedulerDisposable;
- public _(AppendPrependSingle<TSource> parent, IObserver<TSource> observer)
- : base(observer)
- {
- _source = parent._source;
- _value = parent._value;
- _scheduler = parent.Scheduler;
- _append = parent._append;
- }
- public void Run()
- {
- var disp = _append
- ? _source.SubscribeSafe(this)
- : _scheduler.ScheduleAction(this, PrependValue);
- SetUpstream(disp);
- }
- private static IDisposable PrependValue(_ sink)
- {
- sink.ForwardOnNext(sink._value);
- return sink._source.SubscribeSafe(sink);
- }
- public override void OnCompleted()
- {
- if (_append)
- {
- var disposable = _scheduler.ScheduleAction(this, AppendValue);
- Disposable.TrySetSingle(ref _schedulerDisposable, disposable);
- }
- else
- {
- ForwardOnCompleted();
- }
- }
- private static void AppendValue(_ sink)
- {
- sink.ForwardOnNext(sink._value);
- sink.ForwardOnCompleted();
- }
- protected override void Dispose(bool disposing)
- {
- if (disposing)
- {
- Disposable.TryDispose(ref _schedulerDisposable);
- }
- base.Dispose(disposing);
- }
- }
- }
- private sealed class AppendPrependMultiple<TSource> : Producer<TSource, AppendPrependMultiple<TSource>._>, IAppendPrepend<TSource>
- {
- private readonly IObservable<TSource> _source;
- private readonly Node<TSource> _appends;
- private readonly Node<TSource> _prepends;
- public IScheduler Scheduler { get; }
- public AppendPrependMultiple(IObservable<TSource> source, Node<TSource> prepends, Node<TSource> appends, IScheduler scheduler)
- {
- _source = source;
- _appends = appends;
- _prepends = prepends;
- Scheduler = scheduler;
- }
- public IAppendPrepend<TSource> Append(TSource value)
- {
- return new AppendPrependMultiple<TSource>(_source,
- _prepends, new Node<TSource>(_appends, value), Scheduler);
- }
- public IAppendPrepend<TSource> Prepend(TSource value)
- {
- return new AppendPrependMultiple<TSource>(_source,
- new Node<TSource>(_prepends, value), _appends, Scheduler);
- }
- protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
- protected override void Run(_ sink) => sink.Run();
- // The sink is based on the sink of the ToObervalbe class and does basically
- // the same twice, once for the append list and once for the prepend list.
- // Inbetween it forwards the values of the source class.
- //
- internal sealed class _ : IdentitySink<TSource>
- {
- private readonly IObservable<TSource> _source;
- private readonly TSource[] _prepends;
- private readonly TSource[] _appends;
- private readonly IScheduler _scheduler;
- private IDisposable _schedulerDisposable;
- public _(AppendPrependMultiple<TSource> parent, IObserver<TSource> observer)
- : base(observer)
- {
- _source = parent._source;
- _scheduler = parent.Scheduler;
- if (parent._prepends != null)
- {
- _prepends = parent._prepends.ToArray();
- }
- if (parent._appends != null)
- {
- _appends = parent._appends.ToReverseArray();
- }
- }
- public void Run()
- {
- if (_prepends != null)
- {
- var disposable = Schedule(_prepends, s => s.SetUpstream(s._source.SubscribeSafe(s)));
- Disposable.TrySetSingle(ref _schedulerDisposable, disposable);
- }
- else
- {
- SetUpstream(_source.SubscribeSafe(this));
- }
- }
- public override void OnCompleted()
- {
- if (_appends != null)
- {
- var disposable = Schedule(_appends, s => s.ForwardOnCompleted());
- Disposable.TrySetSerial(ref _schedulerDisposable, disposable);
- }
- else
- {
- ForwardOnCompleted();
- }
- }
- protected override void Dispose(bool disposing)
- {
- if (disposing)
- {
- Disposable.TryDispose(ref _schedulerDisposable);
- }
- base.Dispose(disposing);
- }
- private IDisposable Schedule(TSource[] array, Action<_> continueWith)
- {
- var longRunning = _scheduler.AsLongRunning();
- if (longRunning != null)
- {
- //
- // Long-running schedulers have the contract they should *never* prevent
- // the work from starting, such that the scheduled work has the chance
- // to observe the cancellation and perform proper clean-up. In this case,
- // we're sure Loop will be entered, allowing us to dispose the enumerator.
- //
- return longRunning.ScheduleLongRunning(new State(null, this, array, continueWith), Loop);
- }
- //
- // We never allow the scheduled work to be cancelled. Instead, the flag
- // is used to have LoopRec bail out and perform proper clean-up of the
- // enumerator.
- //
- var flag = new BooleanDisposable();
- _scheduler.Schedule(new State(flag, this, array, continueWith), LoopRec);
- return flag;
- }
- private struct State
- {
- public readonly _ _sink;
- public readonly ICancelable _flag;
- public readonly TSource[] _array;
- public readonly Action<_> _continue;
- public int _current;
- public State(ICancelable flag, _ sink, TSource[] array, Action<_> c)
- {
- _sink = sink;
- _flag = flag;
- _continue = c;
- _array = array;
- _current = 0;
- }
- }
- private void LoopRec(State state, Action<State> recurse)
- {
- if (state._flag.IsDisposed)
- {
- return;
- }
- var current = state._array[state._current];
- ForwardOnNext(current);
- state._current++;
- if (state._current == state._array.Length)
- {
- state._continue(state._sink);
- return;
- }
- recurse(state);
- }
- private void Loop(State state, ICancelable cancel)
- {
- var array = state._array;
- var i = 0;
- while (!cancel.IsDisposed)
- {
- ForwardOnNext(array[i]);
- i++;
- if (i == array.Length)
- {
- state._continue(state._sink);
- break;
- }
- }
- }
- }
- }
- private sealed class Node<T>
- {
- private readonly Node<T> _parent;
- private readonly T _value;
- private readonly int _count;
- public Node(T value)
- : this(null, value)
- {
- }
- public Node(Node<T> parent, T value)
- {
- _parent = parent;
- _value = value;
- if (parent == null)
- {
- _count = 1;
- }
- else
- {
- if (parent._count == int.MaxValue)
- {
- throw new NotSupportedException($"Consecutive appends or prepends with a count of more than int.MaxValue ({int.MaxValue}) are not supported.");
- }
- _count = parent._count + 1;
- }
- }
- public T[] ToArray()
- {
- var array = new T[_count];
- var current = this;
- for (var i = 0; i < _count; i++)
- {
- array[i] = current._value;
- current = current._parent;
- }
- return array;
- }
- public T[] ToReverseArray()
- {
- var array = new T[_count];
- var current = this;
- for (var i = _count - 1; i >= 0; i--)
- {
- array[i] = current._value;
- current = current._parent;
- }
- return array;
- }
- }
- internal sealed class AppendPrependSingleImmediate<TSource> : Producer<TSource, AppendPrependSingleImmediate<TSource>._>, IAppendPrepend<TSource>
- {
- private readonly IObservable<TSource> _source;
- private readonly TSource _value;
- private readonly bool _append;
- public IScheduler Scheduler { get { return ImmediateScheduler.Instance; } }
- public AppendPrependSingleImmediate(IObservable<TSource> source, TSource value, bool append)
- {
- _source = source;
- _value = value;
- _append = append;
- }
- public IAppendPrepend<TSource> Append(TSource value)
- {
- var prev = new Node<TSource>(_value);
- if (_append)
- {
- return new AppendPrependMultiple<TSource>(_source,
- null, new Node<TSource>(prev, value), Scheduler);
- }
- return new AppendPrependMultiple<TSource>(_source,
- prev, new Node<TSource>(value), Scheduler);
- }
- public IAppendPrepend<TSource> Prepend(TSource value)
- {
- var prev = new Node<TSource>(_value);
- if (_append)
- {
- return new AppendPrependMultiple<TSource>(_source,
- new Node<TSource>(value), prev, Scheduler);
- }
- return new AppendPrependMultiple<TSource>(_source,
- new Node<TSource>(prev, value), null, Scheduler);
- }
- protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
- protected override void Run(_ sink) => sink.Run();
- internal sealed class _ : IdentitySink<TSource>
- {
- private readonly IObservable<TSource> _source;
- private readonly TSource _value;
- private readonly bool _append;
- public _(AppendPrependSingleImmediate<TSource> parent, IObserver<TSource> observer)
- : base(observer)
- {
- _source = parent._source;
- _value = parent._value;
- _append = parent._append;
- }
- public void Run()
- {
- if (!_append)
- {
- ForwardOnNext(_value);
- }
- Run(_source);
- }
- public override void OnCompleted()
- {
- if (_append)
- {
- ForwardOnNext(_value);
- }
- ForwardOnCompleted();
- }
- }
- }
- }
- }
|