| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the Apache 2.0 License.// See the LICENSE file in the project root for more information. using System.Collections;using System.Collections.Generic;using System.Linq;using System.Reactive.Disposables;using System.Threading;namespace System.Reactive.Linq.ObservableImpl{    #region Binary    internal static class Zip<TFirst, TSecond, TResult>    {        internal sealed class Observable : Producer<TResult, Observable._>        {            private readonly IObservable<TFirst> _first;            private readonly IObservable<TSecond> _second;            private readonly Func<TFirst, TSecond, TResult> _resultSelector;            public Observable(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)            {                _first = first;                _second = second;                _resultSelector = resultSelector;            }            protected override _ CreateSink(IObserver<TResult> observer) => new _(_resultSelector, observer);            protected override void Run(_ sink) => sink.Run(_first, _second);            internal sealed class _ : IdentitySink<TResult>            {                private readonly Func<TFirst, TSecond, TResult> _resultSelector;                private readonly object _gate;                private readonly FirstObserver _firstObserver;                private IDisposable _firstDisposable;                private readonly SecondObserver _secondObserver;                private IDisposable _secondDisposable;                public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)                    : base(observer)                {                    _gate = new object();                    _firstObserver = new FirstObserver(this);                    _secondObserver = new SecondObserver(this);                    _firstObserver.Other = _secondObserver;                    _secondObserver.Other = _firstObserver;                    _resultSelector = resultSelector;                }                public void Run(IObservable<TFirst> first, IObservable<TSecond> second)                {                    Disposable.SetSingle(ref _firstDisposable, first.SubscribeSafe(_firstObserver));                    Disposable.SetSingle(ref _secondDisposable, second.SubscribeSafe(_secondObserver));                }                protected override void Dispose(bool disposing)                {                    if (disposing)                    {                        Disposable.TryDispose(ref _firstDisposable);                        Disposable.TryDispose(ref _secondDisposable);                        // clearing the queue should happen under the lock                        // as they are plain Queue<T>s, not concurrent queues.                        lock (_gate)                        {                            _firstObserver.Dispose();                            _secondObserver.Dispose();                        }                    }                    base.Dispose(disposing);                }                private sealed class FirstObserver : IObserver<TFirst>, IDisposable                {                    private readonly _ _parent;                    private SecondObserver _other;                    private Queue<TFirst> _queue;                    public FirstObserver(_ parent)                    {                        _parent = parent;                        _queue = new Queue<TFirst>();                    }                    public SecondObserver Other { set { _other = value; } }                    public Queue<TFirst> Queue => _queue;                    public bool Done { get; private set; }                    public void OnNext(TFirst value)                    {                        lock (_parent._gate)                        {                            if (_other.Queue.Count > 0)                            {                                var r = _other.Queue.Dequeue();                                var res = default(TResult);                                try                                {                                    res = _parent._resultSelector(value, r);                                }                                catch (Exception ex)                                {                                    _parent.ForwardOnError(ex);                                    return;                                }                                _parent.ForwardOnNext(res);                            }                            else                            {                                if (_other.Done)                                {                                    _parent.ForwardOnCompleted();                                    return;                                }                                _queue.Enqueue(value);                            }                        }                    }                    public void OnError(Exception error)                    {                        lock (_parent._gate)                        {                            _parent.ForwardOnError(error);                        }                    }                    public void OnCompleted()                    {                        lock (_parent._gate)                        {                            Done = true;                            if (_other.Done)                            {                                _parent.ForwardOnCompleted();                                return;                            }                            else                            {                                Disposable.TryDispose(ref _parent._firstDisposable);                            }                        }                    }                    public void Dispose()                    {                        _queue.Clear();                    }                }                private sealed class SecondObserver : IObserver<TSecond>, IDisposable                {                    private readonly _ _parent;                    private FirstObserver _other;                    private Queue<TSecond> _queue;                    public SecondObserver(_ parent)                    {                        _parent = parent;                        _queue = new Queue<TSecond>();                    }                    public FirstObserver Other { set { _other = value; } }                    public Queue<TSecond> Queue => _queue;                    public bool Done { get; private set; }                    public void OnNext(TSecond value)                    {                        lock (_parent._gate)                        {                            if (_other.Queue.Count > 0)                            {                                var l = _other.Queue.Dequeue();                                var res = default(TResult);                                try                                {                                    res = _parent._resultSelector(l, value);                                }                                catch (Exception ex)                                {                                    _parent.ForwardOnError(ex);                                    return;                                }                                _parent.ForwardOnNext(res);                            }                            else                            {                                if (_other.Done)                                {                                    _parent.ForwardOnCompleted();                                    return;                                }                                _queue.Enqueue(value);                            }                        }                    }                    public void OnError(Exception error)                    {                        lock (_parent._gate)                        {                            _parent.ForwardOnError(error);                        }                    }                    public void OnCompleted()                    {                        lock (_parent._gate)                        {                            Done = true;                            if (_other.Done)                            {                                _parent.ForwardOnCompleted();                                return;                            }                            else                            {                                Disposable.TryDispose(ref _parent._secondDisposable);                            }                        }                    }                    public void Dispose()                    {                        _queue.Clear();                    }                }            }        }        internal sealed class Enumerable : Producer<TResult, Enumerable._>        {            private readonly IObservable<TFirst> _first;            private readonly IEnumerable<TSecond> _second;            private readonly Func<TFirst, TSecond, TResult> _resultSelector;            public Enumerable(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)            {                _first = first;                _second = second;                _resultSelector = resultSelector;            }            protected override _ CreateSink(IObserver<TResult> observer) => new _(_resultSelector, observer);            protected override void Run(_ sink) => sink.Run(_first, _second);            internal sealed class _ : Sink<TFirst, TResult>             {                private readonly Func<TFirst, TSecond, TResult> _resultSelector;                public _(Func<TFirst, TSecond, TResult> resultSelector, IObserver<TResult> observer)                    : base(observer)                {                    _resultSelector = resultSelector;                }                private IEnumerator<TSecond> _rightEnumerator;                private static readonly IEnumerator<TSecond> DisposedEnumerator = MakeDisposedEnumerator();                private static IEnumerator<TSecond> MakeDisposedEnumerator()                {                    yield break;                }                public void Run(IObservable<TFirst> first, IEnumerable<TSecond> second)                {                    //                    // Notice the evaluation order of obtaining the enumerator and subscribing to the                    // observable sequence is reversed compared to the operator's signature. This is                    // required to make sure the enumerator is available as soon as the observer can                    // be called. Otherwise, we end up having a race for the initialization and use                    // of the _rightEnumerator field.                    //                    try                    {                        var enumerator = second.GetEnumerator();                        if (Interlocked.CompareExchange(ref _rightEnumerator, enumerator, null) != null)                        {                            enumerator.Dispose();                            return;                        }                    }                    catch (Exception exception)                    {                        ForwardOnError(exception);                        return;                    }                    base.Run(first);                }                protected override void Dispose(bool disposing)                {                    if (disposing)                    {                        Interlocked.Exchange(ref _rightEnumerator, DisposedEnumerator)?.Dispose();                    }                    base.Dispose(disposing);                }                public override void OnNext(TFirst value)                {                    var hasNext = false;                    try                    {                        hasNext = _rightEnumerator.MoveNext();                    }                    catch (Exception ex)                    {                        ForwardOnError(ex);                        return;                    }                    if (hasNext)                    {                        var right = default(TSecond);                        try                        {                            right = _rightEnumerator.Current;                        }                        catch (Exception ex)                        {                            ForwardOnError(ex);                            return;                        }                        TResult result;                        try                        {                            result = _resultSelector(value, right);                        }                        catch (Exception ex)                        {                            ForwardOnError(ex);                            return;                        }                        ForwardOnNext(result);                    }                    else                    {                        ForwardOnCompleted();                    }                }            }        }    }    #endregion    #region [3,16]-ary    #region Helpers for n-ary overloads    internal interface IZip    {        void Next(int index);        void Fail(Exception error);        void Done(int index);    }    internal abstract class ZipSink<TResult> : IdentitySink<TResult>, IZip    {        protected readonly object _gate;        private readonly ICollection[] _queues;        private readonly bool[] _isDone;        public ZipSink(int arity, IObserver<TResult> observer)            : base(observer)        {            _gate = new object();            _isDone = new bool[arity];            _queues = new ICollection[arity];        }        public ICollection[] Queues => _queues;        public void Next(int index)        {            var hasValueAll = true;            foreach (var queue in _queues)            {                if (queue.Count == 0)                {                    hasValueAll = false;                    break;                }            }            if (hasValueAll)            {                var res = default(TResult);                try                {                    res = GetResult();                }                catch (Exception ex)                {                    ForwardOnError(ex);                    return;                }                ForwardOnNext(res);            }            else            {                var allOthersDone = true;                for (var i = 0; i < _isDone.Length; i++)                {                    if (i != index && !_isDone[i])                    {                        allOthersDone = false;                        break;                    }                }                if (allOthersDone)                {                    ForwardOnCompleted();                }            }        }        protected abstract TResult GetResult();        public void Fail(Exception error)        {            ForwardOnError(error);        }        public void Done(int index)        {            _isDone[index] = true;            var allDone = true;            foreach (var isDone in _isDone)            {                if (!isDone)                {                    allDone = false;                    break;                }            }            if (allDone)            {                ForwardOnCompleted();                return;            }        }    }    internal sealed class ZipObserver<T> : SafeObserver<T>    {        private readonly object _gate;        private readonly IZip _parent;        private readonly int _index;        private readonly Queue<T> _values;        public ZipObserver(object gate, IZip parent, int index)        {            _gate = gate;            _parent = parent;            _index = index;            _values = new Queue<T>();        }        public Queue<T> Values => _values;        protected override void Dispose(bool disposing)        {            base.Dispose(disposing);            if (disposing)            {                lock (_gate)                {                    _values.Clear();                }            }        }        public override void OnNext(T value)        {            lock (_gate)            {                _values.Enqueue(value);                _parent.Next(_index);            }        }        public override void OnError(Exception error)        {            Dispose();            lock (_gate)            {                _parent.Fail(error);            }        }        public override void OnCompleted()        {            Dispose();            lock (_gate)            {                _parent.Done(_index);            }        }    }    #endregion    #endregion    #region N-ary    internal sealed class Zip<TSource> : Producer<IList<TSource>, Zip<TSource>._>    {        private readonly IEnumerable<IObservable<TSource>> _sources;        public Zip(IEnumerable<IObservable<TSource>> sources)        {            _sources = sources;        }        protected override _ CreateSink(IObserver<IList<TSource>> observer) => new _(this, observer);        protected override void Run(_ sink) => sink.Run();        internal sealed class _ : IdentitySink<IList<TSource>>        {            private readonly Zip<TSource> _parent;            private readonly object _gate;            public _(Zip<TSource> parent, IObserver<IList<TSource>> observer)                : base(observer)            {                _gate = new object();                _parent = parent;            }            private Queue<TSource>[] _queues;            private bool[] _isDone;            private IDisposable[] _subscriptions;            private static readonly IDisposable[] Disposed = new IDisposable[0];            public void Run()            {                var srcs = _parent._sources.ToArray();                var N = srcs.Length;                _queues = new Queue<TSource>[N];                for (var i = 0; i < N; i++)                    _queues[i] = new Queue<TSource>();                _isDone = new bool[N];                var subscriptions = new IDisposable[N];                if (Interlocked.CompareExchange(ref _subscriptions, subscriptions, null) == null)                {                    for (var i = 0; i < N; i++)                    {                        var o = new SourceObserver(this, i);                        Disposable.SetSingle(ref subscriptions[i], srcs[i].SubscribeSafe(o));                    }                }            }            protected override void Dispose(bool disposing)            {                if (disposing)                {                    var subscriptions = Interlocked.Exchange(ref _subscriptions, Disposed);                    if (subscriptions != null)                    {                        for (var i = 0; i < subscriptions.Length; i++)                        {                            Disposable.TryDispose(ref subscriptions[i]);                        }                        lock (_gate)                        {                            foreach (var q in _queues)                            {                                q.Clear();                            }                        }                    }                }                base.Dispose(disposing);            }            private void OnNext(int index, TSource value)            {                lock (_gate)                {                    _queues[index].Enqueue(value);                    if (_queues.All(q => q.Count > 0))                    {                        var n = _queues.Length;                        var res = new List<TSource>(n);                        for (var i = 0; i < n; i++)                        {                            res.Add(_queues[i].Dequeue());                        }                        ForwardOnNext(res);                    }                    else if (_isDone.AllExcept(index))                    {                        ForwardOnCompleted();                        return;                    }                }            }            private new void OnError(Exception error)            {                lock (_gate)                {                    ForwardOnError(error);                }            }            private void OnCompleted(int index)            {                lock (_gate)                {                    _isDone[index] = true;                    if (_isDone.All())                    {                        ForwardOnCompleted();                        return;                    }                    else                    {                        _subscriptions[index].Dispose();                    }                }            }            private sealed class SourceObserver : IObserver<TSource>            {                private readonly _ _parent;                private readonly int _index;                public SourceObserver(_ parent, int index)                {                    _parent = parent;                    _index = index;                }                public void OnNext(TSource value)                {                    _parent.OnNext(_index, value);                }                public void OnError(Exception error)                {                    _parent.OnError(error);                }                public void OnCompleted()                {                    _parent.OnCompleted(_index);                }            }        }    }    #endregion}
 |