// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

namespace System.Reactive.Linq.ObservableImpl
{
    // NOTE(review): the generic type argument lists in this file were reconstructed from how
    // TSource / TKey are used in the member bodies. Confirm them against the declarations of
    // OrderedProducer, SortSink and IOrderedObservable, which are not part of this file.

    /// <summary>
    /// Ordering operator over an observable sequence. An instance works in one of two modes,
    /// selected by the constructor that was used:
    /// <list type="bullet">
    /// <item><description>time-based: each element is released when the sequence returned by the time selector for that element produces a value or completes;</description></item>
    /// <item><description>key-based: all elements are buffered and sorted with LINQ to Objects when the source completes.</description></item>
    /// </list>
    /// </summary>
    /// <typeparam name="TSource">Type of the elements in the source sequence.</typeparam>
    /// <typeparam name="TKey">Type of the sorting key, or of the elements of the per-element time sequence.</typeparam>
    class OrderBy<TSource, TKey> : OrderedProducer<TSource>, IOrderedObservable<TSource>
    {
        // Exactly one of _timeSelector / _keySelector is set; Sort branches on _timeSelector != null.
        private readonly Func<TSource, IObservable<TKey>> _timeSelector;
        private readonly Func<TSource, TKey> _keySelector;
        private readonly IComparer<TKey> _comparer;
        private readonly bool _descending;

        /// <summary>
        /// Creates a time-based ordering with no preceding ordering operator.
        /// </summary>
        public OrderBy(IObservable<TSource> source, Func<TSource, IObservable<TKey>> timeSelector, bool descending)
            : base(source, null)
        {
            _timeSelector = timeSelector;
            _descending = descending;
        }

        /// <summary>
        /// Creates a time-based ordering that follows the ordering operator <paramref name="previous"/>.
        /// </summary>
        public OrderBy(IObservable<TSource> source, Func<TSource, IObservable<TKey>> timeSelector, bool descending, OrderedProducer<TSource> previous)
            : base(source, previous)
        {
            _timeSelector = timeSelector;
            _descending = descending;
        }

        /// <summary>
        /// Creates a key-based ordering with no preceding ordering operator.
        /// A null <paramref name="comparer"/> is replaced by the default comparer for <typeparamref name="TKey"/>.
        /// </summary>
        public OrderBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer, bool descending)
            : base(source, null)
        {
            _keySelector = keySelector;
            _comparer = comparer ?? Comparer<TKey>.Default;
            _descending = descending;
        }

        /// <summary>
        /// Creates a key-based ordering that follows the ordering operator <paramref name="previous"/>.
        /// A null <paramref name="comparer"/> is replaced by the default comparer for <typeparamref name="TKey"/>.
        /// </summary>
        public OrderBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer, bool descending, OrderedProducer<TSource> previous)
            : base(source, previous)
        {
            _keySelector = keySelector;
            _comparer = comparer ?? Comparer<TKey>.Default;
            _descending = descending;
        }

        /// <summary>
        /// Creates a subsequent key-based ordering over the same source, chained to this operator.
        /// </summary>
        // NOTE(review): the method-level type parameter name was lost in the source text; TTKey is a reconstruction.
        public IOrderedObservable<TSource> CreateOrderedObservable<TTKey>(Func<TSource, TTKey> keySelector, IComparer<TTKey> comparer, bool descending)
        {
            return new OrderBy<TSource, TTKey>(base._source, keySelector, comparer, descending, previous: this);
        }

        /// <summary>
        /// Creates a subsequent time-based ordering over the same source, chained to this operator.
        /// </summary>
        // NOTE(review): the method-level type parameter name was lost in the source text; TOther is a reconstruction.
        public IOrderedObservable<TSource> CreateOrderedObservable<TOther>(Func<TSource, IObservable<TOther>> timeSelector, bool descending)
        {
            return new OrderBy<TSource, TOther>(base._source, timeSelector, descending, previous: this);
        }

        /// <summary>
        /// Builds the sink that performs this operator's ordering in front of <paramref name="observer"/>.
        /// </summary>
        /// <returns>
        /// A new sink, or - for key-based ordering when <paramref name="observer"/> already is a
        /// key-based sink of this same closed generic type - that existing sink with this operator
        /// registered on it.
        /// </returns>
        protected override SortSink Sort(IObserver<TSource> observer, IDisposable cancel)
        {
            if (_timeSelector != null)
            {
                if (_descending)
                {
                    return new Descending(this, observer, cancel);
                }
                else
                {
                    return new Ascending(this, observer, cancel);
                }
            }
            else
            {
                var sink = observer as ι;
                if (sink != null)
                {
                    /* This optimization exists for 2 reasons:
                     *
                     * 1. To avoid having to use multiple buffers in consecutive ordering operations.
                     * 2. To take advantage of Enumerable's optimizations for consecutive ordering operations.
                     */
                    sink.OrderBy(this);
                    return sink;
                }
                else
                {
                    if (_descending)
                    {
                        return new Descending_(this, observer, cancel);
                    }
                    else
                    {
                        return new Ascending_(this, observer, cancel);
                    }
                }
            }
        }

        /// <summary>
        /// Time-based ascending sink: forwards each element as soon as its time sequence fires.
        /// </summary>
        class Ascending : ρ
        {
            public Ascending(OrderBy<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(parent, observer, cancel)
            {
            }

            protected override void Consume(TSource value)
            {
                base._observer.OnNext(value);
            }

            protected override void Complete()
            {
                base._observer.OnCompleted();
            }
        }

        /// <summary>
        /// Time-based descending sink: records elements in the order their time sequences fire and
        /// replays them in reverse when everything has completed.
        /// </summary>
        class Descending : ρ
        {
            public Descending(OrderBy<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(parent, observer, cancel)
            {
            }

            private IList<TSource> _list;

            public override IDisposable Initialize()
            {
                _list = new List<TSource>();
                return base.Initialize();
            }

            protected override void Consume(TSource value)
            {
                _list.Add(value);
            }

            protected override void Complete()
            {
                // _list is typed as IList<TSource>, so this binds to Enumerable.Reverse and leaves the list untouched.
                foreach (var value in _list.Reverse())
                {
                    base._observer.OnNext(value);
                }

                base._observer.OnCompleted();
            }
        }

        /// <summary>
        /// Key-based ascending sink.
        /// </summary>
        class Ascending_ : ι
        {
            public Ascending_(OrderBy<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(parent, observer, cancel)
            {
            }

            protected override IOrderedEnumerable<TSource> OrderBy(IEnumerable<TSource> source)
            {
                return source.OrderBy(_parent._keySelector, _parent._comparer);
            }

            protected override IOrderedEnumerable<TSource> ThenBy(IOrderedEnumerable<TSource> source)
            {
                return source.ThenBy(_parent._keySelector, _parent._comparer);
            }
        }

        /// <summary>
        /// Key-based descending sink.
        /// </summary>
        class Descending_ : ι
        {
            public Descending_(OrderBy<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(parent, observer, cancel)
            {
            }

            protected override IOrderedEnumerable<TSource> OrderBy(IEnumerable<TSource> source)
            {
                return source.OrderByDescending(base._parent._keySelector, base._parent._comparer);
            }

            protected override IOrderedEnumerable<TSource> ThenBy(IOrderedEnumerable<TSource> source)
            {
                return source.ThenByDescending(base._parent._keySelector, base._parent._comparer);
            }
        }

        /// <summary>
        /// Reactive sorting. This code is based on the code from the SelectMany operator (8/11/2013).
        /// </summary>
        /// <remarks>
        /// For every source element the time selector is invoked and the resulting sequence is
        /// subscribed to. When that sequence produces its first value or completes, the element is
        /// handed to <see cref="Consume"/>. <see cref="Complete"/> runs once the source has
        /// completed and no inner subscriptions remain.
        /// </remarks>
        abstract class ρ : SortSink
        {
            protected readonly OrderBy<TSource, TKey> _parent;

            public ρ(OrderBy<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private object _gate;                                  // serializes Consume / Complete / OnError
            private bool _isStopped;                               // set once the source has completed
            private CompositeDisposable _group;                    // source subscription + one entry per pending element
            private SingleAssignmentDisposable _sourceSubscription;

            public override IDisposable Initialize()
            {
                _gate = new object();
                _isStopped = false;
                _group = new CompositeDisposable();

                _sourceSubscription = new SingleAssignmentDisposable();
                _group.Add(_sourceSubscription);

                return _group;
            }

            public override void Run(IObservable<TSource> source)
            {
                _sourceSubscription.Disposable = source.SubscribeSafe(this);
            }

            public override void OnNext(TSource value)
            {
                var collection = default(IObservable<TKey>);

                try
                {
                    collection = _parent._timeSelector(value);
                }
                catch (Exception ex)
                {
                    // A failing user selector terminates the whole sequence with that error.
                    lock (_gate)
                    {
                        base._observer.OnError(ex);
                        base.Dispose();
                    }

                    return;
                }

                // Register the placeholder before subscribing so that _group.Count already accounts
                // for this element if the time sequence fires during the SubscribeSafe call.
                var innerSubscription = new SingleAssignmentDisposable();
                _group.Add(innerSubscription);
                innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription));
            }

            public override void OnError(Exception error)
            {
                lock (_gate)
                {
                    base._observer.OnError(error);
                    base.Dispose();
                }
            }

            public override void OnCompleted()
            {
                _isStopped = true;

                // A count of 1 means only _sourceSubscription is left, i.e. no element is pending.
                if (_group.Count == 1)
                {
                    //
                    // Notice there can be a race between OnCompleted of the source and any
                    // of the inner sequences, where both see _group.Count == 1, and one is
                    // waiting for the lock. There won't be a double OnCompleted observation
                    // though, because the call to Dispose silences the observer by swapping
                    // in a NopObserver.
                    //
                    lock (_gate)
                    {
                        Complete();
                        base.Dispose();
                    }
                }
                else
                {
                    _sourceSubscription.Dispose();
                }
            }

            /// <summary>
            /// Called under the gate once the source has completed and all elements were consumed.
            /// </summary>
            protected abstract void Complete();

            /// <summary>
            /// Called under the gate when the time sequence of <paramref name="value"/> has fired.
            /// </summary>
            protected abstract void Consume(TSource value);

            /// <summary>
            /// Observer of the time sequence that belongs to a single source element.
            /// </summary>
            class ι : IObserver<TKey>
            {
                private readonly ρ _parent;
                private readonly TSource _value;
                private readonly IDisposable _self;

                // Guarded by _parent._gate. Becomes true once _value has been handed to Consume.
                private bool _done;

                public ι(ρ parent, TSource value, IDisposable self)
                {
                    _parent = parent;
                    _value = value;
                    _self = self;
                }

                public void OnNext(TKey value)
                {
                    // The first value of the time sequence releases the element, exactly like completion does.
                    OnCompleted();
                }

                public void OnError(Exception error)
                {
                    lock (_parent._gate)
                    {
                        _parent._observer.OnError(error);
                        _parent.Dispose();
                    }
                }

                public void OnCompleted()
                {
                    lock (_parent._gate)
                    {
                        // OnNext funnels into this method, so a time sequence that produces a value
                        // and then completes before the disposal of _self takes effect (for example
                        // synchronously during SubscribeSafe, when _self has no inner disposable
                        // assigned yet) reaches this point twice. Consume the element only once.
                        if (_done)
                        {
                            return;
                        }

                        _done = true;
                        _parent.Consume(_value);
                    }

                    _parent._group.Remove(_self);

                    if (_parent._isStopped && _parent._group.Count == 1)
                    {
                        //
                        // Notice there can be a race between OnCompleted of the source and any
                        // of the inner sequences, where both see _group.Count == 1, and one is
                        // waiting for the lock. There won't be a double OnCompleted observation
                        // though, because the call to Dispose silences the observer by swapping
                        // in a NopObserver.
                        //
                        lock (_parent._gate)
                        {
                            _parent.Complete();
                            _parent.Dispose();
                        }
                    }
                }
            }
        }

        /// <summary>
        /// Aggregates before sorting. This code is based on the code from the ToList operator (8/11/2013).
        /// </summary>
        /// <remarks>
        /// Buffers every source element and, on completion, emits them sorted by the keys of all
        /// operators registered through <see cref="OrderBy(OrderBy{TSource, TKey})"/> followed by
        /// the key of the operator that created this sink.
        /// </remarks>
        abstract class ι : SortSink
        {
            protected readonly OrderBy<TSource, TKey> _parent;

            public ι(OrderBy<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
                : base(observer, cancel)
            {
                _parent = parent;
            }

            private List<TSource> _list;
            private Stack<OrderBy<TSource, TKey>> _orderBy;
            private SingleAssignmentDisposable _sourceSubscription;

            public override IDisposable Initialize()
            {
                _list = new List<TSource>();
                _orderBy = new Stack<OrderBy<TSource, TKey>>();
                _sourceSubscription = new SingleAssignmentDisposable();
                return _sourceSubscription;
            }

            public override void Run(IObservable<TSource> source)
            {
                _sourceSubscription.Disposable = source.SubscribeSafe(this);
            }

            public override void OnNext(TSource value)
            {
                _list.Add(value);
            }

            public override void OnError(Exception error)
            {
                base._observer.OnError(error);
                base.Dispose();
            }

            public override void OnCompleted()
            {
                // The LINQ ordering is deferred: user key selectors and comparers only run when the
                // query is enumerated. Materialize it here so that an exception thrown by user code
                // is reported through OnError (as is done for the time selector) instead of being
                // thrown back into the source's OnCompleted call.
                TSource[] sorted;

                try
                {
                    sorted = OrderAll(_list).ToArray();
                }
                catch (Exception ex)
                {
                    base._observer.OnError(ex);
                    base.Dispose();
                    return;
                }

                foreach (var value in sorted)
                {
                    base._observer.OnNext(value);
                }

                base._observer.OnCompleted();
                base.Dispose();
            }

            /// <summary>
            /// Applies this sink's ordering as the primary ordering of <paramref name="source"/>.
            /// </summary>
            protected abstract IOrderedEnumerable<TSource> OrderBy(IEnumerable<TSource> source);

            /// <summary>
            /// Applies this sink's ordering as a subsequent ordering of <paramref name="source"/>.
            /// </summary>
            protected abstract IOrderedEnumerable<TSource> ThenBy(IOrderedEnumerable<TSource> source);

            /// <summary>
            /// Registers another ordering operator whose key is applied by this sink, so that
            /// consecutive orderings share one buffer and one LINQ ordering chain.
            /// </summary>
            internal void OrderBy(OrderBy<TSource, TKey> parent)
            {
                _orderBy.Push(parent);
            }

            /// <summary>
            /// Builds the deferred ordering query: registered operators first (in stack enumeration
            /// order, i.e. most recently pushed first), then this sink's own ordering.
            /// </summary>
            private IEnumerable<TSource> OrderAll(IEnumerable<TSource> source)
            {
                IOrderedEnumerable<TSource> ordered = null;

                foreach (var parent in _orderBy)
                {
                    if (ordered == null)
                    {
                        ordered = parent._descending
                            ? source.OrderByDescending(parent._keySelector, parent._comparer)
                            : source.OrderBy(parent._keySelector, parent._comparer);
                    }
                    else
                    {
                        ordered = parent._descending
                            ? ordered.ThenByDescending(parent._keySelector, parent._comparer)
                            : ordered.ThenBy(parent._keySelector, parent._comparer);
                    }
                }

                return ordered == null ? OrderBy(source) : ThenBy(ordered);
            }
        }
    }
}