123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the Apache 2.0 License.
- // See the LICENSE file in the project root for more information.
- using System.Collections.Generic;
- using System.Reactive.Concurrency;
- using System.Reactive.Disposables;
- using System.Threading;
- namespace System.Reactive
- {
- using System.Collections.Concurrent;
- using System.Diagnostics;
- internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
- {
- private volatile int _state = 0;
- private const int STOPPED = 0;
- private const int RUNNING = 1;
- private const int PENDING = 2;
- private const int FAULTED = 9;
- private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
- private volatile bool _failed;
- private volatile Exception _error;
- private volatile bool _completed;
- private readonly IObserver<T> _observer;
- private readonly IScheduler _scheduler;
- private readonly ISchedulerLongRunning _longRunning;
- private readonly SerialDisposable _disposable = new SerialDisposable();
- public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
- {
- _scheduler = scheduler;
- _observer = observer;
- _longRunning = _scheduler.AsLongRunning();
- if (_longRunning != null)
- {
- _dispatcherEvent = new SemaphoreSlim(0);
- _dispatcherEventRelease = Disposable.Create(() => _dispatcherEvent.Release());
- }
- }
- private readonly object _dispatcherInitGate = new object();
- private readonly SemaphoreSlim _dispatcherEvent;
- private readonly IDisposable _dispatcherEventRelease;
- private IDisposable _dispatcherJob;
- private void EnsureDispatcher()
- {
- if (_dispatcherJob == null)
- {
- lock (_dispatcherInitGate)
- {
- if (_dispatcherJob == null)
- {
- _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch);
- _disposable.Disposable = StableCompositeDisposable.Create
- (
- _dispatcherJob,
- _dispatcherEventRelease
- );
- }
- }
- }
- }
- private void Dispatch(ICancelable cancel)
- {
- while (true)
- {
- _dispatcherEvent.Wait();
- if (cancel.IsDisposed)
- return;
- var next = default(T);
- while (_queue.TryDequeue(out next))
- {
- try
- {
- _observer.OnNext(next);
- }
- catch
- {
- var nop = default(T);
- while (_queue.TryDequeue(out nop))
- ;
- throw;
- }
- _dispatcherEvent.Wait();
- if (cancel.IsDisposed)
- return;
- }
- if (_failed)
- {
- _observer.OnError(_error);
- Dispose();
- return;
- }
- if (_completed)
- {
- _observer.OnCompleted();
- Dispose();
- return;
- }
- }
- }
- public void EnsureActive() => EnsureActive(1);
- public void EnsureActive(int n)
- {
- if (_longRunning != null)
- {
- if (n > 0)
- {
- _dispatcherEvent.Release(n);
- }
- EnsureDispatcher();
- }
- else
- EnsureActiveSlow();
- }
- private void EnsureActiveSlow()
- {
- var isOwner = false;
- while (true)
- {
- var old = Interlocked.CompareExchange(ref _state, RUNNING, STOPPED);
- if (old == STOPPED)
- {
- isOwner = true; // RUNNING
- break;
- }
- if (old == FAULTED)
- return;
- // If we find the consumer loop running, we transition to PENDING to handle
- // the case where the queue is seen empty by the consumer, making it transition
- // to the STOPPED state, but we inserted an item into the queue.
- //
- // C: _queue.TryDequeue == false (RUNNING)
- // ----------------------------------------------
- // P: _queue.Enqueue(...)
- // EnsureActive
- // Exchange(ref _state, RUNNING) == RUNNING
- // ----------------------------------------------
- // C: transition to STOPPED (STOPPED)
- //
- // In this case, P would believe C is running and not invoke the scheduler
- // using the isOwner flag.
- //
- // By introducing an intermediate PENDING state and using CAS in the consumer
- // to only transition to STOPPED in case we were still RUNNING, we can force
- // the consumer to reconsider the decision to transition to STOPPED. In that
- // case, the consumer loops again and re-reads from the queue and other state
- // fields. At least one bit of state will have changed because EnsureActive
- // should only be called after invocation of IObserver<T> methods that touch
- // this state.
- //
- if (old == PENDING || old == RUNNING && Interlocked.CompareExchange(ref _state, PENDING, RUNNING) == RUNNING)
- break;
- }
- if (isOwner)
- {
- _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
- }
- }
- private void Run(object state, Action<object> recurse)
- {
- var next = default(T);
- while (!_queue.TryDequeue(out next))
- {
- if (_failed)
- {
- // Between transitioning to _failed and the queue check in the loop,
- // items could have been queued, so we can't stop yet. We don't spin
- // and immediately re-check the queue.
- //
- // C: _queue.TryDequeue == false
- // ----------------------------------------------
- // P: OnNext(...)
- // _queue.Enqueue(...) // Will get lost
- // P: OnError(...)
- // _failed = true
- // ----------------------------------------------
- // C: if (_failed)
- // _observer.OnError(...) // Lost an OnNext
- //
- if (!_queue.IsEmpty)
- continue;
- Interlocked.Exchange(ref _state, STOPPED);
- _observer.OnError(_error);
- Dispose();
- return;
- }
- if (_completed)
- {
- // Between transitioning to _completed and the queue check in the loop,
- // items could have been queued, so we can't stop yet. We don't spin
- // and immediately re-check the queue.
- //
- // C: _queue.TryDequeue == false
- // ----------------------------------------------
- // P: OnNext(...)
- // _queue.Enqueue(...) // Will get lost
- // P: OnCompleted(...)
- // _completed = true
- // ----------------------------------------------
- // C: if (_completed)
- // _observer.OnCompleted() // Lost an OnNext
- //
- if (!_queue.IsEmpty)
- continue;
- Interlocked.Exchange(ref _state, STOPPED);
- _observer.OnCompleted();
- Dispose();
- return;
- }
- var old = Interlocked.CompareExchange(ref _state, STOPPED, RUNNING);
- if (old == RUNNING || old == FAULTED)
- return;
- Debug.Assert(old == PENDING);
- // The producer has put us in the PENDING state to prevent us from
- // transitioning to STOPPED, so we go RUNNING again and re-check our state.
- _state = RUNNING;
- }
- Interlocked.Exchange(ref _state, RUNNING);
- try
- {
- _observer.OnNext(next);
- }
- catch
- {
- Interlocked.Exchange(ref _state, FAULTED);
- var nop = default(T);
- while (_queue.TryDequeue(out nop))
- ;
- throw;
- }
- recurse(state);
- }
- protected override void OnNextCore(T value)
- {
- _queue.Enqueue(value);
- }
- protected override void OnErrorCore(Exception exception)
- {
- _error = exception;
- _failed = true;
- }
- protected override void OnCompletedCore()
- {
- _completed = true;
- }
- protected override void Dispose(bool disposing)
- {
- base.Dispose(disposing);
- if (disposing)
- {
- _disposable.Dispose();
- }
- }
- }
- internal sealed class ObserveOnObserver<T> : ScheduledObserver<T>
- {
- private IDisposable _run;
- public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer)
- : base(scheduler, observer)
- {
- }
- public void Run(IObservable<T> source)
- {
- Disposable.SetSingle(ref _run, source.SubscribeSafe(this));
- }
- protected override void OnNextCore(T value)
- {
- base.OnNextCore(value);
- EnsureActive();
- }
- protected override void OnErrorCore(Exception exception)
- {
- base.OnErrorCore(exception);
- EnsureActive();
- }
- protected override void OnCompletedCore()
- {
- base.OnCompletedCore();
- EnsureActive();
- }
- protected override void Dispose(bool disposing)
- {
- base.Dispose(disposing);
- if (disposing)
- {
- Disposable.TryDispose(ref _run);
- }
- }
- }
- internal interface IScheduledObserver<T> : IObserver<T>, IDisposable
- {
- void EnsureActive();
- void EnsureActive(int count);
- }
- /// <summary>
- /// An ObserveOn operator implementation that uses lock-free
- /// techniques to signal events to the downstream.
- /// </summary>
- /// <typeparam name="T">The element type of the sequence.</typeparam>
- internal sealed class ObserveOnObserverNew<T> : IObserver<T>, IDisposable
- {
- readonly IObserver<T> downstream;
- readonly IScheduler scheduler;
- /// <summary>
- /// If not null, the <see cref="scheduler"/> supports
- /// long running tasks.
- /// </summary>
- readonly ISchedulerLongRunning longRunning;
- readonly ConcurrentQueue<T> queue;
- private IDisposable _run;
- /// <summary>
- /// The current task representing a running drain operation.
- /// </summary>
- IDisposable task;
- /// <summary>
- /// Indicates the work-in-progress state of this operator,
- /// zero means no work is currently being done.
- /// </summary>
- int wip;
- /// <summary>
- /// If true, the upstream has issued OnCompleted.
- /// </summary>
- bool done;
- /// <summary>
- /// If <see cref="done"/> is true and this is non-null, the upstream
- /// failed with an OnError.
- /// </summary>
- Exception error;
- /// <summary>
- /// Indicates a dispose has been requested.
- /// </summary>
- bool disposed;
- public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream)
- {
- this.downstream = downstream;
- this.scheduler = scheduler;
- this.longRunning = scheduler.AsLongRunning();
- this.queue = new ConcurrentQueue<T>();
- }
- public void Run(IObservable<T> source)
- {
- Disposable.SetSingle(ref _run, source.SubscribeSafe(this));
- }
- public void Dispose()
- {
- Volatile.Write(ref disposed, true);
- Disposable.TryDispose(ref task);
- Disposable.TryDispose(ref _run);
- Clear();
- }
- /// <summary>
- /// Remove remaining elements from the queue upon
- /// cancellation or failure.
- /// </summary>
- void Clear()
- {
- var q = queue;
- while (q.TryDequeue(out var _)) ;
- }
- public void OnCompleted()
- {
- Volatile.Write(ref done, true);
- Schedule();
- }
- public void OnError(Exception error)
- {
- this.error = error;
- Volatile.Write(ref done, true);
- Schedule();
- }
- public void OnNext(T value)
- {
- queue.Enqueue(value);
- Schedule();
- }
- /// <summary>
- /// Submit the drain task via the appropriate scheduler if
- /// there is no drain currently running (wip > 0).
- /// </summary>
- void Schedule()
- {
- if (Interlocked.Increment(ref wip) == 1)
- {
- var newTask = new SingleAssignmentDisposable();
- if (Disposable.TrySetMultiple(ref task, newTask))
- {
- var longRunning = this.longRunning;
- if (longRunning != null)
- {
- newTask.Disposable = longRunning.ScheduleLongRunning(this, DRAIN_LONG_RUNNING);
- }
- else
- {
- newTask.Disposable = scheduler.Schedule(this, DRAIN_SHORT_RUNNING);
- }
- }
- // If there was a cancellation, clear the queue
- // of items. This doesn't have to be inside the
- // wip != 0 (exclusive) mode as the queue
- // is of a multi-consumer type.
- if (Volatile.Read(ref disposed))
- {
- Clear();
- }
- }
- }
- /// <summary>
- /// The static action to be scheduled on a long running scheduler.
- /// Avoids creating a delegate that captures <code>this</code>
- /// whenever the signals have to be drained.
- /// </summary>
- static readonly Action<ObserveOnObserverNew<T>, ICancelable> DRAIN_LONG_RUNNING =
- (self, cancel) => self.DrainLongRunning();
- /// <summary>
- /// The static action to be scheduled on a simple scheduler.
- /// Avoids creating a delegate that captures <code>this</code>
- /// whenever the signals have to be drained.
- /// </summary>
- static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DRAIN_SHORT_RUNNING =
- (scheduler, self) => self.DrainShortRunning(scheduler);
- /// <summary>
- /// Emits at most one signal per run on a scheduler that doesn't like
- /// long running tasks.
- /// </summary>
- /// <param name="recursiveScheduler">The scheduler to use for scheduling the next signal emission if necessary.</param>
- /// <returns>The IDisposable of the recursively scheduled task or an empty disposable.</returns>
- IDisposable DrainShortRunning(IScheduler recursiveScheduler)
- {
- DrainStep(queue, downstream, false);
- if (Interlocked.Decrement(ref wip) != 0)
- {
- return recursiveScheduler.Schedule(this, DRAIN_SHORT_RUNNING);
- }
- return Disposable.Empty;
- }
- /// <summary>
- /// Executes a drain step by checking the disposed state,
- /// checking for the terminated state and for an
- /// empty queue, issuing the appropriate signals to the
- /// given downstream.
- /// </summary>
- /// <param name="q">The queue to use.</param>
- /// <param name="downstream">The intended consumer of the events.</param>
- /// <param name="delayError">Should the errors be delayed until all
- /// queued items have been emitted to the downstream?</param>
- /// <returns>True if the drain loop should stop.</returns>
- bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delayError)
- {
- // Check if the operator has been disposed
- if (Volatile.Read(ref disposed))
- {
- // cleanup residue items in the queue
- Clear();
- return true;
- }
- // Has the upstream call OnCompleted?
- var d = Volatile.Read(ref done);
- if (d && !delayError)
- {
- // done = true happens before setting error
- // this is safe to be a plain read
- var ex = error;
- // if not null, there was an OnError call
- if (ex != null)
- {
- Volatile.Write(ref disposed, true);
- downstream.OnError(ex);
- return true;
- }
- }
- // get the next item from the queue if any
- var empty = !queue.TryDequeue(out var v);
- // the upstream called OnComplete and the queue is empty
- // that means we are done, no further signals can happen
- if (d && empty)
- {
- Volatile.Write(ref disposed, true);
- // done = true happens before setting error
- // this is safe to be a plain read
- var ex = error;
- // if not null, there was an OnError call
- if (ex != null)
- {
- downstream.OnError(ex);
- }
- else
- {
- // otherwise, complete normally
- downstream.OnCompleted();
- }
- return true;
- }
- else
- // the queue is empty and the upstream hasn't completed yet
- if (empty)
- {
- return true;
- }
- // emit the item
- downstream.OnNext(v);
- // keep looping
- return false;
- }
- /// <summary>
- /// Emits as many signals as possible to the downstream observer
- /// as this is executing a long-running scheduler so
- /// it can occupy that thread as long as it needs to.
- /// </summary>
- void DrainLongRunning()
- {
- var missed = 1;
- // read out fields upfront as the DrainStep uses atomics
- // that would force the re-read of these constant values
- // from memory, regardless of readonly, afaik
- var q = queue;
- var downstream = this.downstream;
- for (; ; )
- {
- for (; ; )
- {
- // delayError: true - because of
- // ObserveOn_LongRunning_HoldUpDuringDispatchAndFail
- // expects something that almost looks like full delayError
- if (DrainStep(q, downstream, true))
- {
- break;
- }
- }
- missed = Interlocked.Add(ref wip, -missed);
- if (missed == 0)
- {
- break;
- }
- }
- }
- }
- }
|