| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475 | // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.using System;using System.Threading;namespace System.Reactive{    internal class CheckedObserver<T> : IObserver<T>    {        private readonly IObserver<T> _observer;        private int _state;        private const int IDLE = 0;        private const int BUSY = 1;        private const int DONE = 2;        public CheckedObserver(IObserver<T> observer)        {            _observer = observer;        }        public void OnNext(T value)        {            CheckAccess();            try            {                _observer.OnNext(value);            }            finally            {                Interlocked.Exchange(ref _state, IDLE);            }        }        public void OnError(Exception error)        {            CheckAccess();            try            {                _observer.OnError(error);            }            finally            {                Interlocked.Exchange(ref _state, DONE);            }        }        public void OnCompleted()        {            CheckAccess();            try            {                _observer.OnCompleted();            }            finally            {                Interlocked.Exchange(ref _state, DONE);            }        }        private void CheckAccess()        {            switch (Interlocked.CompareExchange(ref _state, BUSY, IDLE))            {                case BUSY:                    throw new InvalidOperationException(Strings_Core.REENTRANCY_DETECTED);                case DONE:                    throw new InvalidOperationException(Strings_Core.OBSERVER_TERMINATED);            }        }    }}
 |