123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451 |
- // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
- using System.Collections.Generic;
- using System.Reactive.Concurrency;
- using System.Reactive.Disposables;
- using System.Threading;
- namespace System.Reactive
- {
- #if !NO_PERF && !NO_CDS
- using System.Collections.Concurrent;
- using System.Diagnostics;
/// <summary>
/// Observer that records the notifications it receives and forwards them to a wrapped
/// observer from work scheduled on an <see cref="IScheduler"/>.
/// </summary>
/// <remarks>
/// The <c>On*Core</c> overrides only record the notification; nothing is delivered until one of the
/// <c>EnsureActive</c> overloads is called. Two delivery strategies exist:
/// when <c>AsLongRunning</c> on the scheduler yields a non-null value, a single long-running
/// <c>Dispatch</c> job paced by a semaphore is used; otherwise a recursive <c>Run</c> action is
/// scheduled and coordinated through the <c>_state</c> field.
/// </remarks>
/// <typeparam name="T">Type of the elements carried by the notifications.</typeparam>
internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
{
    // Values of _state; only the recursive (non long-running) delivery path uses them.
    private const int STOPPED = 0; // no Run action is in flight
    private const int RUNNING = 1; // a Run action has been scheduled / is draining the queue
    private const int PENDING = 2; // RUNNING, and a producer asked the consumer to look again before stopping
    private const int FAULTED = 9; // the wrapped observer's OnNext threw; EnsureActiveSlow no longer schedules

    private volatile int _state = STOPPED;

    // Buffered OnNext values plus the terminal notification recorded by OnErrorCore/OnCompletedCore.
    private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
    private volatile bool _failed;
    private volatile Exception _error;
    private volatile bool _completed;

    private readonly IObserver<T> _observer;
    private readonly IScheduler _scheduler;
    private readonly ISchedulerLongRunning _longRunning;

    // Holds whatever scheduled work is currently outstanding; disposed together with this observer.
    private readonly SerialDisposable _disposable = new SerialDisposable();

    // Long-running path only: lazily started dispatcher job and the semaphore that paces it.
    private readonly object _dispatcherInitGate = new object();
    private readonly SemaphoreSlim _dispatcherEvent;
    private readonly IDisposable _dispatcherEventRelease;
    private IDisposable _dispatcherJob;

    /// <summary>
    /// Creates an observer that forwards notifications to <paramref name="observer"/> using
    /// <paramref name="scheduler"/>.
    /// </summary>
    /// <param name="scheduler">Scheduler on which delivery work is scheduled.</param>
    /// <param name="observer">Observer that ultimately receives the notifications.</param>
    public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
    {
        _scheduler = scheduler;
        _observer = observer;
        _longRunning = _scheduler.AsLongRunning();

        if (_longRunning == null)
        {
            return;
        }

        // The semaphore starts at zero so the dispatcher blocks until EnsureActive releases it.
        // The release-disposable wakes the dispatcher once more so it can observe cancellation.
        _dispatcherEvent = new SemaphoreSlim(0);
        _dispatcherEventRelease = Disposable.Create(() => { _dispatcherEvent.Release(); });
    }

    /// <summary>
    /// Starts the long-running dispatcher job if it has not been started yet. The job is created
    /// at most once; the check is repeated under <c>_dispatcherInitGate</c>.
    /// </summary>
    private void EnsureDispatcher()
    {
        if (_dispatcherJob != null)
        {
            return;
        }

        lock (_dispatcherInitGate)
        {
            if (_dispatcherJob != null)
            {
                return;
            }

            var job = _longRunning.ScheduleLongRunning(Dispatch);
            _dispatcherJob = job;

            // Disposing this observer cancels the job and releases the semaphore.
            _disposable.Disposable = StableCompositeDisposable.Create(job, _dispatcherEventRelease);
        }
    }

    /// <summary>
    /// Body of the long-running job. Waits on the semaphore, then forwards queued values, waiting on
    /// the semaphore again after every delivered value. Once the queue is seen empty, a recorded
    /// error or completion is forwarded and this observer is disposed.
    /// </summary>
    /// <param name="cancel">Cancellation handle of the job; checked after every semaphore wait.</param>
    private void Dispatch(ICancelable cancel)
    {
        for (;;)
        {
            _dispatcherEvent.Wait();

            if (cancel.IsDisposed)
            {
                return;
            }

            T item;
            while (_queue.TryDequeue(out item))
            {
                try
                {
                    _observer.OnNext(item);
                }
                catch
                {
                    // Discard everything still buffered before letting the exception escape.
                    ClearQueue();
                    throw;
                }

                _dispatcherEvent.Wait();

                if (cancel.IsDisposed)
                {
                    return;
                }
            }

            if (_failed)
            {
                _observer.OnError(_error);
                Dispose();
                return;
            }

            if (_completed)
            {
                _observer.OnCompleted();
                Dispose();
                return;
            }
        }
    }

    /// <summary>
    /// Requests delivery of recorded notifications; equivalent to <c>EnsureActive(1)</c>.
    /// </summary>
    public void EnsureActive()
    {
        EnsureActive(1);
    }

    /// <summary>
    /// Requests delivery of recorded notifications.
    /// </summary>
    /// <param name="n">
    /// On the long-running path, the number of times the dispatcher semaphore is released
    /// (values that are not positive release nothing). Unused on the recursive path.
    /// </param>
    public void EnsureActive(int n)
    {
        if (_longRunning == null)
        {
            EnsureActiveSlow();
            return;
        }

        if (n > 0)
        {
            _dispatcherEvent.Release(n);
        }

        EnsureDispatcher();
    }

    /// <summary>
    /// Recursive-path activation: schedules <c>Run</c> when no consumer is in flight, otherwise
    /// flags the running consumer so it re-checks its state before stopping.
    /// </summary>
    private void EnsureActiveSlow()
    {
#pragma warning disable 0420
        for (;;)
        {
            var observed = Interlocked.CompareExchange(ref _state, RUNNING, STOPPED);

            switch (observed)
            {
                case STOPPED:
                    // We moved STOPPED -> RUNNING, so we are the ones who must schedule the consumer.
                    _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
                    return;

                case FAULTED:
                    // The consumer faulted; nothing is scheduled any more.
                    return;

                case PENDING:
                    // Another producer already asked the consumer to take another look.
                    return;

                case RUNNING:
                    // A consumer is running. Without an extra state the following interleaving
                    // would strand an item in the queue:
                    //
                    //   C: _queue.TryDequeue == false                  (RUNNING)
                    //   ----------------------------------------------
                    //   P: _queue.Enqueue(...)
                    //      EnsureActive sees RUNNING, does not schedule
                    //   ----------------------------------------------
                    //   C: transition to STOPPED                       (STOPPED)
                    //
                    // Moving RUNNING -> PENDING here, combined with the consumer only going to
                    // STOPPED through a CAS from RUNNING, forces the consumer to loop once more
                    // and re-read the queue and the other state fields. Something will have
                    // changed, because EnsureActive should only be called after an IObserver<T>
                    // method that touches this state.
                    if (Interlocked.CompareExchange(ref _state, PENDING, RUNNING) == RUNNING)
                    {
                        return;
                    }

                    // The state moved underneath us; start over.
                    break;
            }
        }
#pragma warning restore 0420
    }

    /// <summary>
    /// Recursive-path consumer. Delivers one queued value per invocation and then reschedules itself
    /// through <paramref name="recurse"/>. With an empty queue it forwards a recorded terminal
    /// notification, or attempts the RUNNING -> STOPPED transition and returns.
    /// </summary>
    /// <param name="state">Opaque state handed back unchanged to <paramref name="recurse"/>.</param>
    /// <param name="recurse">Callback that schedules the next invocation of this method.</param>
    private void Run(object state, Action<object> recurse)
    {
#pragma warning disable 0420
        T item;

        while (!_queue.TryDequeue(out item))
        {
            if (_failed)
            {
                // An OnNext may have been enqueued between the failed dequeue above and the
                // producer setting _failed:
                //
                //   C: _queue.TryDequeue == false
                //   ----------------------------------------------
                //   P: OnNext(...)   -> _queue.Enqueue(...)
                //   P: OnError(...)  -> _failed = true
                //   ----------------------------------------------
                //   C: if (_failed) _observer.OnError(...)   // would lose the OnNext
                //
                // So the error is only forwarded when the queue is still empty; otherwise the
                // dequeue is retried right away.
                if (_queue.IsEmpty)
                {
                    Interlocked.Exchange(ref _state, STOPPED);
                    _observer.OnError(_error);
                    Dispose();
                    return;
                }

                continue;
            }

            if (_completed)
            {
                // Same race as for _failed, with OnCompleted instead of OnError: forward the
                // completion only when no OnNext slipped into the queue in the meantime.
                if (_queue.IsEmpty)
                {
                    Interlocked.Exchange(ref _state, STOPPED);
                    _observer.OnCompleted();
                    Dispose();
                    return;
                }

                continue;
            }

            var previous = Interlocked.CompareExchange(ref _state, STOPPED, RUNNING);
            if (previous == RUNNING || previous == FAULTED)
            {
                return;
            }

            Debug.Assert(previous == PENDING);

            // A producer moved us to PENDING to veto the transition to STOPPED; go back to
            // RUNNING and re-check the queue and terminal flags.
            _state = RUNNING;
        }

        Interlocked.Exchange(ref _state, RUNNING);
#pragma warning restore 0420

        try
        {
            _observer.OnNext(item);
        }
        catch
        {
#pragma warning disable 0420
            Interlocked.Exchange(ref _state, FAULTED);
#pragma warning restore 0420

            ClearQueue();
            throw;
        }

        recurse(state);
    }

    /// <summary>
    /// Removes and discards every value currently in the queue.
    /// </summary>
    private void ClearQueue()
    {
        T discarded;
        while (_queue.TryDequeue(out discarded))
        {
        }
    }

    /// <summary>
    /// Buffers <paramref name="value"/> for later delivery.
    /// </summary>
    /// <param name="value">Element to enqueue.</param>
    protected override void OnNextCore(T value)
    {
        _queue.Enqueue(value);
    }

    /// <summary>
    /// Records the error; the exception is stored before the flag is raised so that a consumer
    /// seeing <c>_failed</c> also sees <c>_error</c>.
    /// </summary>
    /// <param name="exception">Error to forward once the queue has been drained.</param>
    protected override void OnErrorCore(Exception exception)
    {
        _error = exception;
        _failed = true;
    }

    /// <summary>
    /// Records that the sequence completed.
    /// </summary>
    protected override void OnCompletedCore()
    {
        _completed = true;
    }

    /// <summary>
    /// Disposes the base observer and, when <paramref name="disposing"/> is true, the outstanding
    /// scheduled work.
    /// </summary>
    /// <param name="disposing">True when called from an explicit dispose.</param>
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);

        if (!disposing)
        {
            return;
        }

        _disposable.Dispose();
    }
}
- #else
/// <summary>
/// Lock-based variant of the scheduled observer: every notification is queued as an
/// <see cref="Action"/> and the queue is drained, one action per scheduled invocation, on the
/// supplied scheduler.
/// </summary>
/// <remarks>
/// Declares <see cref="IScheduledObserver{T}"/> (which itself extends <see cref="IObserver{T}"/> and
/// <see cref="IDisposable"/>) so that this build variant exposes the same contract as the
/// concurrent-collections variant of the class. Notifications are only queued by the
/// <c>On*Core</c> overrides; a call to <c>EnsureActive</c> is needed to start draining.
/// </remarks>
/// <typeparam name="T">Type of the elements carried by the notifications.</typeparam>
internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
{
    // Both flags and the queue are guarded by lock (_queue).
    private bool _isAcquired = false;   // true while a Run chain owns the queue
    private bool _hasFaulted = false;   // true once a queued action has thrown
    private readonly Queue<Action> _queue = new Queue<Action>();

    private readonly IObserver<T> _observer;
    private readonly IScheduler _scheduler;

    // Holds the currently scheduled drain work; disposed together with this observer.
    private readonly SerialDisposable _disposable = new SerialDisposable();

    /// <summary>
    /// Creates an observer that forwards notifications to <paramref name="observer"/> using
    /// <paramref name="scheduler"/>.
    /// </summary>
    /// <param name="scheduler">Scheduler on which the drain loop is scheduled.</param>
    /// <param name="observer">Observer that ultimately receives the notifications.</param>
    public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
    {
        _scheduler = scheduler;
        _observer = observer;
    }

    /// <summary>
    /// Requests delivery of queued notifications. The count is not used by this variant.
    /// </summary>
    /// <param name="n">Ignored.</param>
    public void EnsureActive(int n)
    {
        EnsureActive();
    }

    /// <summary>
    /// Schedules the drain loop when there is queued work, no drain loop currently owns the
    /// queue, and no queued action has faulted.
    /// </summary>
    public void EnsureActive()
    {
        var isOwner = false;

        lock (_queue)
        {
            if (!_hasFaulted && _queue.Count > 0)
            {
                isOwner = !_isAcquired;
                _isAcquired = true;
            }
        }

        // Schedule outside the lock so the scheduler is never invoked while holding it.
        if (isOwner)
        {
            _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
        }
    }

    /// <summary>
    /// Executes one queued action and reschedules itself; releases ownership of the queue when
    /// it is empty. If the action throws, the queue is cleared, the observer is marked as faulted
    /// and the exception is rethrown.
    /// </summary>
    /// <param name="state">Opaque state handed back unchanged to <paramref name="recurse"/>.</param>
    /// <param name="recurse">Callback that schedules the next invocation of this method.</param>
    private void Run(object state, Action<object> recurse)
    {
        var work = default(Action);

        lock (_queue)
        {
            if (_queue.Count > 0)
            {
                work = _queue.Dequeue();
            }
            else
            {
                _isAcquired = false;
                return;
            }
        }

        try
        {
            work();
        }
        catch
        {
            lock (_queue)
            {
                _queue.Clear();
                _hasFaulted = true;
            }

            throw;
        }

        recurse(state);
    }

    /// <summary>
    /// Queues delivery of <paramref name="value"/> to the wrapped observer.
    /// </summary>
    /// <param name="value">Element to deliver.</param>
    protected override void OnNextCore(T value)
    {
        lock (_queue)
        {
            _queue.Enqueue(() => _observer.OnNext(value));
        }
    }

    /// <summary>
    /// Queues delivery of <paramref name="exception"/> to the wrapped observer.
    /// </summary>
    /// <param name="exception">Error to deliver.</param>
    protected override void OnErrorCore(Exception exception)
    {
        lock (_queue)
        {
            _queue.Enqueue(() => _observer.OnError(exception));
        }
    }

    /// <summary>
    /// Queues delivery of the completion notification to the wrapped observer.
    /// </summary>
    protected override void OnCompletedCore()
    {
        lock (_queue)
        {
            _queue.Enqueue(() => _observer.OnCompleted());
        }
    }

    /// <summary>
    /// Disposes the base observer and, when <paramref name="disposing"/> is true, the scheduled
    /// drain work.
    /// </summary>
    /// <param name="disposing">True when called from an explicit dispose.</param>
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);

        if (disposing)
        {
            _disposable.Dispose();
        }
    }
}
- #endif
/// <summary>
/// Scheduled observer that requests delivery right after every notification it records, and
/// that disposes an additional resource when it is disposed itself.
/// </summary>
/// <typeparam name="T">Type of the elements carried by the notifications.</typeparam>
internal class ObserveOnObserver<T> : ScheduledObserver<T>
{
    // Extra resource tied to this observer's lifetime; swapped to null on dispose so it is
    // disposed at most once.
    private IDisposable _cancel;

    /// <summary>
    /// Creates the observer.
    /// </summary>
    /// <param name="scheduler">Scheduler passed on to the base class.</param>
    /// <param name="observer">Observer passed on to the base class.</param>
    /// <param name="cancel">Resource to dispose together with this observer.</param>
    public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer, IDisposable cancel)
        : base(scheduler, observer)
    {
        _cancel = cancel;
    }

    /// <summary>
    /// Records the value through the base class, then requests delivery.
    /// </summary>
    /// <param name="value">Element received.</param>
    protected override void OnNextCore(T value)
    {
        base.OnNextCore(value);
        EnsureActive();
    }

    /// <summary>
    /// Records the error through the base class, then requests delivery.
    /// </summary>
    /// <param name="exception">Error received.</param>
    protected override void OnErrorCore(Exception exception)
    {
        base.OnErrorCore(exception);
        EnsureActive();
    }

    /// <summary>
    /// Records the completion through the base class, then requests delivery.
    /// </summary>
    protected override void OnCompletedCore()
    {
        base.OnCompletedCore();
        EnsureActive();
    }

    /// <summary>
    /// Disposes the base class first; on an explicit dispose, also disposes the resource passed
    /// to the constructor (only the first caller to claim it does so).
    /// </summary>
    /// <param name="disposing">True when called from an explicit dispose.</param>
    protected override void Dispose(bool disposing)
    {
        base.Dispose(disposing);

        if (!disposing)
        {
            return;
        }

        var claimed = Interlocked.Exchange(ref _cancel, null);
        if (claimed == null)
        {
            return;
        }

        claimed.Dispose();
    }
}
/// <summary>
/// Disposable observer whose delivery of received notifications has to be requested explicitly
/// through one of the <c>EnsureActive</c> overloads.
/// </summary>
/// <typeparam name="T">Type of the elements carried by the notifications.</typeparam>
internal interface IScheduledObserver<T> : IObserver<T>, IDisposable
{
    /// <summary>
    /// Requests that the observer becomes active.
    /// </summary>
    void EnsureActive();

    /// <summary>
    /// Requests that the observer becomes active, passing a count along.
    /// </summary>
    /// <param name="count">
    /// Amount of work being signalled; how (and whether) the value is used is up to the
    /// implementation.
    /// </param>
    void EnsureActive(int count);
}
- }
|