| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the Apache 2.0 License.// See the LICENSE file in the project root for more information. #if !NO_PERFusing System;using System.Collections;using System.Collections.Generic;using System.Linq;using System.Reactive.Disposables;namespace System.Reactive.Linq.ObservableImpl{    #region Binary    class Zip<TFirst, TSecond, TResult> : Producer<TResult>    {        private readonly IObservable<TFirst> _first;        private readonly IObservable<TSecond> _second;        private readonly IEnumerable<TSecond> _secondE;        private readonly Func<TFirst, TSecond, TResult> _resultSelector;        public Zip(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)        {            _first = first;            _second = second;            _resultSelector = resultSelector;        }        public Zip(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)        {            _first = first;            _secondE = second;            _resultSelector = resultSelector;        }        protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)        {            if (_second != null)            {                var sink = new _(this, observer, cancel);                setSink(sink);                return sink.Run();            }            else            {                var sink = new ZipImpl(this, observer, cancel);                setSink(sink);                return sink.Run();            }        }        class _ : Sink<TResult>        {            private readonly Zip<TFirst, TSecond, TResult> _parent;            public _(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private object _gate;            public IDisposable Run()            {                _gate = new object();                var fstSubscription = new SingleAssignmentDisposable();                var sndSubscription = new SingleAssignmentDisposable();                var fstO = new F(this, fstSubscription);                var sndO = new S(this, sndSubscription);                fstO.Other = sndO;                sndO.Other = fstO;                fstSubscription.Disposable = _parent._first.SubscribeSafe(fstO);                sndSubscription.Disposable = _parent._second.SubscribeSafe(sndO);                return StableCompositeDisposable.Create(fstSubscription, sndSubscription, fstO, sndO);            }            class F : IObserver<TFirst>, IDisposable            {                private readonly _ _parent;                private readonly IDisposable _self;                private S _other;                private Queue<TFirst> _queue;                public F(_ parent, IDisposable self)                {                    _parent = parent;                    _self = self;                    _queue = new Queue<TFirst>();                }                public S Other { set { _other = value; } }                public Queue<TFirst> Queue { get { return _queue; } }                public bool Done { get; private set; }                public void OnNext(TFirst value)                {                    lock (_parent._gate)                    {                        if (_other.Queue.Count > 0)                        {                            var r = _other.Queue.Dequeue();                            var res = default(TResult);                            try                            {                                res = _parent._parent._resultSelector(value, r);                            }                            catch (Exception ex)                            {                                _parent._observer.OnError(ex);                                _parent.Dispose();                                return;                            }                            _parent._observer.OnNext(res);                        }                        else                        {                            if (_other.Done)                            {                                _parent._observer.OnCompleted();                                _parent.Dispose();                                return;                            }                            _queue.Enqueue(value);                        }                    }                }                public void OnError(Exception error)                {                    lock (_parent._gate)                    {                        _parent._observer.OnError(error);                        _parent.Dispose();                    }                }                public void OnCompleted()                {                    lock (_parent._gate)                    {                        Done = true;                        if (_other.Done)                        {                            _parent._observer.OnCompleted();                            _parent.Dispose();                            return;                        }                        else                        {                            _self.Dispose();                        }                    }                }                public void Dispose()                {                    _queue.Clear();                }            }            class S : IObserver<TSecond>, IDisposable            {                private readonly _ _parent;                private readonly IDisposable _self;                private F _other;                private Queue<TSecond> _queue;                public S(_ parent, IDisposable self)                {                    _parent = parent;                    _self = self;                    _queue = new Queue<TSecond>();                }                public F Other { set { _other = value; } }                public Queue<TSecond> Queue { get { return _queue; } }                public bool Done { get; private set; }                public void OnNext(TSecond value)                {                    lock (_parent._gate)                    {                        if (_other.Queue.Count > 0)                        {                            var l = _other.Queue.Dequeue();                            var res = default(TResult);                            try                            {                                res = _parent._parent._resultSelector(l, value);                            }                            catch (Exception ex)                            {                                _parent._observer.OnError(ex);                                _parent.Dispose();                                return;                            }                            _parent._observer.OnNext(res);                        }                        else                        {                            if (_other.Done)                            {                                _parent._observer.OnCompleted();                                _parent.Dispose();                                return;                            }                            _queue.Enqueue(value);                        }                    }                }                public void OnError(Exception error)                {                    lock (_parent._gate)                    {                        _parent._observer.OnError(error);                        _parent.Dispose();                    }                }                public void OnCompleted()                {                    lock (_parent._gate)                    {                        Done = true;                        if (_other.Done)                        {                            _parent._observer.OnCompleted();                            _parent.Dispose();                            return;                        }                        else                        {                            _self.Dispose();                        }                    }                }                public void Dispose()                {                    _queue.Clear();                }            }        }        class ZipImpl : Sink<TResult>, IObserver<TFirst>        {            private readonly Zip<TFirst, TSecond, TResult> _parent;            public ZipImpl(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private IEnumerator<TSecond> _rightEnumerator;            public IDisposable Run()            {                //                // Notice the evaluation order of obtaining the enumerator and subscribing to the                // observable sequence is reversed compared to the operator's signature. This is                // required to make sure the enumerator is available as soon as the observer can                // be called. Otherwise, we end up having a race for the initialization and use                // of the _rightEnumerator field.                //                try                {                    _rightEnumerator = _parent._secondE.GetEnumerator();                }                catch (Exception exception)                {                    base._observer.OnError(exception);                    base.Dispose();                    return Disposable.Empty;                }                var leftSubscription = _parent._first.SubscribeSafe(this);                return StableCompositeDisposable.Create(leftSubscription, _rightEnumerator);            }            public void OnNext(TFirst value)            {                var hasNext = false;                try                {                    hasNext = _rightEnumerator.MoveNext();                }                catch (Exception ex)                {                    base._observer.OnError(ex);                    base.Dispose();                    return;                }                if (hasNext)                {                    var right = default(TSecond);                    try                    {                        right = _rightEnumerator.Current;                    }                    catch (Exception ex)                    {                        base._observer.OnError(ex);                        base.Dispose();                        return;                    }                    TResult result;                    try                    {                        result = _parent._resultSelector(value, right);                    }                    catch (Exception ex)                    {                        base._observer.OnError(ex);                        base.Dispose();                        return;                    }                    base._observer.OnNext(result);                }                else                {                    base._observer.OnCompleted();                    base.Dispose();                }            }            public void OnError(Exception error)            {                base._observer.OnError(error);                base.Dispose();            }            public void OnCompleted()            {                base._observer.OnCompleted();                base.Dispose();            }        }    }    #endregion    #region [3,16]-ary    #region Helpers for n-ary overloads    interface IZip    {        void Next(int index);        void Fail(Exception error);        void Done(int index);    }    abstract class ZipSink<TResult> : Sink<TResult>, IZip    {        protected readonly object _gate;        private readonly ICollection[] _queues;        private readonly bool[] _isDone;        public ZipSink(int arity, IObserver<TResult> observer, IDisposable cancel)            : base(observer, cancel)        {            _gate = new object();            _isDone = new bool[arity];            _queues = new ICollection[arity];        }        public ICollection[] Queues        {            get { return _queues; }        }        public void Next(int index)        {            var hasValueAll = true;            foreach (var queue in _queues)            {                if (queue.Count == 0)                {                    hasValueAll = false;                    break;                }            }            if (hasValueAll)            {                var res = default(TResult);                try                {                    res = GetResult();                }                catch (Exception ex)                {                    base._observer.OnError(ex);                    base.Dispose();                    return;                }                base._observer.OnNext(res);            }            else            {                var allOthersDone = true;                for (int i = 0; i < _isDone.Length; i++)                {                    if (i != index && !_isDone[i])                    {                        allOthersDone = false;                        break;                    }                }                if (allOthersDone)                {                    base._observer.OnCompleted();                    base.Dispose();                }            }        }        protected abstract TResult GetResult();        public void Fail(Exception error)        {            base._observer.OnError(error);            base.Dispose();        }        public void Done(int index)        {            _isDone[index] = true;            var allDone = true;            foreach (var isDone in _isDone)            {                if (!isDone)                {                    allDone = false;                    break;                }            }            if (allDone)            {                base._observer.OnCompleted();                base.Dispose();                return;            }        }    }    class ZipObserver<T> : IObserver<T>    {        private readonly object _gate;        private readonly IZip _parent;        private readonly int _index;        private readonly IDisposable _self;        private readonly Queue<T> _values;        public ZipObserver(object gate, IZip parent, int index, IDisposable self)        {            _gate = gate;            _parent = parent;            _index = index;            _self = self;            _values = new Queue<T>();        }        public Queue<T> Values        {            get { return _values; }        }        public void OnNext(T value)        {            lock (_gate)            {                _values.Enqueue(value);                _parent.Next(_index);            }        }        public void OnError(Exception error)        {            _self.Dispose();            lock (_gate)            {                _parent.Fail(error);            }        }        public void OnCompleted()        {            _self.Dispose();            lock (_gate)            {                _parent.Done(_index);            }        }    }    #endregion    #endregion    #region N-ary    class Zip<TSource> : Producer<IList<TSource>>    {        private readonly IEnumerable<IObservable<TSource>> _sources;        public Zip(IEnumerable<IObservable<TSource>> sources)        {            _sources = sources;        }        protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)        {            var sink = new _(this, observer, cancel);            setSink(sink);            return sink.Run();        }        class _ : Sink<IList<TSource>>        {            private readonly Zip<TSource> _parent;            public _(Zip<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)                : base(observer, cancel)            {                _parent = parent;            }            private object _gate;            private Queue<TSource>[] _queues;            private bool[] _isDone;            private IDisposable[] _subscriptions;            public IDisposable Run()            {                var srcs = _parent._sources.ToArray();                var N = srcs.Length;                _queues = new Queue<TSource>[N];                for (int i = 0; i < N; i++)                    _queues[i] = new Queue<TSource>();                _isDone = new bool[N];                _subscriptions = new SingleAssignmentDisposable[N];                _gate = new object();                for (int i = 0; i < N; i++)                {                    var j = i;                    var d = new SingleAssignmentDisposable();                    _subscriptions[j] = d;                    var o = new O(this, j);                    d.Disposable = srcs[j].SubscribeSafe(o);                }                return new CompositeDisposable(_subscriptions) { Disposable.Create(() => { foreach (var q in _queues) q.Clear(); }) };            }            private void OnNext(int index, TSource value)            {                lock (_gate)                {                    _queues[index].Enqueue(value);                    if (_queues.All(q => q.Count > 0))                    {                        var res = _queues.Select(q => q.Dequeue()).ToList();                        base._observer.OnNext(res);                    }                    else if (_isDone.Where((x, i) => i != index).All(Stubs<bool>.I))                    {                        base._observer.OnCompleted();                        base.Dispose();                        return;                    }                }            }            private void OnError(Exception error)            {                lock (_gate)                {                    base._observer.OnError(error);                    base.Dispose();                }            }            private void OnCompleted(int index)            {                lock (_gate)                {                    _isDone[index] = true;                    if (_isDone.All(Stubs<bool>.I))                    {                        base._observer.OnCompleted();                        base.Dispose();                        return;                    }                    else                    {                        _subscriptions[index].Dispose();                    }                }            }            class O : IObserver<TSource>            {                private readonly _ _parent;                private readonly int _index;                public O(_ parent, int index)                {                    _parent = parent;                    _index = index;                }                public void OnNext(TSource value)                {                    _parent.OnNext(_index, value);                }                public void OnError(Exception error)                {                    _parent.OnError(error);                }                public void OnCompleted()                {                    _parent.OnCompleted(_index);                }            }        }    }    #endregion}#endif
 |