// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Disposables;

namespace System.Reactive.Linq.ObservableImpl
{
    #region Binary

    //
    // Producer that combines a first observable sequence pairwise with either a second
    // observable sequence or an enumerable sequence, projecting each pair through the
    // result selector.
    //
    class Zip<TFirst, TSecond, TResult> : Producer<TResult>
    {
        private readonly IObservable<TFirst> _first;
        private readonly IObservable<TSecond> _second;      // null when the enumerable overload was used
        private readonly IEnumerable<TSecond> _secondE;     // null when the observable overload was used
        private readonly Func<TFirst, TSecond, TResult> _resultSelector;

        // Observable-with-observable overload.
        public Zip(IObservable<TFirst> first, IObservable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
        {
            _first = first;
            _second = second;
            _resultSelector = resultSelector;
        }

        // Observable-with-enumerable overload.
        public Zip(IObservable<TFirst> first, IEnumerable<TSecond> second, Func<TFirst, TSecond, TResult> resultSelector)
        {
            _first = first;
            _secondE = second;
            _resultSelector = resultSelector;
        }

        //
        // Picks the sink matching the constructor that was used: the two-observable sink
        // when _second is set, the observable/enumerable sink otherwise.
        //
        // NOTE(review): the setSink parameter type is restored as Action<IDisposable>; it has
        // to match the abstract Run declared on Producer<T>, which is not part of this file.
        //
        protected override IDisposable Run(IObserver<TResult> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            if (_second != null)
            {
                var sink = new _(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
            else
            {
                var sink = new ZipImpl(this, observer, cancel);
                setSink(sink);
                return sink.Run();
            }
        }

        //
        // Sink for the observable/observable case. Each side gets its own observer (F and S)
        // with a private queue; both observers synchronize on the shared _gate.
        //
        class _ : Sink<TResult>
        {
            private readonly Zip<TFirst, TSecond, TResult> _parent;

            public _(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private object _gate;

            //
            // Creates both observers, cross-links them, and subscribes to the first source
            // before the second. The returned disposable also owns the two observers, whose
            // Dispose clears their queues.
            //
            public IDisposable Run()
            {
                _gate = new object();

                var fstSubscription = new SingleAssignmentDisposable();
                var sndSubscription = new SingleAssignmentDisposable();

                var fstO = new F(this, fstSubscription);
                var sndO = new S(this, sndSubscription);

                // Each observer needs to inspect the other's queue and completion flag.
                fstO.Other = sndO;
                sndO.Other = fstO;

                fstSubscription.Disposable = _parent._first.SubscribeSafe(fstO);
                sndSubscription.Disposable = _parent._second.SubscribeSafe(sndO);

                return StableCompositeDisposable.Create(fstSubscription, sndSubscription, fstO, sndO);
            }

            // Observer for the first (left) source.
            class F : IObserver<TFirst>, IDisposable
            {
                private readonly _ _parent;
                private readonly IDisposable _self;     // subscription to the first source
                private S _other;
                private Queue<TFirst> _queue;

                public F(_ parent, IDisposable self)
                {
                    _parent = parent;
                    _self = self;
                    _queue = new Queue<TFirst>();
                }

                public S Other { set { _other = value; } }
                public Queue<TFirst> Queue { get { return _queue; } }
                public bool Done { get; private set; }

                //
                // If the other side has a buffered value, pairs with it and emits the
                // selector's result; otherwise buffers the value, unless the other side has
                // already completed, in which case no further pair can form and the
                // sequence completes.
                //
                public void OnNext(TFirst value)
                {
                    lock (_parent._gate)
                    {
                        if (_other.Queue.Count > 0)
                        {
                            var r = _other.Queue.Dequeue();

                            var res = default(TResult);
                            try
                            {
                                res = _parent._parent._resultSelector(value, r);
                            }
                            catch (Exception ex)
                            {
                                _parent._observer.OnError(ex);
                                _parent.Dispose();
                                return;
                            }

                            _parent._observer.OnNext(res);
                        }
                        else
                        {
                            if (_other.Done)
                            {
                                _parent._observer.OnCompleted();
                                _parent.Dispose();
                                return;
                            }

                            _queue.Enqueue(value);
                        }
                    }
                }

                // Forwards the error and tears down the sink.
                public void OnError(Exception error)
                {
                    lock (_parent._gate)
                    {
                        _parent._observer.OnError(error);
                        _parent.Dispose();
                    }
                }

                //
                // Marks this side as done. Completes downstream only when the other side is
                // done as well; otherwise just releases this side's own subscription.
                //
                public void OnCompleted()
                {
                    lock (_parent._gate)
                    {
                        Done = true;

                        if (_other.Done)
                        {
                            _parent._observer.OnCompleted();
                            _parent.Dispose();
                            return;
                        }
                        else
                        {
                            _self.Dispose();
                        }
                    }
                }

                // Drops any values still buffered for this side.
                public void Dispose()
                {
                    _queue.Clear();
                }
            }

            // Observer for the second (right) source; mirror image of F.
            class S : IObserver<TSecond>, IDisposable
            {
                private readonly _ _parent;
                private readonly IDisposable _self;     // subscription to the second source
                private F _other;
                private Queue<TSecond> _queue;

                public S(_ parent, IDisposable self)
                {
                    _parent = parent;
                    _self = self;
                    _queue = new Queue<TSecond>();
                }

                public F Other { set { _other = value; } }
                public Queue<TSecond> Queue { get { return _queue; } }
                public bool Done { get; private set; }

                //
                // Same protocol as F.OnNext, with the selector arguments kept in
                // (first, second) order.
                //
                public void OnNext(TSecond value)
                {
                    lock (_parent._gate)
                    {
                        if (_other.Queue.Count > 0)
                        {
                            var l = _other.Queue.Dequeue();

                            var res = default(TResult);
                            try
                            {
                                res = _parent._parent._resultSelector(l, value);
                            }
                            catch (Exception ex)
                            {
                                _parent._observer.OnError(ex);
                                _parent.Dispose();
                                return;
                            }

                            _parent._observer.OnNext(res);
                        }
                        else
                        {
                            if (_other.Done)
                            {
                                _parent._observer.OnCompleted();
                                _parent.Dispose();
                                return;
                            }

                            _queue.Enqueue(value);
                        }
                    }
                }

                // Forwards the error and tears down the sink.
                public void OnError(Exception error)
                {
                    lock (_parent._gate)
                    {
                        _parent._observer.OnError(error);
                        _parent.Dispose();
                    }
                }

                //
                // Marks this side as done. Completes downstream only when the other side is
                // done as well; otherwise just releases this side's own subscription.
                //
                public void OnCompleted()
                {
                    lock (_parent._gate)
                    {
                        Done = true;

                        if (_other.Done)
                        {
                            _parent._observer.OnCompleted();
                            _parent.Dispose();
                            return;
                        }
                        else
                        {
                            _self.Dispose();
                        }
                    }
                }

                // Drops any values still buffered for this side.
                public void Dispose()
                {
                    _queue.Clear();
                }
            }
        }

        //
        // Sink for the observable/enumerable case. The sink itself observes the first source
        // and pulls one element from the enumerator per incoming value.
        //
        class ZipImpl : Sink<TResult>, IObserver<TFirst>
        {
            private readonly Zip<TFirst, TSecond, TResult> _parent;

            public ZipImpl(Zip<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private IEnumerator<TSecond> _rightEnumerator;

            public IDisposable Run()
            {
                //
                // Notice the evaluation order of obtaining the enumerator and subscribing to the
                // observable sequence is reversed compared to the operator's signature. This is
                // required to make sure the enumerator is available as soon as the observer can
                // be called. Otherwise, we end up having a race for the initialization and use
                // of the _rightEnumerator field.
                //
                try
                {
                    _rightEnumerator = _parent._secondE.GetEnumerator();
                }
                catch (Exception exception)
                {
                    base._observer.OnError(exception);
                    base.Dispose();
                    return Disposable.Empty;
                }

                var leftSubscription = _parent._first.SubscribeSafe(this);

                // The enumerator is disposed together with the subscription.
                return StableCompositeDisposable.Create(leftSubscription, _rightEnumerator);
            }

            //
            // Advances the enumerator by one element. Emits the projected pair when an element
            // is available and completes when the enumerator is exhausted. Exceptions thrown by
            // MoveNext, Current, or the selector are routed to OnError.
            //
            public void OnNext(TFirst value)
            {
                var hasNext = false;
                try
                {
                    hasNext = _rightEnumerator.MoveNext();
                }
                catch (Exception ex)
                {
                    base._observer.OnError(ex);
                    base.Dispose();
                    return;
                }

                if (hasNext)
                {
                    var right = default(TSecond);
                    try
                    {
                        right = _rightEnumerator.Current;
                    }
                    catch (Exception ex)
                    {
                        base._observer.OnError(ex);
                        base.Dispose();
                        return;
                    }

                    TResult result;
                    try
                    {
                        result = _parent._resultSelector(value, right);
                    }
                    catch (Exception ex)
                    {
                        base._observer.OnError(ex);
                        base.Dispose();
                        return;
                    }

                    base._observer.OnNext(result);
                }
                else
                {
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }

            public void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            public void OnCompleted()
            {
                base._observer.OnCompleted();
                base.Dispose();
            }
        }
    }

    #endregion

    #region [3,16]-ary

    #region Helpers for n-ary overloads

    // Callback surface that a ZipObserver uses to report events, by source index, to its sink.
    interface IZip
    {
        void Next(int index);
        void Fail(Exception error);
        void Done(int index);
    }

    //
    // Base sink for fixed-arity zips. Tracks one queue and one completion flag per source;
    // derived classes supply GetResult to build a result from the queued values.
    //
    // Next, Fail and Done do not take _gate themselves; ZipObserver invokes them while
    // holding the gate it was constructed with.
    //
    abstract class ZipSink<TResult> : Sink<TResult>, IZip
    {
        protected readonly object _gate;

        private readonly ICollection[] _queues;
        private readonly bool[] _isDone;

        public ZipSink(int arity, IObserver<TResult> observer, IDisposable cancel)
            : base(observer, cancel)
        {
            _gate = new object();

            _isDone = new bool[arity];
            _queues = new ICollection[arity];
        }

        // The array is allocated here but its slots are not filled in by this class;
        // presumably derived classes assign each source's queue - verify against callers.
        public ICollection[] Queues
        {
            get { return _queues; }
        }

        //
        // Called after the source at the given index enqueued a value. Emits a result when
        // every queue is non-empty; otherwise completes if all sources other than the given
        // one are already done.
        //
        public void Next(int index)
        {
            var hasValueAll = true;
            foreach (var queue in _queues)
            {
                if (queue.Count == 0)
                {
                    hasValueAll = false;
                    break;
                }
            }

            if (hasValueAll)
            {
                var res = default(TResult);
                try
                {
                    res = GetResult();
                }
                catch (Exception ex)
                {
                    base._observer.OnError(ex);
                    base.Dispose();
                    return;
                }

                base._observer.OnNext(res);
            }
            else
            {
                var allOthersDone = true;
                for (int i = 0; i < _isDone.Length; i++)
                {
                    if (i != index && !_isDone[i])
                    {
                        allOthersDone = false;
                        break;
                    }
                }

                if (allOthersDone)
                {
                    base._observer.OnCompleted();
                    base.Dispose();
                }
            }
        }

        // Builds the result to emit; invoked by Next only when every queue is non-empty.
        protected abstract TResult GetResult();

        // Forwards the error and tears down the sink.
        public void Fail(Exception error)
        {
            base._observer.OnError(error);
            base.Dispose();
        }

        // Flags the source at the given index as done and completes once all sources are done.
        public void Done(int index)
        {
            _isDone[index] = true;

            var allDone = true;
            foreach (var isDone in _isDone)
            {
                if (!isDone)
                {
                    allDone = false;
                    break;
                }
            }

            if (allDone)
            {
                base._observer.OnCompleted();
                base.Dispose();
                return;
            }
        }
    }

    //
    // Per-source observer for fixed-arity zips. Buffers incoming values in its own queue and
    // notifies the parent, identified by source index, under the shared gate.
    //
    class ZipObserver<T> : IObserver<T>
    {
        private readonly object _gate;
        private readonly IZip _parent;
        private readonly int _index;
        private readonly IDisposable _self;
        private readonly Queue<T> _values;

        public ZipObserver(object gate, IZip parent, int index, IDisposable self)
        {
            _gate = gate;
            _parent = parent;
            _index = index;
            _self = self;
            _values = new Queue<T>();
        }

        public Queue<T> Values
        {
            get { return _values; }
        }

        public void OnNext(T value)
        {
            lock (_gate)
            {
                _values.Enqueue(value);
                _parent.Next(_index);
            }
        }

        public void OnError(Exception error)
        {
            // The own subscription is released before the gate is taken.
            _self.Dispose();

            lock (_gate)
            {
                _parent.Fail(error);
            }
        }

        public void OnCompleted()
        {
            // The own subscription is released before the gate is taken.
            _self.Dispose();

            lock (_gate)
            {
                _parent.Done(_index);
            }
        }
    }

    #endregion

    #endregion

    #region N-ary

    //
    // Producer that zips an arbitrary number of same-typed observable sequences, emitting a
    // list holding one element from each source.
    //
    class Zip<TSource> : Producer<IList<TSource>>
    {
        private readonly IEnumerable<IObservable<TSource>> _sources;

        public Zip(IEnumerable<IObservable<TSource>> sources)
        {
            _sources = sources;
        }

        // NOTE(review): the setSink parameter type is restored as Action<IDisposable>; it has
        // to match the abstract Run declared on Producer<T>, which is not part of this file.
        protected override IDisposable Run(IObserver<IList<TSource>> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var sink = new _(this, observer, cancel);
            setSink(sink);
            return sink.Run();
        }

        class _ : Sink<IList<TSource>>
        {
            private readonly Zip<TSource> _parent;

            public _(Zip<TSource> parent, IObserver<IList<TSource>> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private object _gate;
            private Queue<TSource>[] _queues;
            private bool[] _isDone;
            private IDisposable[] _subscriptions;

            //
            // Snapshots the sources into an array, allocates one queue, completion flag and
            // subscription slot per source, then subscribes to the sources in order. The
            // returned disposable releases all subscriptions and clears all queues.
            //
            public IDisposable Run()
            {
                var srcs = _parent._sources.ToArray();

                var N = srcs.Length;

                _queues = new Queue<TSource>[N];
                for (int i = 0; i < N; i++)
                    _queues[i] = new Queue<TSource>();

                _isDone = new bool[N];

                _subscriptions = new SingleAssignmentDisposable[N];

                _gate = new object();

                for (int i = 0; i < N; i++)
                {
                    var j = i;

                    // The slot is published before subscribing, so OnCompleted(index) can
                    // dispose it even if the source completes during SubscribeSafe.
                    var d = new SingleAssignmentDisposable();
                    _subscriptions[j] = d;

                    var o = new O(this, j);
                    d.Disposable = srcs[j].SubscribeSafe(o);
                }

                return new CompositeDisposable(_subscriptions) { Disposable.Create(() => { foreach (var q in _queues) q.Clear(); }) };
            }

            //
            // Buffers the value for the given source. When every queue has a value, dequeues
            // one from each (in source order) and emits them as a list; otherwise completes
            // if all sources other than the given one are done.
            //
            // NOTE(review): Stubs<bool>.I is declared outside this file; it is used here as a
            // bool -> bool predicate and is presumably the identity function - confirm.
            //
            private void OnNext(int index, TSource value)
            {
                lock (_gate)
                {
                    _queues[index].Enqueue(value);

                    if (_queues.All(q => q.Count > 0))
                    {
                        var res = _queues.Select(q => q.Dequeue()).ToList();
                        base._observer.OnNext(res);
                    }
                    else if (_isDone.Where((x, i) => i != index).All(Stubs<bool>.I))
                    {
                        base._observer.OnCompleted();
                        base.Dispose();
                        return;
                    }
                }
            }

            // Forwards the error and tears down the sink.
            private void OnError(Exception error)
            {
                lock (_gate)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            //
            // Flags the given source as done. Completes downstream when every source is
            // done; otherwise only releases that source's subscription.
            //
            private void OnCompleted(int index)
            {
                lock (_gate)
                {
                    _isDone[index] = true;

                    if (_isDone.All(Stubs<bool>.I))
                    {
                        base._observer.OnCompleted();
                        base.Dispose();
                        return;
                    }
                    else
                    {
                        _subscriptions[index].Dispose();
                    }
                }
            }

            // Thin per-source observer that tags every notification with its source index.
            class O : IObserver<TSource>
            {
                private readonly _ _parent;
                private readonly int _index;

                public O(_ parent, int index)
                {
                    _parent = parent;
                    _index = index;
                }

                public void OnNext(TSource value)
                {
                    _parent.OnNext(_index, value);
                }

                public void OnError(Exception error)
                {
                    _parent.OnError(error);
                }

                public void OnCompleted()
                {
                    _parent.OnCompleted(_index);
                }
            }
        }
    }

    #endregion
}
#endif