// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved.
// See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;

namespace System.Reactive
{
#if !NO_PERF && !NO_CDS
    using System.Collections.Concurrent;
    using System.Diagnostics;

    /// <summary>
    /// Observer that buffers incoming notifications and forwards them to a wrapped observer
    /// from work scheduled on the supplied scheduler. Notifications are only recorded by the
    /// On*Core methods; delivery starts when one of the EnsureActive overloads is called.
    /// </summary>
    /// <remarks>
    /// Two delivery strategies are implemented: a dedicated long-running dispatcher
    /// (<c>Dispatch</c>) when the scheduler yields a non-null long-running facet, and a
    /// recursive scheduler action (<c>Run</c>) coordinated through <c>_state</c> otherwise.
    /// </remarks>
    /// <typeparam name="T">Element type of the notifications.</typeparam>
    internal class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
    {
        // State of the Run-based consumer; only used on the non-long-running path.
        private volatile int _state = 0;
        private const int STOPPED = 0; // No consumer is scheduled; EnsureActiveSlow may claim ownership.
        private const int RUNNING = 1; // A consumer has been scheduled or is draining the queue.
        private const int PENDING = 2; // A producer signalled new state while the consumer was RUNNING.
        private const int FAULTED = 9; // _observer.OnNext threw inside Run; no further scheduling happens.

        // Buffered OnNext values, plus the terminal notification recorded as flags.
        private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
        private volatile bool _failed;
        private volatile Exception _error;
        private volatile bool _completed;

        private readonly IObserver<T> _observer;
        private readonly IScheduler _scheduler;
        private readonly ISchedulerLongRunning _longRunning;

        // Holds the currently scheduled work (Run action or dispatcher job); disposed in Dispose(bool).
        private readonly SerialDisposable _disposable = new SerialDisposable();

        /// <summary>
        /// Creates an observer that delivers notifications to <paramref name="observer"/> using
        /// <paramref name="scheduler"/>.
        /// </summary>
        /// <param name="scheduler">Scheduler on which delivery work is scheduled.</param>
        /// <param name="observer">Observer that receives the forwarded notifications.</param>
        public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
        {
            _scheduler = scheduler;
            _observer = observer;
            _longRunning = _scheduler.AsLongRunning();

            // The semaphore-based dispatcher fields are only initialized when a long-running
            // facet is available; they stay null on the Run-based path.
            if (_longRunning != null)
            {
                _dispatcherEvent = new SemaphoreSlim(0);
                _dispatcherEventRelease = Disposable.Create(() => _dispatcherEvent.Release());
            }
        }

        private readonly object _dispatcherInitGate = new object();
        private readonly SemaphoreSlim _dispatcherEvent;
        private readonly IDisposable _dispatcherEventRelease;
        private IDisposable _dispatcherJob;

        /// <summary>
        /// Starts the long-running <c>Dispatch</c> job if it has not been started yet. The null
        /// check is repeated under <c>_dispatcherInitGate</c> so the job is scheduled only once.
        /// </summary>
        private void EnsureDispatcher()
        {
            if (_dispatcherJob == null)
            {
                lock (_dispatcherInitGate)
                {
                    if (_dispatcherJob == null)
                    {
                        _dispatcherJob = _longRunning.ScheduleLongRunning(Dispatch);

                        // Disposing this observer disposes the job and releases the semaphore once.
                        // NOTE(review): presumably the extra release wakes a Dispatch loop blocked in
                        // Wait() so it can observe cancel.IsDisposed - confirm.
                        _disposable.Disposable = StableCompositeDisposable.Create
                        (
                            _dispatcherJob,
                            _dispatcherEventRelease
                        );
                    }
                }
            }
        }

        /// <summary>
        /// Body of the long-running dispatcher. Blocks on <c>_dispatcherEvent</c>, drains
        /// <c>_queue</c> into the wrapped observer, and forwards a recorded error or completion
        /// once the queue is found empty.
        /// </summary>
        /// <param name="cancel">Checked after every wait; the loop exits when it reports disposed.</param>
        private void Dispatch(ICancelable cancel)
        {
            while (true)
            {
                _dispatcherEvent.Wait();

                if (cancel.IsDisposed)
                    return;

                var next = default(T);
                while (_queue.TryDequeue(out next))
                {
                    try
                    {
                        _observer.OnNext(next);
                    }
                    catch
                    {
                        // Discard everything still queued before letting the exception propagate.
                        var nop = default(T);
                        while (_queue.TryDequeue(out nop))
                            ;

                        throw;
                    }

                    // Wait for another semaphore count before attempting the next dequeue.
                    // NOTE(review): appears to pair one count with each signalled notification
                    // (see EnsureActive(int)) - confirm against callers.
                    _dispatcherEvent.Wait();

                    if (cancel.IsDisposed)
                        return;
                }

                // Queue observed empty: deliver a recorded terminal notification, if any.
                if (_failed)
                {
                    _observer.OnError(_error);
                    Dispose();
                    return;
                }

                if (_completed)
                {
                    _observer.OnCompleted();
                    Dispose();
                    return;
                }
            }
        }

        /// <summary>
        /// Signals that one notification has been recorded and makes sure a consumer is active.
        /// Equivalent to <c>EnsureActive(1)</c>.
        /// </summary>
        public void EnsureActive()
        {
            EnsureActive(1);
        }

        /// <summary>
        /// Makes sure a consumer is active. On the long-running path the semaphore is released
        /// <paramref name="n"/> times (when positive) and the dispatcher is started if needed;
        /// otherwise the state-machine based path is used and <paramref name="n"/> is not consulted.
        /// </summary>
        /// <param name="n">Number of semaphore counts to release on the long-running path.</param>
        public void EnsureActive(int n)
        {
            if (_longRunning != null)
            {
                if (n > 0)
                    _dispatcherEvent.Release(n);

                EnsureDispatcher();
            }
            else
                EnsureActiveSlow();
        }

        /// <summary>
        /// Producer side of the <c>_state</c> machine: schedules <c>Run</c> when no consumer is
        /// active, or flags an already running consumer as PENDING so it re-checks its inputs
        /// before stopping. Does nothing once the state is FAULTED.
        /// </summary>
        private void EnsureActiveSlow()
        {
            var isOwner = false;

            // CS0420: _state is volatile and is passed by reference to Interlocked methods.
#pragma warning disable 0420
            while (true)
            {
                var old = Interlocked.CompareExchange(ref _state, RUNNING, STOPPED);
                if (old == STOPPED)
                {
                    isOwner = true; // RUNNING
                    break;
                }

                if (old == FAULTED)
                    return;

                // If we find the consumer loop running, we transition to PENDING to handle
                // the case where the queue is seen empty by the consumer, making it transition
                // to the STOPPED state, but we inserted an item into the queue.
                //
                // C: _queue.TryDequeue == false                  (RUNNING)
                // ----------------------------------------------
                // P: _queue.Enqueue(...)
                //    EnsureActive
                //      Exchange(ref _state, RUNNING) == RUNNING
                // ----------------------------------------------
                // C: transition to STOPPED                       (STOPPED)
                //
                // In this case, P would believe C is running and not invoke the scheduler
                // using the isOwner flag.
                //
                // By introducing an intermediate PENDING state and using CAS in the consumer
                // to only transition to STOPPED in case we were still RUNNING, we can force
                // the consumer to reconsider the decision to transition to STOPPED. In that
                // case, the consumer loops again and re-reads from the queue and other state
                // fields. At least one bit of state will have changed because EnsureActive
                // should only be called after invocation of IObserver<T> methods that touch
                // this state.
                //
                // If neither branch breaks, the state changed between the two CAS operations
                // and the loop retries from the top.
                if (old == PENDING || old == RUNNING && Interlocked.CompareExchange(ref _state, PENDING, RUNNING) == RUNNING)
                    break;
            }
#pragma warning restore 0420

            if (isOwner)
            {
                _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
            }
        }

        /// <summary>
        /// Consumer side of the <c>_state</c> machine, invoked as a recursive scheduler action.
        /// Each invocation delivers at most one queued value and then calls
        /// <paramref name="recurse"/>; when the queue is empty it either forwards a recorded
        /// terminal notification or tries to move from RUNNING to STOPPED.
        /// </summary>
        /// <param name="state">Opaque state passed through to <paramref name="recurse"/> (scheduled as null).</param>
        /// <param name="recurse">Callback used to request the next invocation of this method.</param>
        private void Run(object state, Action<object> recurse)
        {
#pragma warning disable 0420
            var next = default(T);
            while (!_queue.TryDequeue(out next))
            {
                if (_failed)
                {
                    // Between transitioning to _failed and the queue check in the loop,
                    // items could have been queued, so we can't stop yet. We don't spin
                    // and immediately re-check the queue.
                    //
                    // C: _queue.TryDequeue == false
                    // ----------------------------------------------
                    // P: OnNext(...)
                    //      _queue.Enqueue(...)                        // Will get lost
                    // P: OnError(...)
                    //      _failed = true
                    // ----------------------------------------------
                    // C: if (_failed)
                    //      _observer.OnError(...)                     // Lost an OnNext
                    //
                    if (!_queue.IsEmpty)
                        continue;

                    Interlocked.Exchange(ref _state, STOPPED);
                    _observer.OnError(_error);
                    Dispose();
                    return;
                }

                if (_completed)
                {
                    // Between transitioning to _completed and the queue check in the loop,
                    // items could have been queued, so we can't stop yet. We don't spin
                    // and immediately re-check the queue.
                    //
                    // C: _queue.TryDequeue == false
                    // ----------------------------------------------
                    // P: OnNext(...)
                    //      _queue.Enqueue(...)                        // Will get lost
                    // P: OnCompleted(...)
                    //      _completed = true
                    // ----------------------------------------------
                    // C: if (_completed)
                    //      _observer.OnCompleted()                    // Lost an OnNext
                    //
                    if (!_queue.IsEmpty)
                        continue;

                    Interlocked.Exchange(ref _state, STOPPED);
                    _observer.OnCompleted();
                    Dispose();
                    return;
                }

                // Nothing to deliver: stop only if no producer flagged us PENDING in the meantime.
                var old = Interlocked.CompareExchange(ref _state, STOPPED, RUNNING);
                if (old == RUNNING || old == FAULTED)
                    return;

                Debug.Assert(old == PENDING);

                // The producer has put us in the PENDING state to prevent us from
                // transitioning to STOPPED, so we go RUNNING again and re-check our state.
                _state = RUNNING;
            }

            // A value was dequeued; reset the state to RUNNING (clearing any PENDING flag) before delivering it.
            Interlocked.Exchange(ref _state, RUNNING);
#pragma warning restore 0420

            try
            {
                _observer.OnNext(next);
            }
            catch
            {
                // Mark the observer as faulted so EnsureActiveSlow stops scheduling, drop
                // whatever is still queued, and let the exception propagate to the scheduler.
#pragma warning disable 0420
                Interlocked.Exchange(ref _state, FAULTED);
#pragma warning restore 0420

                var nop = default(T);
                while (_queue.TryDequeue(out nop))
                    ;

                throw;
            }

            recurse(state);
        }

        /// <summary>Buffers <paramref name="value"/>; delivery does not start until EnsureActive is called.</summary>
        /// <param name="value">Value to enqueue.</param>
        protected override void OnNextCore(T value)
        {
            _queue.Enqueue(value);
        }

        /// <summary>Records the terminal error for later delivery by the consumer.</summary>
        /// <param name="exception">Error to forward to the wrapped observer.</param>
        protected override void OnErrorCore(Exception exception)
        {
            // _error is written before _failed so a consumer that observes _failed == true
            // also sees the exception (both fields are volatile).
            _error = exception;
            _failed = true;
        }

        /// <summary>Records completion for later delivery by the consumer.</summary>
        protected override void OnCompletedCore()
        {
            _completed = true;
        }

        /// <summary>
        /// Runs the base disposal and, when <paramref name="disposing"/> is true, disposes the
        /// currently scheduled work held in <c>_disposable</c>.
        /// </summary>
        /// <param name="disposing">True when called from an explicit Dispose.</param>
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);

            if (disposing)
            {
                _disposable.Dispose();
            }
        }
    }
#else
    /// <summary>
    /// Lock-based variant compiled when NO_PERF or NO_CDS is defined. Every notification is
    /// stored as an <see cref="Action"/> in a queue guarded by <c>lock (_queue)</c> and replayed
    /// against the wrapped observer by a recursive scheduler action.
    /// </summary>
    /// <typeparam name="T">Element type of the notifications.</typeparam>
    class ScheduledObserver<T> : ObserverBase<T>, IScheduledObserver<T>
    {
        // True while a Run action owns the queue; guarded by lock (_queue).
        private bool _isAcquired = false;
        // Set when a queued action throws; EnsureActive then stops scheduling. Guarded by lock (_queue).
        private bool _hasFaulted = false;
        private readonly Queue<Action> _queue = new Queue<Action>();
        private readonly IObserver<T> _observer;
        private readonly IScheduler _scheduler;
        private readonly SerialDisposable _disposable = new SerialDisposable();

        /// <summary>
        /// Creates an observer that delivers notifications to <paramref name="observer"/> using
        /// <paramref name="scheduler"/>.
        /// </summary>
        /// <param name="scheduler">Scheduler on which delivery work is scheduled.</param>
        /// <param name="observer">Observer that receives the forwarded notifications.</param>
        public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
        {
            _scheduler = scheduler;
            _observer = observer;
        }

        /// <summary>Same as the parameterless overload; <paramref name="n"/> is ignored in this variant.</summary>
        /// <param name="n">Unused.</param>
        public void EnsureActive(int n)
        {
            EnsureActive();
        }

        /// <summary>
        /// Schedules <c>Run</c> if there is queued work, the observer has not faulted, and no
        /// other caller currently owns the queue.
        /// </summary>
        public void EnsureActive()
        {
            var isOwner = false;

            lock (_queue)
            {
                if (!_hasFaulted && _queue.Count > 0)
                {
                    isOwner = !_isAcquired;
                    _isAcquired = true;
                }
            }

            // Scheduling happens outside the lock.
            if (isOwner)
            {
                _disposable.Disposable = _scheduler.Schedule<object>(null, Run);
            }
        }

        /// <summary>
        /// Recursive scheduler action: dequeues and invokes one action per invocation, then calls
        /// <paramref name="recurse"/>. Gives up ownership and returns when the queue is empty.
        /// </summary>
        /// <param name="state">Opaque state passed through to <paramref name="recurse"/> (scheduled as null).</param>
        /// <param name="recurse">Callback used to request the next invocation of this method.</param>
        private void Run(object state, Action<object> recurse)
        {
            var work = default(Action);

            lock (_queue)
            {
                if (_queue.Count > 0)
                    work = _queue.Dequeue();
                else
                {
                    _isAcquired = false;
                    return;
                }
            }

            try
            {
                work();
            }
            catch
            {
                // Drop remaining work and mark the observer as faulted before rethrowing.
                // _isAcquired is left set on this path.
                lock (_queue)
                {
                    _queue.Clear();
                    _hasFaulted = true;
                }

                throw;
            }

            recurse(state);
        }

        /// <summary>Queues an action that forwards <paramref name="value"/> to the wrapped observer.</summary>
        /// <param name="value">Value to forward.</param>
        protected override void OnNextCore(T value)
        {
            lock (_queue)
                _queue.Enqueue(() => _observer.OnNext(value));
        }

        /// <summary>Queues an action that forwards <paramref name="exception"/> to the wrapped observer.</summary>
        /// <param name="exception">Error to forward.</param>
        protected override void OnErrorCore(Exception exception)
        {
            lock (_queue)
                _queue.Enqueue(() => _observer.OnError(exception));
        }

        /// <summary>Queues an action that forwards completion to the wrapped observer.</summary>
        protected override void OnCompletedCore()
        {
            lock (_queue)
                _queue.Enqueue(() => _observer.OnCompleted());
        }

        /// <summary>
        /// Runs the base disposal and, when <paramref name="disposing"/> is true, disposes the
        /// currently scheduled work held in <c>_disposable</c>.
        /// </summary>
        /// <param name="disposing">True when called from an explicit Dispose.</param>
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);

            if (disposing)
            {
                _disposable.Dispose();
            }
        }
    }
#endif

    /// <summary>
    /// Scheduled observer that calls <c>EnsureActive()</c> right after recording each
    /// notification, and that disposes an additional <c>cancel</c> resource when it is disposed.
    /// </summary>
    /// <typeparam name="T">Element type of the notifications.</typeparam>
    class ObserveOnObserver<T> : ScheduledObserver<T>
    {
        // Swapped to null on disposal so the resource is disposed at most once.
        private IDisposable _cancel;

        /// <summary>Creates the observer.</summary>
        /// <param name="scheduler">Scheduler passed to the base class.</param>
        /// <param name="observer">Observer passed to the base class.</param>
        /// <param name="cancel">Resource disposed together with this observer; may be null.</param>
        public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer, IDisposable cancel)
            : base(scheduler, observer)
        {
            _cancel = cancel;
        }

        /// <summary>Records the value through the base class, then requests delivery.</summary>
        /// <param name="value">Value to forward.</param>
        protected override void OnNextCore(T value)
        {
            base.OnNextCore(value);
            EnsureActive();
        }

        /// <summary>Records the error through the base class, then requests delivery.</summary>
        /// <param name="exception">Error to forward.</param>
        protected override void OnErrorCore(Exception exception)
        {
            base.OnErrorCore(exception);
            EnsureActive();
        }

        /// <summary>Records completion through the base class, then requests delivery.</summary>
        protected override void OnCompletedCore()
        {
            base.OnCompletedCore();
            EnsureActive();
        }

        /// <summary>
        /// Runs the base disposal and, when <paramref name="disposing"/> is true, disposes the
        /// <c>cancel</c> resource if it has not been disposed already.
        /// </summary>
        /// <param name="disposing">True when called from an explicit Dispose.</param>
        protected override void Dispose(bool disposing)
        {
            base.Dispose(disposing);

            if (disposing)
            {
                // Atomically take the reference so concurrent Dispose calls dispose it only once.
                var cancel = Interlocked.Exchange(ref _cancel, null);
                if (cancel != null)
                {
                    cancel.Dispose();
                }
            }
        }
    }

    /// <summary>
    /// Observer whose delivery of recorded notifications is triggered explicitly through
    /// <c>EnsureActive</c>.
    /// </summary>
    /// <typeparam name="T">Element type of the notifications.</typeparam>
    interface IScheduledObserver<T> : IObserver<T>, IDisposable
    {
        /// <summary>Requests that recorded notifications be delivered.</summary>
        void EnsureActive();

        /// <summary>Requests that recorded notifications be delivered.</summary>
        /// <param name="count">
        /// Implementation-defined hint; in this file one implementation uses it as a semaphore
        /// release count and the other ignores it.
        /// </param>
        void EnsureActive(int count);
    }
}
 |