Bart De Smet 8 лет назад
Родитель
Сommit
381904c9d2

+ 239 - 232
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs

@@ -11,310 +11,302 @@ namespace System.Reactive.Linq.ObservableImpl
 {
     #region Binary
 
-    internal sealed class Zip<TFirst, TSecond, TResult> : Producer<TResult>
+    internal static class Zip<TFirst, TSecond, TResult>
     {
-        private readonly IObservable<TFirst> _first;
-        private readonly IObservable<TSecond> _second;
-        private readonly IEnumerable<TSecond> _secondE;
-        private readonly Func<TFirst, TSecond, TResult> _resultSelector;
-
-        public Zip(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
-        {
-            _first = first;
-            _second = second;
-            _resultSelector = resultSelector;
-        }
-
-        public Zip(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
+        internal sealed class Observable : Producer<TResult>
         {
-            _first = first;
-            _secondE = second;
-            _resultSelector = resultSelector;
-        }
+            private readonly IObservable<TFirst> _first;
+            private readonly IObservable<TSecond> _second;
+            private readonly Func<TFirst, TSecond, TResult> _resultSelector;
 
-        protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_second != null)
+            public Observable(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
             {
-                var sink = new _(this, observer, cancel);
-                setSink(sink);
-                return sink.Run();
+                _first = first;
+                _second = second;
+                _resultSelector = resultSelector;
             }
-            else
+
+            protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                var sink = new ZipImpl(this, observer, cancel);
+                var sink = new _(_resultSelector, observer, cancel);
                 setSink(sink);
-                return sink.Run();
+                return sink.Run(_first, _second);
             }
-        }
-
-        class _ : Sink<TResult>
-        {
-            private readonly Zip<TFirst, TSecond, TResult> _parent;
 
-            public _(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
-                : base(observer, cancel)
+            private sealed class _ : Sink<TResult>
             {
-                _parent = parent;
-            }
+                private readonly Func<TFirst, TSecond, TResult> _resultSelector;
 
-            private object _gate;
+                public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _resultSelector = resultSelector;
+                }
 
-            public IDisposable Run()
-            {
-                _gate = new object();
+                private object _gate;
 
-                var fstSubscription = new SingleAssignmentDisposable();
-                var sndSubscription = new SingleAssignmentDisposable();
+                public IDisposable Run(IObservable<TFirst> first, IObservable<TSecond> second)
+                {
+                    _gate = new object();
 
-                var fstO = new F(this, fstSubscription);
-                var sndO = new S(this, sndSubscription);
+                    var fstSubscription = new SingleAssignmentDisposable();
+                    var sndSubscription = new SingleAssignmentDisposable();
 
-                fstO.Other = sndO;
-                sndO.Other = fstO;
+                    var fstO = new FirstObserver(this, fstSubscription);
+                    var sndO = new SecondObserver(this, sndSubscription);
 
-                fstSubscription.Disposable = _parent._first.SubscribeSafe(fstO);
-                sndSubscription.Disposable = _parent._second.SubscribeSafe(sndO);
+                    fstO.Other = sndO;
+                    sndO.Other = fstO;
 
-                return StableCompositeDisposable.Create(fstSubscription, sndSubscription, fstO, sndO);
-            }
+                    fstSubscription.Disposable = first.SubscribeSafe(fstO);
+                    sndSubscription.Disposable = second.SubscribeSafe(sndO);
 
-            class F : IObserver<TFirst>, IDisposable
-            {
-                private readonly _ _parent;
-                private readonly IDisposable _self;
-                private S _other;
-                private Queue<TFirst> _queue;
+                    return StableCompositeDisposable.Create(fstSubscription, sndSubscription, fstO, sndO);
+                }
 
-                public F(_ parent, IDisposable self)
+                private sealed class FirstObserver : IObserver<TFirst>, IDisposable
                 {
-                    _parent = parent;
-                    _self = self;
-                    _queue = new Queue<TFirst>();
-                }
+                    private readonly _ _parent;
+                    private readonly IDisposable _self;
+                    private SecondObserver _other;
+                    private Queue<TFirst> _queue;
 
-                public S Other { set { _other = value; } }
+                    public FirstObserver(_ parent, IDisposable self)
+                    {
+                        _parent = parent;
+                        _self = self;
+                        _queue = new Queue<TFirst>();
+                    }
 
-                public Queue<TFirst> Queue { get { return _queue; } }
-                public bool Done { get; private set; }
+                    public SecondObserver Other { set { _other = value; } }
 
-                public void OnNext(TFirst value)
-                {
-                    lock (_parent._gate)
+                    public Queue<TFirst> Queue => _queue;
+                    public bool Done { get; private set; }
+
+                    public void OnNext(TFirst value)
                     {
-                        if (_other.Queue.Count > 0)
+                        lock (_parent._gate)
                         {
-                            var r = _other.Queue.Dequeue();
-
-                            var res = default(TResult);
-                            try
+                            if (_other.Queue.Count > 0)
                             {
-                                res = _parent._parent._resultSelector(value, r);
+                                var r = _other.Queue.Dequeue();
+
+                                var res = default(TResult);
+                                try
+                                {
+                                    res = _parent._resultSelector(value, r);
+                                }
+                                catch (Exception ex)
+                                {
+                                    _parent._observer.OnError(ex);
+                                    _parent.Dispose();
+                                    return;
+                                }
+
+                                _parent._observer.OnNext(res);
                             }
-                            catch (Exception ex)
+                            else
                             {
-                                _parent._observer.OnError(ex);
-                                _parent.Dispose();
-                                return;
+                                if (_other.Done)
+                                {
+                                    _parent._observer.OnCompleted();
+                                    _parent.Dispose();
+                                    return;
+                                }
+
+                                _queue.Enqueue(value);
                             }
+                        }
+                    }
 
-                            _parent._observer.OnNext(res);
+                    public void OnError(Exception error)
+                    {
+                        lock (_parent._gate)
+                        {
+                            _parent._observer.OnError(error);
+                            _parent.Dispose();
                         }
-                        else
+                    }
+
+                    public void OnCompleted()
+                    {
+                        lock (_parent._gate)
                         {
+                            Done = true;
+
                             if (_other.Done)
                             {
                                 _parent._observer.OnCompleted();
                                 _parent.Dispose();
                                 return;
                             }
-
-                            _queue.Enqueue(value);
+                            else
+                            {
+                                _self.Dispose();
+                            }
                         }
                     }
-                }
 
-                public void OnError(Exception error)
-                {
-                    lock (_parent._gate)
+                    public void Dispose()
                     {
-                        _parent._observer.OnError(error);
-                        _parent.Dispose();
+                        _queue.Clear();
                     }
                 }
 
-                public void OnCompleted()
+                private sealed class SecondObserver : IObserver<TSecond>, IDisposable
                 {
-                    lock (_parent._gate)
-                    {
-                        Done = true;
+                    private readonly _ _parent;
+                    private readonly IDisposable _self;
+                    private FirstObserver _other;
+                    private Queue<TSecond> _queue;
 
-                        if (_other.Done)
-                        {
-                            _parent._observer.OnCompleted();
-                            _parent.Dispose();
-                            return;
-                        }
-                        else
-                        {
-                            _self.Dispose();
-                        }
+                    public SecondObserver(_ parent, IDisposable self)
+                    {
+                        _parent = parent;
+                        _self = self;
+                        _queue = new Queue<TSecond>();
                     }
-                }
 
-                public void Dispose()
-                {
-                    _queue.Clear();
-                }
-            }
+                    public FirstObserver Other { set { _other = value; } }
 
-            class S : IObserver<TSecond>, IDisposable
-            {
-                private readonly _ _parent;
-                private readonly IDisposable _self;
-                private F _other;
-                private Queue<TSecond> _queue;
-
-                public S(_ parent, IDisposable self)
-                {
-                    _parent = parent;
-                    _self = self;
-                    _queue = new Queue<TSecond>();
-                }
-
-                public F Other { set { _other = value; } }
+                    public Queue<TSecond> Queue => _queue;
+                    public bool Done { get; private set; }
 
-                public Queue<TSecond> Queue { get { return _queue; } }
-                public bool Done { get; private set; }
-
-                public void OnNext(TSecond value)
-                {
-                    lock (_parent._gate)
+                    public void OnNext(TSecond value)
                     {
-                        if (_other.Queue.Count > 0)
+                        lock (_parent._gate)
                         {
-                            var l = _other.Queue.Dequeue();
-
-                            var res = default(TResult);
-                            try
+                            if (_other.Queue.Count > 0)
                             {
-                                res = _parent._parent._resultSelector(l, value);
+                                var l = _other.Queue.Dequeue();
+
+                                var res = default(TResult);
+                                try
+                                {
+                                    res = _parent._resultSelector(l, value);
+                                }
+                                catch (Exception ex)
+                                {
+                                    _parent._observer.OnError(ex);
+                                    _parent.Dispose();
+                                    return;
+                                }
+
+                                _parent._observer.OnNext(res);
                             }
-                            catch (Exception ex)
+                            else
                             {
-                                _parent._observer.OnError(ex);
-                                _parent.Dispose();
-                                return;
+                                if (_other.Done)
+                                {
+                                    _parent._observer.OnCompleted();
+                                    _parent.Dispose();
+                                    return;
+                                }
+
+                                _queue.Enqueue(value);
                             }
+                        }
+                    }
 
-                            _parent._observer.OnNext(res);
+                    public void OnError(Exception error)
+                    {
+                        lock (_parent._gate)
+                        {
+                            _parent._observer.OnError(error);
+                            _parent.Dispose();
                         }
-                        else
+                    }
+
+                    public void OnCompleted()
+                    {
+                        lock (_parent._gate)
                         {
+                            Done = true;
+
                             if (_other.Done)
                             {
                                 _parent._observer.OnCompleted();
                                 _parent.Dispose();
                                 return;
                             }
-
-                            _queue.Enqueue(value);
+                            else
+                            {
+                                _self.Dispose();
+                            }
                         }
                     }
-                }
 
-                public void OnError(Exception error)
-                {
-                    lock (_parent._gate)
-                    {
-                        _parent._observer.OnError(error);
-                        _parent.Dispose();
-                    }
-                }
-
-                public void OnCompleted()
-                {
-                    lock (_parent._gate)
+                    public void Dispose()
                     {
-                        Done = true;
-
-                        if (_other.Done)
-                        {
-                            _parent._observer.OnCompleted();
-                            _parent.Dispose();
-                            return;
-                        }
-                        else
-                        {
-                            _self.Dispose();
-                        }
+                        _queue.Clear();
                     }
                 }
-
-                public void Dispose()
-                {
-                    _queue.Clear();
-                }
             }
         }
 
-        class ZipImpl : Sink<TResult>, IObserver<TFirst>
+        internal sealed class Enumerable : Producer<TResult>
         {
-            private readonly Zip<TFirst, TSecond, TResult> _parent;
+            private readonly IObservable<TFirst> _first;
+            private readonly IEnumerable<TSecond> _second;
+            private readonly Func<TFirst, TSecond, TResult> _resultSelector;
 
-            public ZipImpl(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Enumerable(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
             {
-                _parent = parent;
+                _first = first;
+                _second = second;
+                _resultSelector = resultSelector;
             }
 
-            private IEnumerator<TSecond> _rightEnumerator;
-
-            public IDisposable Run()
+            protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                //
-                // Notice the evaluation order of obtaining the enumerator and subscribing to the
-                // observable sequence is reversed compared to the operator's signature. This is
-                // required to make sure the enumerator is available as soon as the observer can
-                // be called. Otherwise, we end up having a race for the initialization and use
-                // of the _rightEnumerator field.
-                //
-                try
-                {
-                    _rightEnumerator = _parent._secondE.GetEnumerator();
-                }
-                catch (Exception exception)
-                {
-                    base._observer.OnError(exception);
-                    base.Dispose();
-                    return Disposable.Empty;
-                }
-
-                var leftSubscription = _parent._first.SubscribeSafe(this);
-
-                return StableCompositeDisposable.Create(leftSubscription, _rightEnumerator);
+                var sink = new _(_resultSelector, observer, cancel);
+                setSink(sink);
+                return sink.Run(_first, _second);
             }
 
-            public void OnNext(TFirst value)
+            private sealed class _ : Sink<TResult>, IObserver<TFirst>
             {
-                var hasNext = false;
-                try
+                private readonly Func<TFirst, TSecond, TResult> _resultSelector;
+
+                public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    hasNext = _rightEnumerator.MoveNext();
+                    _resultSelector = resultSelector;
                 }
-                catch (Exception ex)
+
+                private IEnumerator<TSecond> _rightEnumerator;
+
+                public IDisposable Run(IObservable<TFirst> first, IEnumerable<TSecond> second)
                 {
-                    base._observer.OnError(ex);
-                    base.Dispose();
-                    return;
+                    //
+                    // Notice the evaluation order of obtaining the enumerator and subscribing to the
+                    // observable sequence is reversed compared to the operator's signature. This is
+                    // required to make sure the enumerator is available as soon as the observer can
+                    // be called. Otherwise, we end up having a race for the initialization and use
+                    // of the _rightEnumerator field.
+                    //
+                    try
+                    {
+                        _rightEnumerator = second.GetEnumerator();
+                    }
+                    catch (Exception exception)
+                    {
+                        base._observer.OnError(exception);
+                        base.Dispose();
+                        return Disposable.Empty;
+                    }
+
+                    var leftSubscription = first.SubscribeSafe(this);
+
+                    return StableCompositeDisposable.Create(leftSubscription, _rightEnumerator);
                 }
 
-                if (hasNext)
+                public void OnNext(TFirst value)
                 {
-                    var right = default(TSecond);
+                    var hasNext = false;
                     try
                     {
-                        right = _rightEnumerator.Current;
+                        hasNext = _rightEnumerator.MoveNext();
                     }
                     catch (Exception ex)
                     {
@@ -323,38 +315,53 @@ namespace System.Reactive.Linq.ObservableImpl
                         return;
                     }
 
-                    TResult result;
-                    try
+                    if (hasNext)
                     {
-                        result = _parent._resultSelector(value, right);
+                        var right = default(TSecond);
+                        try
+                        {
+                            right = _rightEnumerator.Current;
+                        }
+                        catch (Exception ex)
+                        {
+                            base._observer.OnError(ex);
+                            base.Dispose();
+                            return;
+                        }
+
+                        TResult result;
+                        try
+                        {
+                            result = _resultSelector(value, right);
+                        }
+                        catch (Exception ex)
+                        {
+                            base._observer.OnError(ex);
+                            base.Dispose();
+                            return;
+                        }
+
+                        base._observer.OnNext(result);
                     }
-                    catch (Exception ex)
+                    else
                     {
-                        base._observer.OnError(ex);
+                        base._observer.OnCompleted();
                         base.Dispose();
-                        return;
                     }
+                }
 
-                    base._observer.OnNext(result);
+                public void OnError(Exception error)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
                 }
-                else
+
+                public void OnCompleted()
                 {
                     base._observer.OnCompleted();
                     base.Dispose();
                 }
             }
-
-            public void OnError(Exception error)
-            {
-                base._observer.OnError(error);
-                base.Dispose();
-            }
-
-            public void OnCompleted()
-            {
-                base._observer.OnCompleted();
-                base.Dispose();
-            }
         }
     }
 

+ 2 - 2
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Multiple.cs

@@ -401,7 +401,7 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TResult> Zip<TFirst, TSecond, TResult>(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
         {
-            return new Zip<TFirst, TSecond, TResult>(first, second, resultSelector);
+            return new Zip<TFirst, TSecond, TResult>.Observable(first, second, resultSelector);
         }
 
         public virtual IObservable<TResult> Zip<TSource, TResult>(IEnumerable<IObservable<TSource>> sources, Func<IList<TSource>, TResult> resultSelector)
@@ -502,7 +502,7 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TResult> Zip<TFirst, TSecond, TResult>(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
         {
-            return new Zip<TFirst, TSecond, TResult>(first, second, resultSelector);
+            return new Zip<TFirst, TSecond, TResult>.Enumerable(first, second, resultSelector);
         }
 
         #endregion