// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
    /// <summary>
    /// Producer that mirrors <c>source</c> until a time-based timeout fires, at which point it
    /// switches to the <c>other</c> sequence. The timeout is either an absolute due time
    /// (handled by <c>TimeA</c>) or a relative time span that is restarted after every element
    /// (handled by <c>TimeR</c>).
    /// </summary>
    class Timeout<TSource> : Producer<TSource>
    {
        private readonly IObservable<TSource> _source;
        private readonly TimeSpan? _dueTimeR;        // set only by the TimeSpan constructor
        private readonly DateTimeOffset? _dueTimeA;  // set only by the DateTimeOffset constructor
        private readonly IObservable<TSource> _other;
        private readonly IScheduler _scheduler;

        /// <summary>Creates a timeout with a relative due time.</summary>
        public Timeout(IObservable<TSource> source, TimeSpan dueTime, IObservable<TSource> other, IScheduler scheduler)
        {
            _source = source;
            _dueTimeR = dueTime;
            _other = other;
            _scheduler = scheduler;
        }

        /// <summary>Creates a timeout with an absolute due time.</summary>
        public Timeout(IObservable<TSource> source, DateTimeOffset dueTime, IObservable<TSource> other, IScheduler scheduler)
        {
            _source = source;
            _dueTimeA = dueTime;
            _other = other;
            _scheduler = scheduler;
        }

        /// <summary>
        /// Picks the sink matching the constructor that was used (absolute due time if
        /// <c>_dueTimeA</c> has a value, relative otherwise), registers it through
        /// <paramref name="setSink"/> and starts it.
        /// </summary>
        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            if (_dueTimeA.HasValue)
            {
                var sink = new TimeA(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
            else
            {
                var sink = new TimeR(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
        }

        /// <summary>
        /// Sink for the absolute due time: a single timer races against the terminal
        /// notifications of the source; <c>_switched</c> records that the race is decided.
        /// </summary>
        class TimeA : Sink<TSource>, IObserver<TSource>
        {
            private readonly Timeout<TSource> _parent;

            public TimeA(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private SerialDisposable _subscription;
            private object _gate;
            private bool _switched;

            /// <summary>
            /// Schedules the timer at the absolute due time, subscribes to the source and returns
            /// a disposable covering both the current subscription and the timer.
            /// </summary>
            public IDisposable Run()
            {
                _subscription = new SerialDisposable();

                // Placeholder so the timer callback can replace the subscription even if it
                // runs before SubscribeSafe on the source has returned.
                var original = new SingleAssignmentDisposable();
                _subscription.Disposable = original;

                _gate = new object();
                _switched = false;

                var timer = _parent._scheduler.Schedule(_parent._dueTimeA.Value, Timeout);

                original.Disposable = _parent._source.SubscribeSafe(this);

                return StableCompositeDisposable.Create(_subscription, timer);
            }

            /// <summary>
            /// Timer callback: if the source has not terminated yet, replaces the source
            /// subscription with a subscription to the fallback sequence.
            /// </summary>
            private void Timeout()
            {
                var timerWins = false;

                lock (_gate)
                {
                    timerWins = !_switched;
                    _switched = true;
                }

                if (timerWins)
                    _subscription.Disposable = _parent._other.SubscribeSafe(this.GetForwarder());
            }

            /// <summary>Forwards the element unless the sink has already switched.</summary>
            public void OnNext(TSource value)
            {
                // The element is forwarded while holding the gate, so the timer cannot flip
                // _switched in the middle of the downstream call.
                lock (_gate)
                {
                    if (!_switched)
                        base._observer.OnNext(value);
                }
            }

            /// <summary>Forwards the error and disposes the sink if it beats the timer.</summary>
            public void OnError(Exception error)
            {
                var onErrorWins = false;

                lock (_gate)
                {
                    onErrorWins = !_switched;
                    _switched = true;
                }

                if (onErrorWins)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            /// <summary>Forwards completion and disposes the sink if it beats the timer.</summary>
            public void OnCompleted()
            {
                var onCompletedWins = false;

                lock (_gate)
                {
                    onCompletedWins = !_switched;
                    _switched = true;
                }

                if (onCompletedWins)
                {
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }
        }

        /// <summary>
        /// Sink for the relative due time: every source notification that wins bumps
        /// <c>_id</c>; a timer carries the id it was created with and only wins if that id is
        /// still current when it fires.
        /// </summary>
        class TimeR : Sink<TSource>, IObserver<TSource>
        {
            private readonly Timeout<TSource> _parent;

            public TimeR(Timeout<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private SerialDisposable _subscription;
            private SerialDisposable _timer;
            private object _gate;
            private ulong _id;
            private bool _switched;

            /// <summary>
            /// Starts the first timer, subscribes to the source and returns a disposable covering
            /// both the current subscription and the current timer.
            /// </summary>
            public IDisposable Run()
            {
                _subscription = new SerialDisposable();
                _timer = new SerialDisposable();

                // Placeholder so the timer callback can replace the subscription even if it
                // runs before SubscribeSafe on the source has returned.
                var original = new SingleAssignmentDisposable();
                _subscription.Disposable = original;

                _gate = new object();
                _id = 0UL;
                _switched = false;

                CreateTimer();

                original.Disposable = _parent._source.SubscribeSafe(this);

                return StableCompositeDisposable.Create(_subscription, _timer);
            }

            /// <summary>
            /// Schedules a new timer tagged with the current <c>_id</c>; assigning it to the
            /// serial disposable replaces the previously held timer.
            /// </summary>
            private void CreateTimer()
            {
                _timer.Disposable = _parent._scheduler.Schedule(_id, _parent._dueTimeR.Value, Timeout);
            }

            /// <summary>
            /// Timer callback. <paramref name="myid"/> is the value of <c>_id</c> at the time the
            /// timer was created.
            /// </summary>
            private IDisposable Timeout(IScheduler _, ulong myid)
            {
                var timerWins = false;

                lock (_gate)
                {
                    // The timer wins only if nothing has switched yet and no source
                    // notification has bumped _id since this timer was created.
                    // _switched is only ever set to true: assigning the comparison result
                    // directly would let a late callback carrying a stale id clear the flag
                    // after another timer had already switched to the fallback sequence.
                    timerWins = !_switched && _id == myid;
                    if (timerWins)
                        _switched = true;
                }

                if (timerWins)
                    _subscription.Disposable = _parent._other.SubscribeSafe(this.GetForwarder());

                return Disposable.Empty;
            }

            /// <summary>
            /// Forwards the element and restarts the timer unless the sink has already switched.
            /// </summary>
            public void OnNext(TSource value)
            {
                var onNextWins = false;

                lock (_gate)
                {
                    onNextWins = !_switched;
                    if (onNextWins)
                    {
                        // Invalidates the timer that is currently pending; wraps on overflow.
                        _id = unchecked(_id + 1);
                    }
                }

                if (onNextWins)
                {
                    base._observer.OnNext(value);
                    CreateTimer();
                }
            }

            /// <summary>Forwards the error and disposes the sink if it beats the timer.</summary>
            public void OnError(Exception error)
            {
                var onErrorWins = false;

                lock (_gate)
                {
                    onErrorWins = !_switched;
                    if (onErrorWins)
                    {
                        // Invalidates the pending timer so it cannot switch after the error.
                        _id = unchecked(_id + 1);
                    }
                }

                if (onErrorWins)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            /// <summary>Forwards completion and disposes the sink if it beats the timer.</summary>
            public void OnCompleted()
            {
                var onCompletedWins = false;

                lock (_gate)
                {
                    onCompletedWins = !_switched;
                    if (onCompletedWins)
                    {
                        // Invalidates the pending timer so it cannot switch after completion.
                        _id = unchecked(_id + 1);
                    }
                }

                if (onCompletedWins)
                {
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }
        }
    }

    /// <summary>
    /// Producer that mirrors <c>source</c> until a timeout sequence signals, at which point it
    /// switches to the <c>other</c> sequence. The first timeout sequence is
    /// <c>firstTimeout</c>; after each element a new one is obtained from
    /// <c>timeoutSelector</c>.
    /// </summary>
    class Timeout<TSource, TTimeout> : Producer<TSource>
    {
        private readonly IObservable<TSource> _source;
        private readonly IObservable<TTimeout> _firstTimeout;
        private readonly Func<TSource, IObservable<TTimeout>> _timeoutSelector;
        private readonly IObservable<TSource> _other;

        public Timeout(IObservable<TSource> source, IObservable<TTimeout> firstTimeout, Func<TSource, IObservable<TTimeout>> timeoutSelector, IObservable<TSource> other)
        {
            _source = source;
            _firstTimeout = firstTimeout;
            _timeoutSelector = timeoutSelector;
            _other = other;
        }

        /// <summary>Creates the sink, registers it through <paramref name="setSink"/> and starts it.</summary>
        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(this, observer, cancel);
            setSink(sink);
            return sink.Run();
        }

        /// <summary>
        /// Sink that races source notifications against the current timeout sequence. Every
        /// winning source notification bumps <c>_id</c>; a timeout observer carries the id it
        /// was created with and only wins if that id is still current.
        /// </summary>
        class _ : Sink<TSource>, IObserver<TSource>
        {
            private readonly Timeout<TSource, TTimeout> _parent;

            public _(Timeout<TSource, TTimeout> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private SerialDisposable _subscription;
            private SerialDisposable _timer;
            private object _gate;
            private ulong _id;
            private bool _switched;

            /// <summary>
            /// Subscribes to the first timeout sequence, then to the source, and returns a
            /// disposable covering both the current subscription and the current timeout.
            /// </summary>
            public IDisposable Run()
            {
                _subscription = new SerialDisposable();
                _timer = new SerialDisposable();

                // Placeholder so a timeout can replace the subscription even if it signals
                // before SubscribeSafe on the source has returned.
                var original = new SingleAssignmentDisposable();
                _subscription.Disposable = original;

                _gate = new object();
                _id = 0UL;
                _switched = false;

                SetTimer(_parent._firstTimeout);

                original.Disposable = _parent._source.SubscribeSafe(this);

                return StableCompositeDisposable.Create(_subscription, _timer);
            }

            /// <summary>
            /// Forwards the element and starts the timeout sequence selected for it, unless the
            /// sink has already switched. An exception thrown by the selector is sent to the
            /// observer as an error and the sink is disposed.
            /// </summary>
            public void OnNext(TSource value)
            {
                if (ObserverWins())
                {
                    base._observer.OnNext(value);

                    var timeout = default(IObservable<TTimeout>);
                    try
                    {
                        timeout = _parent._timeoutSelector(value);
                    }
                    catch (Exception error)
                    {
                        base._observer.OnError(error);
                        base.Dispose();
                        return;
                    }

                    SetTimer(timeout);
                }
            }

            /// <summary>Forwards the error and disposes the sink if it beats the timeout.</summary>
            public void OnError(Exception error)
            {
                if (ObserverWins())
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            /// <summary>Forwards completion and disposes the sink if it beats the timeout.</summary>
            public void OnCompleted()
            {
                if (ObserverWins())
                {
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }

            /// <summary>
            /// Subscribes to <paramref name="timeout"/> with an observer tagged with the current
            /// <c>_id</c>; assigning to the serial disposable replaces the previous timeout.
            /// </summary>
            private void SetTimer(IObservable<TTimeout> timeout)
            {
                var myid = _id;

                // The disposable is handed to the observer before the subscription exists so
                // the observer can dispose its own subscription from OnNext.
                var d = new SingleAssignmentDisposable();
                _timer.Disposable = d;
                d.Disposable = timeout.SubscribeSafe(new TimeoutImpl(this, myid, d));
            }

            /// <summary>
            /// Observer of one timeout sequence. An element or completion means "timed out"
            /// and switches to the fallback sequence; an error is forwarded to the observer.
            /// </summary>
            class TimeoutImpl : IObserver<TTimeout>
            {
                private readonly _ _parent;
                private readonly ulong _id;
                private readonly IDisposable _self;

                public TimeoutImpl(_ parent, ulong id, IDisposable self)
                {
                    _parent = parent;
                    _id = id;
                    _self = self;
                }

                public void OnNext(TTimeout value)
                {
                    if (TimerWins())
                        _parent._subscription.Disposable = _parent._parent._other.SubscribeSafe(_parent.GetForwarder());

                    // Only the first signal of the timeout sequence matters.
                    _self.Dispose();
                }

                public void OnError(Exception error)
                {
                    if (TimerWins())
                    {
                        _parent._observer.OnError(error);
                        _parent.Dispose();
                    }
                }

                public void OnCompleted()
                {
                    if (TimerWins())
                        _parent._subscription.Disposable = _parent._parent._other.SubscribeSafe(_parent.GetForwarder());
                }

                /// <summary>
                /// Returns true exactly once per sink: for the first timeout signal whose id is
                /// still current while the sink has not switched yet.
                /// </summary>
                private bool TimerWins()
                {
                    var res = false;

                    lock (_parent._gate)
                    {
                        // _switched is only ever set to true. Assigning the comparison result
                        // directly would let a stale timeout clear the flag after a switch,
                        // and would let one timeout sequence that signals more than once
                        // (e.g. OnNext followed by OnCompleted, if it still arrives after
                        // _self was disposed) win repeatedly and subscribe to the fallback
                        // sequence again.
                        res = !_parent._switched && _parent._id == _id;
                        if (res)
                            _parent._switched = true;
                    }

                    return res;
                }
            }

            /// <summary>
            /// Returns true if the sink has not switched yet; in that case <c>_id</c> is bumped
            /// (wrapping on overflow) so the timeout currently pending can no longer win.
            /// </summary>
            private bool ObserverWins()
            {
                var res = false;

                lock (_gate)
                {
                    res = !_switched;
                    if (res)
                    {
                        _id = unchecked(_id + 1);
                    }
                }

                return res;
            }
        }
    }
}
#endif