| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the Apache 2.0 License.// See the LICENSE file in the project root for more information. using System.Collections.Concurrent;using System.Reactive.Disposables;using System.Reactive.Subjects;using System.Threading;namespace System.Reactive.Linq.ObservableImpl{    internal sealed class RetryWhen<T, U> : IObservable<T>    {        private readonly IObservable<T> _source;        private readonly Func<IObservable<Exception>, IObservable<U>> _handler;        internal RetryWhen(IObservable<T> source, Func<IObservable<Exception>, IObservable<U>> handler)        {            _source = source;            _handler = handler;        }        public IDisposable Subscribe(IObserver<T> observer)        {            if (observer == null)            {                throw new ArgumentNullException(nameof(observer));            }            var errorSignals = new Subject<Exception>();            var redo = default(IObservable<U>);            try            {                redo = _handler(errorSignals);                if (redo == null)                {                    throw new NullReferenceException("The handler returned a null IObservable");                }            }            catch (Exception ex)            {                observer.OnError(ex);                return Disposable.Empty;            }            var parent = new MainObserver(observer, _source, new RedoSerializedObserver<Exception>(errorSignals));            var d = redo.SubscribeSafe(parent.HandlerConsumer);            Disposable.SetSingle(ref parent.HandlerUpstream, d);            parent.HandlerNext();            return parent;        }        private sealed class MainObserver : Sink<T>, IObserver<T>        {            private readonly IObserver<Exception> _errorSignal;            internal readonly HandlerObserver HandlerConsumer;            private readonly IObservable<T> _source;            private IDisposable _upstream;            internal IDisposable HandlerUpstream;            private int _trampoline;            private int _halfSerializer;            private Exception _error;            internal MainObserver(IObserver<T> downstream, IObservable<T> source, IObserver<Exception> errorSignal) : base(downstream)            {                _source = source;                _errorSignal = errorSignal;                HandlerConsumer = new HandlerObserver(this);            }            protected override void Dispose(bool disposing)            {                if (disposing)                {                    Disposable.TryDispose(ref _upstream);                    Disposable.TryDispose(ref HandlerUpstream);                }                base.Dispose(disposing);            }            public void OnCompleted()            {                HalfSerializer.ForwardOnCompleted(this, ref _halfSerializer, ref _error);            }            public void OnError(Exception error)            {                if (Disposable.TrySetSerial(ref _upstream, null))                {                    _errorSignal.OnNext(error);                }            }            public void OnNext(T value)            {                HalfSerializer.ForwardOnNext(this, value, ref _halfSerializer, ref _error);            }            private void HandlerError(Exception error)            {                HalfSerializer.ForwardOnError(this, error, ref _halfSerializer, ref _error);            }            private void HandlerComplete()            {                HalfSerializer.ForwardOnCompleted(this, ref _halfSerializer, ref _error);            }            internal void HandlerNext()            {                if (Interlocked.Increment(ref _trampoline) == 1)                {                    do                    {                        var sad = new SingleAssignmentDisposable();                        if (Disposable.TrySetSingle(ref _upstream, sad) != TrySetSingleResult.Success)                        {                            return;                        }                        sad.Disposable = _source.SubscribeSafe(this);                    }                    while (Interlocked.Decrement(ref _trampoline) != 0);                }            }            internal sealed class HandlerObserver : IObserver<U>            {                private readonly MainObserver _main;                internal HandlerObserver(MainObserver main)                {                    _main = main;                }                public void OnCompleted()                {                    _main.HandlerComplete();                }                public void OnError(Exception error)                {                    _main.HandlerError(error);                }                public void OnNext(U value)                {                    _main.HandlerNext();                }            }        }    }    internal sealed class RedoSerializedObserver<X> : IObserver<X>    {        private readonly IObserver<X> _downstream;        private int _wip;        private Exception _terminalException;        private static readonly Exception SignaledIndicator = new Exception();        private readonly ConcurrentQueue<X> _queue;        internal RedoSerializedObserver(IObserver<X> downstream)        {            _downstream = downstream;            _queue = new ConcurrentQueue<X>();        }        public void OnCompleted()        {            if (Interlocked.CompareExchange(ref _terminalException, ExceptionHelper.Terminated, null) == null)            {                Drain();            }        }        public void OnError(Exception error)        {            if (Interlocked.CompareExchange(ref _terminalException, error, null) == null)            {                Drain();            }        }        public void OnNext(X value)        {            _queue.Enqueue(value);            Drain();        }        private void Clear()        {            while (_queue.TryDequeue(out _))            {            }        }        private void Drain()        {            if (Interlocked.Increment(ref _wip) != 1)            {                return;            }            var missed = 1;            for (; ; )            {                var ex = Volatile.Read(ref _terminalException);                if (ex != null)                {                    if (ex != SignaledIndicator)                    {                        Interlocked.Exchange(ref _terminalException, SignaledIndicator);                        if (ex != ExceptionHelper.Terminated)                        {                            _downstream.OnError(ex);                        }                        else                        {                            _downstream.OnCompleted();                        }                    }                    Clear();                }                else                {                    while (_queue.TryDequeue(out var item))                    {                        _downstream.OnNext(item);                    }                }                missed = Interlocked.Add(ref _wip, -missed);                if (missed == 0)                {                    break;                }            }        }    }}
 |