소스 검색

Initial improvements for Delay layouts.

Bart De Smet 8 년 전
부모
커밋
f9cab0f2a1
2개의 변경된 파일153개의 추가작업 그리고 126개의 파일을 삭제
  1. 151 119
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs
  2. 2 7
      Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Time.cs

+ 151 - 119
Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

@@ -545,184 +545,216 @@ namespace System.Reactive.Linq.ObservableImpl
         }
     }
 
-    internal sealed class Delay<TSource, TDelay> : Producer<TSource>
+    internal static class Delay<TSource, TDelay>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly IObservable<TDelay> _subscriptionDelay;
-        private readonly Func<TSource, IObservable<TDelay>> _delaySelector;
-
-        public Delay(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delaySelector)
-        {
-            _source = source;
-            _subscriptionDelay = subscriptionDelay;
-            _delaySelector = delaySelector;
-        }
-
-        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+        internal class Selector : Producer<TSource>
         {
-            var sink = new _(this, observer, cancel);
-            setSink(sink);
-            return sink.Run();
-        }
+            protected readonly IObservable<TSource> _source;
+            private readonly Func<TSource, IObservable<TDelay>> _delaySelector;
 
-        class _ : Sink<TSource>, IObserver<TSource>
-        {
-            private readonly Delay<TSource, TDelay> _parent;
-
-            public _(Delay<TSource, TDelay> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Selector(IObservable<TSource> source, Func<TSource, IObservable<TDelay>> delaySelector)
             {
-                _parent = parent;
+                _source = source;
+                _delaySelector = delaySelector;
             }
 
-            private CompositeDisposable _delays;
-            private object _gate;
-            private bool _atEnd;
-            private SerialDisposable _subscription;
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+            {
+                var sink = new _<Selector>(this, observer, cancel);
+                setSink(sink);
+                return sink.Run(this);
+            }
 
-            public IDisposable Run()
+            protected class _<TParent> : Sink<TSource>, IObserver<TSource>
+                where TParent : Selector
             {
-                _delays = new CompositeDisposable();
-                _gate = new object();
-                _atEnd = false;
-                _subscription = new SerialDisposable();
+                private readonly CompositeDisposable _delays = new CompositeDisposable();
+                private object _gate = new object();
 
-                if (_parent._subscriptionDelay == null)
+                private readonly Func<TSource, IObservable<TDelay>> _delaySelector;
+
+                public _(Selector parent, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    Start();
+                    _delaySelector = parent._delaySelector;
                 }
-                else
+
+                private bool _atEnd;
+                private IDisposable _subscription;
+
+                public IDisposable Run(TParent parent)
                 {
-                    _subscription.Disposable = _parent._subscriptionDelay.SubscribeSafe(new SubscriptionDelay(this));
-                }
+                    _atEnd = false;
 
-                return StableCompositeDisposable.Create(_subscription, _delays);
-            }
+                    _subscription = RunCore(parent);
 
-            private void Start()
-            {
-                _subscription.Disposable = _parent._source.SubscribeSafe(this);
-            }
+                    return StableCompositeDisposable.Create(_subscription, _delays);
+                }
 
-            public void OnNext(TSource value)
-            {
-                var delay = default(IObservable<TDelay>);
-                try
+                protected virtual IDisposable RunCore(TParent parent)
                 {
-                    delay = _parent._delaySelector(value);
+                    return parent._source.SubscribeSafe(this);
                 }
-                catch (Exception error)
+
+                public void OnNext(TSource value)
+                {
+                    var delay = default(IObservable<TDelay>);
+                    try
+                    {
+                        delay = _delaySelector(value);
+                    }
+                    catch (Exception error)
+                    {
+                        lock (_gate)
+                        {
+                            base._observer.OnError(error);
+                            base.Dispose();
+                        }
+
+                        return;
+                    }
+
+                    var d = new SingleAssignmentDisposable();
+                    _delays.Add(d);
+                    d.Disposable = delay.SubscribeSafe(new DelayObserver(this, value, d));
+                }
+
+                public void OnError(Exception error)
                 {
                     lock (_gate)
                     {
                         base._observer.OnError(error);
                         base.Dispose();
                     }
-
-                    return;
                 }
 
-                var d = new SingleAssignmentDisposable();
-                _delays.Add(d);
-                d.Disposable = delay.SubscribeSafe(new Delta(this, value, d));
-            }
+                public void OnCompleted()
+                {
+                    lock (_gate)
+                    {
+                        _atEnd = true;
+                        _subscription.Dispose();
 
-            public void OnError(Exception error)
-            {
-                lock (_gate)
+                        CheckDone();
+                    }
+                }
+
+                private void CheckDone()
                 {
-                    base._observer.OnError(error);
-                    base.Dispose();
+                    if (_atEnd && _delays.Count == 0)
+                    {
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                    }
                 }
-            }
 
-            public void OnCompleted()
-            {
-                lock (_gate)
+                private sealed class DelayObserver : IObserver<TDelay>
                 {
-                    _atEnd = true;
-                    _subscription.Dispose();
+                    private readonly _<TParent> _parent;
+                    private readonly TSource _value;
+                    private readonly IDisposable _self;
+
+                    public DelayObserver(_<TParent> parent, TSource value, IDisposable self)
+                    {
+                        _parent = parent;
+                        _value = value;
+                        _self = self;
+                    }
+
+                    public void OnNext(TDelay value)
+                    {
+                        lock (_parent._gate)
+                        {
+                            _parent._observer.OnNext(_value);
+
+                            _parent._delays.Remove(_self);
+                            _parent.CheckDone();
+                        }
+                    }
 
-                    CheckDone();
+                    public void OnError(Exception error)
+                    {
+                        lock (_parent._gate)
+                        {
+                            _parent._observer.OnError(error);
+                            _parent.Dispose();
+                        }
+                    }
+
+                    public void OnCompleted()
+                    {
+                        lock (_parent._gate)
+                        {
+                            _parent._observer.OnNext(_value);
+
+                            _parent._delays.Remove(_self);
+                            _parent.CheckDone();
+                        }
+                    }
                 }
             }
+        }
 
-            private void CheckDone()
+        internal sealed class SelectorWithSubscriptionDelay : Selector
+        {
+            private readonly IObservable<TDelay> _subscriptionDelay;
+
+            public SelectorWithSubscriptionDelay(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delaySelector)
+                : base(source, delaySelector)
             {
-                if (_atEnd && _delays.Count == 0)
-                {
-                    base._observer.OnCompleted();
-                    base.Dispose();
-                }
+                _subscriptionDelay = subscriptionDelay;
             }
 
-            class SubscriptionDelay : IObserver<TDelay>
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                private readonly _ _parent;
+                var sink = new _(this, observer, cancel);
+                setSink(sink);
+                return sink.Run(this);
+            }
 
-                public SubscriptionDelay(_ parent)
+            private sealed class _ : _<SelectorWithSubscriptionDelay>
+            {
+                public _(SelectorWithSubscriptionDelay parent, IObserver<TSource> observer, IDisposable cancel)
+                    : base(parent, observer, cancel)
                 {
-                    _parent = parent;
                 }
 
-                public void OnNext(TDelay value)
+                protected override IDisposable RunCore(SelectorWithSubscriptionDelay parent)
                 {
-                    _parent.Start();
-                }
+                    var subscription = new SerialDisposable();
 
-                public void OnError(Exception error)
-                {
-                    _parent._observer.OnError(error);
-                    _parent.Dispose();
-                }
+                    subscription.Disposable = parent._subscriptionDelay.SubscribeSafe(new SubscriptionDelayObserver(this, parent._source, subscription));
 
-                public void OnCompleted()
-                {
-                    _parent.Start();
+                    return subscription;
                 }
-            }
-
-            class Delta : IObserver<TDelay>
-            {
-                private readonly _ _parent;
-                private readonly TSource _value;
-                private readonly IDisposable _self;
 
-                public Delta(_ parent, TSource value, IDisposable self)
+                private sealed class SubscriptionDelayObserver : IObserver<TDelay>
                 {
-                    _parent = parent;
-                    _value = value;
-                    _self = self;
-                }
+                    private readonly _ _parent;
+                    private readonly IObservable<TSource> _source;
+                    private readonly SerialDisposable _subscription;
 
-                public void OnNext(TDelay value)
-                {
-                    lock (_parent._gate)
+                    public SubscriptionDelayObserver(_ parent, IObservable<TSource> source, SerialDisposable subscription)
                     {
-                        _parent._observer.OnNext(_value);
+                        _parent = parent;
+                        _source = source;
+                        _subscription = subscription;
+                    }
 
-                        _parent._delays.Remove(_self);
-                        _parent.CheckDone();
+                    public void OnNext(TDelay value)
+                    {
+                        _subscription.Disposable = _source.SubscribeSafe(_parent);
                     }
-                }
 
-                public void OnError(Exception error)
-                {
-                    lock (_parent._gate)
+                    public void OnError(Exception error)
                     {
                         _parent._observer.OnError(error);
                         _parent.Dispose();
                     }
-                }
 
-                public void OnCompleted()
-                {
-                    lock (_parent._gate)
+                    public void OnCompleted()
                     {
-                        _parent._observer.OnNext(_value);
-
-                        _parent._delays.Remove(_self);
-                        _parent.CheckDone();
+                        _subscription.Disposable = _source.SubscribeSafe(_parent);
                     }
                 }
             }

+ 2 - 7
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Time.cs

@@ -114,17 +114,12 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, Func<TSource, IObservable<TDelay>> delayDurationSelector)
         {
-            return Delay_<TSource, TDelay>(source, null, delayDurationSelector);
+            return new Delay<TSource, TDelay>.Selector(source, delayDurationSelector);
         }
 
         public virtual IObservable<TSource> Delay<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector)
         {
-            return Delay_<TSource, TDelay>(source, subscriptionDelay, delayDurationSelector);
-        }
-
-        private static IObservable<TSource> Delay_<TSource, TDelay>(IObservable<TSource> source, IObservable<TDelay> subscriptionDelay, Func<TSource, IObservable<TDelay>> delayDurationSelector)
-        {
-            return new Delay<TSource, TDelay>(source, subscriptionDelay, delayDurationSelector);
+            return new Delay<TSource, TDelay>.SelectorWithSubscriptionDelay(source, subscriptionDelay, delayDurationSelector);
         }
 
         #endregion