Преглед изворни кода

Cherry pick commit 545dfaf - pull request from davedev. Implemented OrderBy, OrderByDescending, ThenBy and ThenByDescending, with unit tests. This also required me to update to the HomoIcon tool and the Qbservable parity unit test.

davedev пре 12 година
родитељ
комит
ad115642f0
19 измењених фајлова са 2177 додато и 81 уклоњено
  1. 88 0
      Rx.NET/Source/System.Reactive.Core/Reactive/Internal/OrderedProducer.cs
  2. 1 0
      Rx.NET/Source/System.Reactive.Core/System.Reactive.Core.csproj
  3. 1 1
      Rx.NET/Source/System.Reactive.Experimental/Reactive/Linq/QbservableEx.Generated.cs
  4. 39 0
      Rx.NET/Source/System.Reactive.Interfaces/Reactive/Linq/IOrderedObservable.cs
  5. 21 0
      Rx.NET/Source/System.Reactive.Interfaces/Reactive/Linq/IOrderedQbservable.cs
  6. 2 0
      Rx.NET/Source/System.Reactive.Interfaces/System.Reactive.Interfaces.csproj
  7. 12 0
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs
  8. 406 0
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs
  9. 419 0
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OrderBy.cs
  10. 76 0
      Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
  11. 1 0
      Rx.NET/Source/System.Reactive.Linq/System.Reactive.Linq.csproj
  12. 607 73
      Rx.NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs
  13. 32 0
      Rx.NET/Source/Tests.System.Reactive/Dummies/DummyOrderedObservable.cs
  14. 1 0
      Rx.NET/Source/Tests.System.Reactive/Tests.System.Reactive.csproj
  15. 449 0
      Rx.NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs
  16. 3 1
      Rx.NET/Source/Tests.System.Reactive/Tests/Linq/QbservableTest.cs
  17. 19 6
      Rx.NET/tools/HomoIcon/Program.cs
  18. BIN
      Rx.NET/tools/HomoIcon/bin/Debug/HomoIcon.exe
  19. BIN
      Rx.NET/tools/HomoIcon/bin/Debug/HomoIcon.pdb

+ 88 - 0
Rx.NET/Source/System.Reactive.Core/Reactive/Internal/OrderedProducer.cs

@@ -0,0 +1,88 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Reactive.Subjects;
+
+namespace System.Reactive
+{
+    abstract class OrderedProducer<TSource> : Producer<TSource>
+    {
+        internal readonly IObservable<TSource> _source;
+        private readonly OrderedProducer<TSource> _previous;
+
+        protected OrderedProducer(IObservable<TSource> source, OrderedProducer<TSource> previous)
+        {
+            _source = source;
+            _previous = previous;
+        }
+
+        protected abstract SortSink Sort(IObserver<TSource> observer, IDisposable cancel);
+
+        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
+        {
+            var sink = Sort(observer, cancel);
+            setSink(sink);
+
+            var disposables = new CompositeDisposable();
+
+            var p = _previous;
+            while (p != null)
+            {
+                // p.Sort may return the same sink reference that was passed to it, so we need to ensure that initialization only occurs once
+                if (!sink._initialized)
+                    disposables.Add(sink.InitializeAndSet());
+
+                sink = p.Sort(sink, Disposable.Empty);
+                p = p._previous;
+            }
+
+            if (disposables.Count == 0)
+            {
+                Debug.Assert(!sink._initialized);
+
+                var d = sink.InitializeAndSet();
+                sink.Run(_source);
+                return d;
+            }
+            else
+            {
+                if (!sink._initialized)
+                    disposables.Add(sink.InitializeAndSet());
+
+                sink.Run(_source);
+                return new CompositeDisposable(disposables.Reverse());
+            }
+        }
+
+        protected abstract class SortSink : Sink<TSource>, IObserver<TSource>
+        {
+            internal bool _initialized;
+
+            protected SortSink(IObserver<TSource> observer, IDisposable cancel)
+                : base(observer, cancel)
+            {
+            }
+
+            internal IDisposable InitializeAndSet()
+            {
+                _initialized = true;
+                return Initialize();
+            }
+
+            public abstract IDisposable Initialize();
+
+            public abstract void Run(IObservable<TSource> source);
+
+            public abstract void OnNext(TSource value);
+
+            public abstract void OnError(Exception error);
+
+            public abstract void OnCompleted();
+        }
+    }
+}

+ 1 - 0
Rx.NET/Source/System.Reactive.Core/System.Reactive.Core.csproj

@@ -93,6 +93,7 @@
     <Compile Include="Reactive\Internal\ImmutableList.cs" />
     <Compile Include="Reactive\Internal\Lazy.cs" />
     <Compile Include="Reactive\Internal\Observers.cs" />
+    <Compile Include="Reactive\Internal\OrderedProducer.cs" />
     <Compile Include="Reactive\Internal\PlatformEnlightenmentProvider.cs" />
     <Compile Include="Reactive\Internal\PriorityQueue.cs" />
     <Compile Include="Reactive\Internal\Producer.cs" />

+ 1 - 1
Rx.NET/Source/System.Reactive.Experimental/Reactive/Linq/QbservableEx.Generated.cs

@@ -1,5 +1,5 @@
 /*
- * WARNING: Auto-generated file (11/4/2013 10:47:23 AM)
+ * WARNING: Auto-generated file (11/4/2013 11:19:06 AM)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
 

+ 39 - 0
Rx.NET/Source/System.Reactive.Interfaces/Reactive/Linq/IOrderedObservable.cs

@@ -0,0 +1,39 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+
+namespace System.Reactive.Linq
+{
+    /// <summary>
+    /// Represents a sorted observable sequence.
+    /// </summary>
+    /// <typeparam name="T">
+    /// The type of the data in the data source.
+    /// This type parameter is covariant. That is, you can use either the type you specified or any type that is more derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.
+    /// </typeparam>
+#if !NO_VARIANCE
+    public interface IOrderedObservable<out T> : IObservable<T>
+#else
+    public interface IOrderedObservable<T> : IObservable<T>
+#endif
+    {
+        /// <summary>
+        /// Performs a subsequent ordering on the elements of an <see cref="IOrderedObservable{T}"/> according to a key.
+        /// </summary>
+        /// <typeparam name="TKey">The type of the key produced by <paramref name="keySelector"/>.</typeparam>
+        /// <param name="keySelector">The function used to extract the key for each element.</param>
+        /// <param name="comparer">The <see cref="IComparer{TKey}"/> used to compare keys for placement in the returned sequence.</param>
+        /// <param name="descending"><see langword="True"/> to sort the elements in descending order; <see langword="false"/> to sort the elements in ascending order.</param>
+        /// <returns>An <see cref="IOrderedObservable{T}"/> whose elements are sorted according to a key.</returns>
+        IOrderedObservable<T> CreateOrderedObservable<TKey>(Func<T, TKey> keySelector, IComparer<TKey> comparer, bool descending);
+
+        /// <summary>
+        /// Performs a subsequent ordering on the elements of an <see cref="IOrderedObservable{T}"/> according to other observable sequences.
+        /// </summary>
+        /// <typeparam name="TOther">The type of the elements in the observable returned by <paramref name="timeSelector"/>.</typeparam>
+        /// <param name="timeSelector">A function that returns an observable for an element indicating the time at which that element should appear in the ordering.</param>
+        /// <param name="descending"><see langword="True"/> to sort the elements in descending order; <see langword="false"/> to sort the elements in ascending order.</param>
+        /// <returns>An <see cref="IOrderedObservable{T}"/> whose elements are sorted according to the times at which corresponding observable sequences produce their first notification or complete.</returns>
+        IOrderedObservable<T> CreateOrderedObservable<TOther>(Func<T, IObservable<TOther>> timeSelector, bool descending);
+    }
+}

+ 21 - 0
Rx.NET/Source/System.Reactive.Interfaces/Reactive/Linq/IOrderedQbservable.cs

@@ -0,0 +1,21 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System.Collections.Generic;
+
+namespace System.Reactive.Linq
+{
+    /// <summary>
+    /// Represents a sorted observable sequence.
+    /// </summary>
+    /// <typeparam name="T">
+    /// The type of the data in the data source.
+    /// This type parameter is covariant. That is, you can use either the type you specified or any type that is more derived. For more information about covariance and contravariance, see Covariance and Contravariance in Generics.
+    /// </typeparam>
+#if !NO_VARIANCE
+    public interface IOrderedQbservable<out T> : IQbservable<T>
+#else
+    public interface IOrderedQbservable<T> : IQbservable<T>
+#endif
+    {
+    }
+}

+ 2 - 0
Rx.NET/Source/System.Reactive.Interfaces/System.Reactive.Interfaces.csproj

@@ -49,6 +49,8 @@
     <Compile Include="Reactive\IEventPatternSource.cs" />
     <Compile Include="Reactive\IEventSource.cs" />
     <Compile Include="Reactive\IObserver.Result.cs" />
+    <Compile Include="Reactive\Linq\IOrderedQbservable.cs" />
+    <Compile Include="Reactive\Linq\IOrderedObservable.cs" />
     <Compile Include="Reactive\Linq\IGroupedObservable.cs" />
     <Compile Include="Reactive\Linq\IQbservable.cs" />
     <Compile Include="Reactive\Linq\IQbservableProvider.cs" />

+ 12 - 0
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs

@@ -697,6 +697,12 @@ namespace System.Reactive.Linq
         IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IObservable<TRight>, TResult> resultSelector);
         IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, TRight, TResult> resultSelector);
         IObservable<TResult> OfType<TResult>(IObservable<object> source);
+        IOrderedObservable<TSource> OrderBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector);
+        IOrderedObservable<TSource> OrderBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer);
+        IOrderedObservable<TSource> OrderBy<TSource, TOther>(IObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector);
+        IOrderedObservable<TSource> OrderByDescending<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector);
+        IOrderedObservable<TSource> OrderByDescending<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer);
+        IOrderedObservable<TSource> OrderByDescending<TSource, TOther>(IObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector);
         IObservable<TResult> Select<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector);
         IObservable<TResult> Select<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, TResult> selector);
         IObservable<TOther> SelectMany<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other);
@@ -717,6 +723,12 @@ namespace System.Reactive.Linq
         IObservable<TSource> Take<TSource>(IObservable<TSource> source, int count, IScheduler scheduler);
         IObservable<TSource> TakeWhile<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate);
         IObservable<TSource> TakeWhile<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate);
+        IOrderedObservable<TSource> ThenBy<TSource, TKey>(IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector);
+        IOrderedObservable<TSource> ThenBy<TSource, TKey>(IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer);
+        IOrderedObservable<TSource> ThenBy<TSource, TOther>(IOrderedObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector);
+        IOrderedObservable<TSource> ThenByDescending<TSource, TKey>(IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector);
+        IOrderedObservable<TSource> ThenByDescending<TSource, TKey>(IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer);
+        IOrderedObservable<TSource> ThenByDescending<TSource, TOther>(IOrderedObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector);
         IObservable<TSource> Where<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate);
         IObservable<TSource> Where<TSource>(IObservable<TSource> source, Func<TSource, int, bool> predicate);
 

+ 406 - 0
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs

@@ -710,6 +710,252 @@ namespace System.Reactive.Linq
 
         #endregion
 
+        #region + OrderBy +
+
+        /// <summary>
+        /// Sorts the elements of a sequence in ascending order according to a key.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted according to a key.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is <see langword="null"/>.</exception>
+        /// <remarks>
+        /// <para>
+        /// Ordering requires all of the elements in the <paramref name="source"/> sequence to be observed before it can complete, 
+        /// which means that all of the elements must be buffered.  Ordering an observable sequence is similar to performing an aggregation 
+        /// such as <see cref="ToList"/> because time information is lost and notifications are only generated when the <paramref name="source"/>
+        /// sequence calls OnCompleted.
+        /// </para>
+        /// <alert type="warn">
+        /// Do not attempt to order an observable sequence that never calls OnCompleted.  There will be no notifications generated, and it 
+        /// may result in an <see cref="OutOfMemoryException"/> being thrown.
+        /// </alert>
+        /// </remarks>
+        public static IOrderedObservable<TSource> OrderBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+
+            return s_impl.OrderBy<TSource, TKey>(source, keySelector);
+        }
+
+        /// <summary>
+        /// Sorts the elements of a sequence in ascending order by using a specified comparer.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="comparer">An <see cref="IComparer{TKey}"/> to compare keys.</param>
+        /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted according to a key.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is <see langword="null"/>.</exception>
+        /// <remarks>
+        /// <para>
+        /// Ordering requires all of the elements in the <paramref name="source"/> sequence to be observed before it can complete, 
+        /// which means that all of the elements must be buffered.  Ordering an observable sequence is similar to performing an aggregation 
+        /// such as <see cref="ToList"/> because timing is lost and notifications are only generated when the <paramref name="source"/>
+        /// sequence calls OnCompleted.
+        /// </para>
+        /// <alert type="warn">
+        /// Do not attempt to order an observable sequence that never calls OnCompleted.  There will be no notifications generated, and it 
+        /// may result in an <see cref="OutOfMemoryException"/> being thrown.
+        /// </alert>
+        /// </remarks>
+        public static IOrderedObservable<TSource> OrderBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+
+            return s_impl.OrderBy<TSource, TKey>(source, keySelector, comparer);
+        }
+
+        /// <summary>
+        /// Sorts the elements of a sequence in ascending order according to the times at which corresponding observable sequences produce their first notification or complete.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
+        /// <typeparam name="TOther">The type of the elements in the observable returned by <paramref name="timeSelector"/>.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="timeSelector">A function that returns an observable for an element in the <paramref name="source"/> sequence indicating the time at which that element should appear in the ordering.</param>
+        /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted according to the times at which corresponding observable sequences produce their first notification or complete.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="timeSelector"/> is <see langword="null"/>.</exception>
+        /// <remarks>
+        /// <para>
+        /// This overload of OrderBy is similar to a particular usage of SelectMany where the elements in the <paramref name="source"/> sequence are returned based on the times 
+        /// at which their corresponding inner sequences produce their first notification or complete.  Any elements in the inner sequences are discarded.
+        /// </para>
+        /// <para>
+        /// The primary benefits of this overload of OrderBy is that it relates to ordering specifically, thus it's more semantic than SelectMany and it allows an author to avoid 
+        /// defining an unused query variable when applying SelectMany merely as an ordering operator.  It also returns <see cref="IOrderedObservable{TSource}"/> so that it may 
+        /// be used in combination with any overloads of ThenBy and ThenByDescending to define queries that order by time and key.
+        /// </para>
+        /// <alert type="info">
+        /// Unlike the other overload of OrderBy, this overload does not buffer any elements and it does not wait for the <paramref name="source"/> sequence to complete before it 
+        /// pushes notifications.  This overload is entirely reactive.
+        /// </alert>
+        /// <para>
+        /// This overload supports using the orderby LINQ query comprehension syntax in C# and Visual Basic by passing an observable sequence as the key.
+        /// </para>
+        /// <example>
+        /// <para>
+        /// The following example shows how to use this overload of OrderBy with the orderby LINQ query comprehension syntax in C#.
+        /// </para>
+        /// <para>
+        /// The result of this query is a sequence of integers from 5 to 1, at 1 second intervals.
+        /// </para>
+        /// <code>
+        /// <![CDATA[IObservable<int> xs = 
+        ///   from x in Observable.Range(1, 5)
+        ///   orderby Observable.Timer(TimeSpan.FromSeconds(5 - x))
+        ///   select x;]]>
+        /// </code>
+        /// </example>
+        /// </remarks>
+        public static IOrderedObservable<TSource> OrderBy<TSource, TOther>(this IObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (timeSelector == null)
+                throw new ArgumentNullException("timeSelector");
+
+            return s_impl.OrderBy<TSource, TOther>(source, timeSelector);
+        }
+
+        #endregion
+
+        #region + OrderByDescending +
+
+        /// <summary>
+        /// Sorts the elements of a sequence in descending order according to a key.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted in descending order according to a key.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is <see langword="null"/>.</exception>
+        /// <remarks>
+        /// <para>
+        /// Ordering requires all of the elements in the <paramref name="source"/> sequence to be observed before it can complete, 
+        /// which means that all of the elements must be buffered.  Ordering an observable sequence is similar to performing an aggregation 
+        /// such as <see cref="ToList"/> because timing is lost and notifications are only generated when the <paramref name="source"/>
+        /// sequence calls OnCompleted.
+        /// </para>
+        /// <alert type="warn">
+        /// Do not attempt to order an observable sequence that never calls OnCompleted.  There will be no notifications generated, and it 
+        /// may result in an <see cref="OutOfMemoryException"/> being thrown.
+        /// </alert>
+        /// </remarks>
+        public static IOrderedObservable<TSource> OrderByDescending<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+
+            return s_impl.OrderByDescending<TSource, TKey>(source, keySelector);
+        }
+
+        /// <summary>
+        /// Sorts the elements of a sequence in descending order by using a specified comparer.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="comparer">An <see cref="IComparer{TKey}"/> to compare keys.</param>
+        /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted in descending order according to a key.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is <see langword="null"/>.</exception>
+        /// <remarks>
+        /// <para>
+        /// Ordering requires all of the elements in the <paramref name="source"/> sequence to be observed before it can complete, 
+        /// which means that all of the elements must be buffered.  Ordering an observable sequence is similar to performing an aggregation 
+        /// such as <see cref="ToList"/> because timing is lost and notifications are only generated when the <paramref name="source"/>
+        /// sequence calls OnCompleted.
+        /// </para>
+        /// <alert type="warn">
+        /// Do not attempt to order an observable sequence that never calls OnCompleted.  There will be no notifications generated, and it 
+        /// may result in an <see cref="OutOfMemoryException"/> being thrown.
+        /// </alert>
+        /// </remarks>
+        public static IOrderedObservable<TSource> OrderByDescending<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+
+            return s_impl.OrderByDescending<TSource, TKey>(source, keySelector, comparer);
+        }
+
+        /// <summary>
+        /// Sorts the elements of a sequence in descending order according to the times at which corresponding observable sequences produce their first notification or complete.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
+        /// <typeparam name="TOther">The type of the elements in the observable returned by <paramref name="timeSelector"/>.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="timeSelector">A function that returns an observable for an element in the <paramref name="source"/> sequence indicating the time at which that element should appear in the ordering.</param>
+        /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted in descending order according to the times at which corresponding observable sequences produce their first notification or complete.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="timeSelector"/> is <see langword="null"/>.</exception>
+        /// <remarks>
+        /// <para>
+        /// Ordering descending by time requires all of the elements in the <paramref name="source"/> sequence to be observed before it can complete, 
+        /// which means that all of the elements must be buffered.  Ordering an observable sequence descending by time is similar to performing an aggregation 
+        /// such as <see cref="ToList"/> because timing from the <paramref name="source"/> sequence is lost and notifications are only generated when 
+        /// the <paramref name="source"/> sequence calls OnCompleted.
+        /// </para>
+        /// <alert type="warn">
+        /// Do not attempt to order an observable sequence that never calls OnCompleted.  There will be no notifications generated, and it 
+        /// may result in an <see cref="OutOfMemoryException"/> being thrown.
+        /// </alert>
+        /// <para>
+        /// This overload of OrderByDescending is similar to a particular usage of SelectMany where the elements in the <paramref name="source"/> sequence are returned based on the times 
+        /// at which their corresponding inner sequences produce their first notification or complete.  Any elements in the inner sequences are discarded.
+        /// </para>
+        /// <para>
+        /// The primary benefits of this overload of OrderByDescending is that it relates to ordering specifically, thus it's more semantic than SelectMany and it allows an author to avoid 
+        /// defining an unused query variable when applying SelectMany merely as an ordering operator.  It also returns <see cref="IOrderedObservable{TSource}"/> so that it may 
+        /// be used in combination with any overloads of ThenBy and ThenByDescending to define queries that order by time and key.
+        /// </para>
+        /// <para>
+        /// This overload supports using the orderby LINQ query comprehension syntax in C# and Visual Basic by passing an observable sequence as the key.
+        /// </para>
+        /// <example>
+        /// <para>
+        /// The following example shows how to use this overload of OrderByDescending with the orderby LINQ query comprehension syntax in C#.
+        /// </para>
+        /// <para>
+        /// The result of this query is a sequence of integers from 1 to 5, all of them arriving at 5 seconds from the time of subscription.
+        /// </para>
+        /// <code>
+        /// <![CDATA[IObservable<int> xs = 
+        ///   from x in Observable.Range(1, 5)
+        ///   orderby Observable.Timer(TimeSpan.FromSeconds(5 - x)) descending
+        ///   select x;]]>
+        /// </code>
+        /// </example>
+        /// </remarks>
+        public static IOrderedObservable<TSource> OrderByDescending<TSource, TOther>(this IObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (timeSelector == null)
+                throw new ArgumentNullException("timeSelector");
+
+            return s_impl.OrderByDescending<TSource, TOther>(source, timeSelector);
+        }
+
+        #endregion
+
         #region + Select +
 
         /// <summary>
@@ -1327,6 +1573,166 @@ namespace System.Reactive.Linq
 
         #endregion
 
+        #region + ThenBy +
+
+        /// <summary>
+        /// Performs a subsequent ordering of the elements in a sequence in ascending order according to a key.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted according to a key.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is <see langword="null"/>.</exception>
+        public static IOrderedObservable<TSource> ThenBy<TSource, TKey>(this IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+
+            return s_impl.ThenBy<TSource, TKey>(source, keySelector);
+        }
+
+        /// <summary>
+        /// Performs a subsequent ordering of the elements in a sequence in ascending order by using a specified comparer.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="comparer">An <see cref="IComparer{TKey}"/> to compare keys.</param>
+        /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted according to a key.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is <see langword="null"/>.</exception>
+        public static IOrderedObservable<TSource> ThenBy<TSource, TKey>(this IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+
+            return s_impl.ThenBy<TSource, TKey>(source, keySelector, comparer);
+        }
+
+        /// <summary>
+        /// Performs a subsequent ordering of the elements in a sequence in ascending order according to the times at which corresponding observable sequences produce their first notification or complete.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
+        /// <typeparam name="TOther">The type of the elements in the observable returned by <paramref name="timeSelector"/>.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="timeSelector">A function that returns an observable for an element in the <paramref name="source"/> sequence indicating the time at which that element should appear in the ordering.</param>
+        /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted according to the times at which corresponding observable sequences produce their first notification or complete.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="timeSelector"/> is <see langword="null"/>.</exception>
+        /// <remarks>
+        /// <para>
+        /// This overload of ThenBy is similar to a particular usage of SelectMany where the elements in the <paramref name="source"/> sequence are returned based on the times 
+        /// at which their corresponding inner sequences produce their first notification or complete.  Any elements in the inner sequences are discarded.
+        /// </para>
+        /// <alert type="info">
+        /// Unlike the other overload of ThenBy, this overload does not buffer any elements and it does not wait for the <paramref name="source"/> sequence to complete before it 
+        /// pushes notifications.  This overload is entirely reactive.
+        /// </alert>
+        /// <para>
+        /// This overload supports using the orderby LINQ query comprehension syntax in C# and Visual Basic by passing an observable sequence as a subsequent key.
+        /// </para>
+        /// </remarks>
+        public static IOrderedObservable<TSource> ThenBy<TSource, TOther>(this IOrderedObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (timeSelector == null)
+                throw new ArgumentNullException("timeSelector");
+
+            return s_impl.ThenBy<TSource, TOther>(source, timeSelector);
+        }
+
+        #endregion
+
+        #region + ThenByDescending +
+
+        /// <summary>
+        /// Performs a subsequent ordering of the elements in a sequence in descending order according to a key.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted in descending order according to a key.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is <see langword="null"/>.</exception>
+        public static IOrderedObservable<TSource> ThenByDescending<TSource, TKey>(this IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+
+            return s_impl.ThenByDescending<TSource, TKey>(source, keySelector);
+        }
+
+        /// <summary>
+        /// Performs a subsequent ordering of the elements in a sequence in descending order by using a specified comparer.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="comparer">An <see cref="IComparer{TKey}"/> to compare keys.</param>
+        /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted in descending order according to a key.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/>, <paramref name="keySelector"/> or <paramref name="comparer"/> is <see langword="null"/>.</exception>
+        public static IOrderedObservable<TSource> ThenByDescending<TSource, TKey>(this IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+
+            return s_impl.ThenByDescending<TSource, TKey>(source, keySelector, comparer);
+        }
+
+        /// <summary>
+        /// Performs a subsequent ordering of the elements in a sequence in descending order according to the times at which corresponding observable sequences produce their first notification or complete.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source"/> sequence.</typeparam>
+        /// <typeparam name="TOther">The type of the elements in the observable returned by <paramref name="timeSelector"/>.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="timeSelector">A function that returns an observable for an element in the <paramref name="source"/> sequence indicating the time at which that element should appear in the ordering.</param>
+        /// <returns>An <see cref="IOrderedObservable{TSource}"/> whose elements are sorted in descending order according to the times at which corresponding observable sequences produce their first notification or complete.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="timeSelector"/> is <see langword="null"/>.</exception>
+        /// <remarks>
+        /// <para>
+        /// Ordering descending by time requires all of the elements in the <paramref name="source"/> sequence to be observed before it can complete, 
+        /// which means that all of the elements must be buffered.  Ordering an observable sequence descending by time is similar to performing an aggregation 
+        /// such as <see cref="ToList"/> because timing from the <paramref name="source"/> sequence is lost and notifications are only generated when 
+        /// the <paramref name="source"/> sequence calls OnCompleted.
+        /// </para>
+        /// <alert type="warn">
+        /// Do not attempt to order an observable sequence that never calls OnCompleted.  There will be no notifications generated, and it 
+        /// may result in an <see cref="OutOfMemoryException"/> being thrown.
+        /// </alert>
+        /// <para>
+        /// This overload of ThenByDescending is similar to a particular usage of SelectMany where the elements in the <paramref name="source"/> sequence are returned based on the times 
+        /// at which their corresponding inner sequences produce their first notification or complete.  Any elements in the inner sequences are discarded.
+        /// </para>
+        /// <para>
+        /// This overload supports using the orderby LINQ query comprehension syntax in C# and Visual Basic by passing an observable sequence as a subsequent key.
+        /// </para>
+        /// </remarks>
+        public static IOrderedObservable<TSource> ThenByDescending<TSource, TOther>(this IOrderedObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (timeSelector == null)
+                throw new ArgumentNullException("timeSelector");
+
+            return s_impl.ThenByDescending<TSource, TOther>(source, timeSelector);
+        }
+
+        #endregion
+
         #region + Where +
 
         /// <summary>

+ 419 - 0
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OrderBy.cs

@@ -0,0 +1,419 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Diagnostics;
+using System.Linq;
+using System.Reactive.Disposables;
+using System.Reactive.Subjects;
+
+namespace System.Reactive.Linq.ObservableImpl
+{
+    class OrderBy<TSource, TKey> : OrderedProducer<TSource>, IOrderedObservable<TSource>
+    {
+        private readonly Func<TSource, IObservable<TKey>> _timeSelector;
+        private readonly Func<TSource, TKey> _keySelector;
+        private readonly IComparer<TKey> _comparer;
+        private readonly bool _descending;
+
+        public OrderBy(IObservable<TSource> source, Func<TSource, IObservable<TKey>> timeSelector, bool descending)
+            : base(source, null)
+        {
+            _timeSelector = timeSelector;
+            _descending = descending;
+        }
+
+        public OrderBy(IObservable<TSource> source, Func<TSource, IObservable<TKey>> timeSelector, bool descending, OrderedProducer<TSource> previous)
+            : base(source, previous)
+        {
+            _timeSelector = timeSelector;
+            _descending = descending;
+        }
+
+        public OrderBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer, bool descending)
+            : base(source, null)
+        {
+            _keySelector = keySelector;
+            _comparer = comparer ?? Comparer<TKey>.Default;
+            _descending = descending;
+        }
+
+        public OrderBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer, bool descending, OrderedProducer<TSource> previous)
+            : base(source, previous)
+        {
+            _keySelector = keySelector;
+            _comparer = comparer ?? Comparer<TKey>.Default;
+            _descending = descending;
+        }
+
+        public IOrderedObservable<TSource> CreateOrderedObservable<TTKey>(Func<TSource, TTKey> keySelector, IComparer<TTKey> comparer, bool descending)
+        {
+            return new OrderBy<TSource, TTKey>(base._source, keySelector, comparer, descending, previous: this);
+        }
+
+        public IOrderedObservable<TSource> CreateOrderedObservable<TOther>(Func<TSource, IObservable<TOther>> timeSelector, bool descending)
+        {
+            return new OrderBy<TSource, TOther>(base._source, timeSelector, descending, previous: this);
+        }
+
+        protected override SortSink Sort(IObserver<TSource> observer, IDisposable cancel)
+        {
+            if (_timeSelector != null)
+            {
+                if (_descending)
+                {
+                    return new Descending(this, observer, cancel);
+                }
+                else
+                {
+                    return new Ascending(this, observer, cancel);
+                }
+            }
+            else
+            {
+                var sink = observer as ι;
+
+                if (sink != null)
+                {
+                    /* This optimization exists for 2 reasons: 
+                     * 
+                     * 1. To avoid having to use multiple buffers in consecutive ordering operations.
+                     * 2. To take advantage of Enumerable's optimizations for consecutive ordering operations.
+                     */
+                    sink.OrderBy(this);
+                    return sink;
+                }
+                else
+                {
+                    if (_descending)
+                    {
+                        return new Descending_(this, observer, cancel);
+                    }
+                    else
+                    {
+                        return new Ascending_(this, observer, cancel);
+                    }
+                }
+            }
+        }
+
+        class Ascending : ρ
+        {
+            public Ascending(OrderBy<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
+                : base(parent, observer, cancel)
+            {
+            }
+
+            protected override void Consume(TSource value)
+            {
+                base._observer.OnNext(value);
+            }
+
+            protected override void Complete()
+            {
+                base._observer.OnCompleted();
+            }
+        }
+
+        class Descending : ρ
+        {
+            public Descending(OrderBy<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
+                : base(parent, observer, cancel)
+            {
+            }
+
+            private IList<TSource> _list;
+
+            public override IDisposable Initialize()
+            {
+                _list = new List<TSource>();
+
+                return base.Initialize();
+            }
+
+            protected override void Consume(TSource value)
+            {
+                _list.Add(value);
+            }
+
+            protected override void Complete()
+            {
+                foreach (var value in _list.Reverse())
+                {
+                    base._observer.OnNext(value);
+                }
+                base._observer.OnCompleted();
+            }
+        }
+
+        class Ascending_ : ι
+        {
+            public Ascending_(OrderBy<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
+                : base(parent, observer, cancel)
+            {
+            }
+
+            protected override IOrderedEnumerable<TSource> OrderBy(IEnumerable<TSource> source)
+            {
+                return source.OrderBy(_parent._keySelector, _parent._comparer);
+            }
+
+            protected override IOrderedEnumerable<TSource> ThenBy(IOrderedEnumerable<TSource> source)
+            {
+                return source.ThenBy(_parent._keySelector, _parent._comparer);
+            }
+        }
+
+        class Descending_ : ι
+        {
+            public Descending_(OrderBy<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
+                : base(parent, observer, cancel)
+            {
+            }
+
+            protected override IOrderedEnumerable<TSource> OrderBy(IEnumerable<TSource> source)
+            {
+                return source.OrderByDescending(base._parent._keySelector, base._parent._comparer);
+            }
+
+            protected override IOrderedEnumerable<TSource> ThenBy(IOrderedEnumerable<TSource> source)
+            {
+                return source.ThenByDescending(base._parent._keySelector, base._parent._comparer);
+            }
+        }
+
+        /// <summary>
+        /// Reactive sorting.  This code is based on the code from the SelectMany operator (8/11/2013).
+        /// </summary>
+        abstract class ρ : SortSink
+        {
+            protected readonly OrderBy<TSource, TKey> _parent;
+
+            public ρ(OrderBy<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
+                : base(observer, cancel)
+            {
+                _parent = parent;
+            }
+
+            private object _gate;
+            private bool _isStopped;
+            private CompositeDisposable _group;
+            private SingleAssignmentDisposable _sourceSubscription;
+
+            public override IDisposable Initialize()
+            {
+                _gate = new object();
+                _isStopped = false;
+                _group = new CompositeDisposable();
+
+                _sourceSubscription = new SingleAssignmentDisposable();
+                _group.Add(_sourceSubscription);
+
+                return _group;
+            }
+
+            public override void Run(IObservable<TSource> source)
+            {
+                _sourceSubscription.Disposable = source.SubscribeSafe(this);
+            }
+
+            public override void OnNext(TSource value)
+            {
+                var collection = default(IObservable<TKey>);
+
+                try
+                {
+                    collection = _parent._timeSelector(value);
+                }
+                catch (Exception ex)
+                {
+                    lock (_gate)
+                    {
+                        base._observer.OnError(ex);
+                        base.Dispose();
+                    }
+                    return;
+                }
+
+                var innerSubscription = new SingleAssignmentDisposable();
+                _group.Add(innerSubscription);
+                innerSubscription.Disposable = collection.SubscribeSafe(new ι(this, value, innerSubscription));
+            }
+
+            public override void OnError(Exception error)
+            {
+                lock (_gate)
+                {
+                    base._observer.OnError(error);
+                    base.Dispose();
+                }
+            }
+
+            public override void OnCompleted()
+            {
+                _isStopped = true;
+                if (_group.Count == 1)
+                {
+                    //
+                    // Notice there can be a race between OnCompleted of the source and any
+                    // of the inner sequences, where both see _group.Count == 1, and one is
+                    // waiting for the lock. There won't be a double OnCompleted observation
+                    // though, because the call to Dispose silences the observer by swapping
+                    // in a NopObserver<T>.
+                    //
+                    lock (_gate)
+                    {
+                        Complete();
+                        base.Dispose();
+                    }
+                }
+                else
+                {
+                    _sourceSubscription.Dispose();
+                }
+            }
+
+            protected abstract void Complete();
+
+            protected abstract void Consume(TSource value);
+
+            class ι : IObserver<TKey>
+            {
+                private readonly ρ _parent;
+                private readonly TSource _value;
+                private readonly IDisposable _self;
+
+                public ι(ρ parent, TSource value, IDisposable self)
+                {
+                    _parent = parent;
+                    _value = value;
+                    _self = self;
+                }
+
+                public void OnNext(TKey value)
+                {
+                    OnCompleted();
+                }
+
+                public void OnError(Exception error)
+                {
+                    lock (_parent._gate)
+                    {
+                        _parent._observer.OnError(error);
+                        _parent.Dispose();
+                    }
+                }
+
+                public void OnCompleted()
+                {
+                    lock (_parent._gate)
+                    {
+                        _parent.Consume(_value);
+                    }
+
+                    _parent._group.Remove(_self);
+                    if (_parent._isStopped && _parent._group.Count == 1)
+                    {
+                        //
+                        // Notice there can be a race between OnCompleted of the source and any
+                        // of the inner sequences, where both see _group.Count == 1, and one is
+                        // waiting for the lock. There won't be a double OnCompleted observation
+                        // though, because the call to Dispose silences the observer by swapping
+                        // in a NopObserver<T>.
+                        //
+                        lock (_parent._gate)
+                        {
+                            _parent.Complete();
+                            _parent.Dispose();
+                        }
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Aggregates before sorting.  This code is based on the code from the ToList operator (8/11/2013).
+        /// </summary>
+        abstract class ι : SortSink
+        {
+            protected readonly OrderBy<TSource, TKey> _parent;
+
+            public ι(OrderBy<TSource, TKey> parent, IObserver<TSource> observer, IDisposable cancel)
+                : base(observer, cancel)
+            {
+                _parent = parent;
+            }
+
+            private List<TSource> _list;
+            private Stack<OrderBy<TSource, TKey>> _orderBy;
+            private SingleAssignmentDisposable _sourceSubscription;
+
+            public override IDisposable Initialize()
+            {
+                _list = new List<TSource>();
+                _orderBy = new Stack<OrderBy<TSource, TKey>>();
+                _sourceSubscription = new SingleAssignmentDisposable();
+
+                return _sourceSubscription;
+            }
+
+            public override void Run(IObservable<TSource> source)
+            {
+                _sourceSubscription.Disposable = source.SubscribeSafe(this);
+            }
+
+            public override void OnNext(TSource value)
+            {
+                _list.Add(value);
+            }
+
+            public override void OnError(Exception error)
+            {
+                base._observer.OnError(error);
+                base.Dispose();
+            }
+
+            public override void OnCompleted()
+            {
+                foreach (var value in OrderAll(_list))
+                {
+                    base._observer.OnNext(value);
+                }
+                base._observer.OnCompleted();
+                base.Dispose();
+            }
+
+            protected abstract IOrderedEnumerable<TSource> OrderBy(IEnumerable<TSource> source);
+
+            protected abstract IOrderedEnumerable<TSource> ThenBy(IOrderedEnumerable<TSource> source);
+
+            internal void OrderBy(OrderBy<TSource, TKey> parent)
+            {
+                _orderBy.Push(parent);
+            }
+
+            private IEnumerable<TSource> OrderAll(IEnumerable<TSource> source)
+            {
+                IOrderedEnumerable<TSource> ordered = null;
+
+                foreach (var parent in _orderBy)
+                {
+                    if (ordered == null)
+                    {
+                        ordered = parent._descending
+                                ? source.OrderByDescending(parent._keySelector, parent._comparer)
+                                : source.OrderBy(parent._keySelector, parent._comparer);
+                    }
+                    else
+                    {
+                        ordered = parent._descending
+                                ? ordered.ThenByDescending(parent._keySelector, parent._comparer)
+                                : ordered.ThenBy(parent._keySelector, parent._comparer);
+                    }
+                }
+
+                return ordered == null ? OrderBy(source) : ThenBy(ordered);
+            }
+        }
+    }
+}

+ 76 - 0
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs

@@ -765,6 +765,44 @@ namespace System.Reactive.Linq
 
         #endregion
 
+        #region + OrderBy +
+
+        public virtual IOrderedObservable<TSource> OrderBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
+        {
+            return new OrderBy<TSource, TKey>(source, keySelector, comparer: null, descending: false);
+        }
+
+        public virtual IOrderedObservable<TSource> OrderBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
+        {
+            return new OrderBy<TSource, TKey>(source, keySelector, comparer, descending: false);
+        }
+
+        public virtual IOrderedObservable<TSource> OrderBy<TSource, TOther>(IObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector)
+        {
+            return new OrderBy<TSource, TOther>(source, timeSelector, descending: false);
+        }
+
+        #endregion
+
+        #region + OrderByDescending +
+
+        public virtual IOrderedObservable<TSource> OrderByDescending<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
+        {
+            return new OrderBy<TSource, TKey>(source, keySelector, comparer: null, descending: true);
+        }
+
+        public virtual IOrderedObservable<TSource> OrderByDescending<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
+        {
+            return new OrderBy<TSource, TKey>(source, keySelector, comparer, descending: true);
+        }
+
+        public virtual IOrderedObservable<TSource> OrderByDescending<TSource, TOther>(IObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector)
+        {
+            return new OrderBy<TSource, TOther>(source, timeSelector, descending: true);
+        }
+
+        #endregion
+
         #region + Select +
 
         public virtual IObservable<TResult> Select<TSource, TResult>(IObservable<TSource> source, Func<TSource, TResult> selector)
@@ -1383,6 +1421,44 @@ namespace System.Reactive.Linq
 
         #endregion
 
+        #region + ThenBy +
+
+        public virtual IOrderedObservable<TSource> ThenBy<TSource, TKey>(IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector)
+        {
+            return source.CreateOrderedObservable(keySelector, comparer: null, descending: false);
+        }
+
+        public virtual IOrderedObservable<TSource> ThenBy<TSource, TKey>(IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
+        {
+            return source.CreateOrderedObservable(keySelector, comparer, descending: false);
+        }
+
+        public virtual IOrderedObservable<TSource> ThenBy<TSource, TOther>(IOrderedObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector)
+        {
+            return source.CreateOrderedObservable(timeSelector, descending: false);
+        }
+
+        #endregion
+
+        #region + ThenByDescending +
+
+        public virtual IOrderedObservable<TSource> ThenByDescending<TSource, TKey>(IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector)
+        {
+            return source.CreateOrderedObservable(keySelector, comparer: null, descending: true);
+        }
+
+        public virtual IOrderedObservable<TSource> ThenByDescending<TSource, TKey>(IOrderedObservable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer)
+        {
+            return source.CreateOrderedObservable(keySelector, comparer, descending: true);
+        }
+
+        public virtual IOrderedObservable<TSource> ThenByDescending<TSource, TOther>(IOrderedObservable<TSource> source, Func<TSource, IObservable<TOther>> timeSelector)
+        {
+            return source.CreateOrderedObservable(timeSelector, descending: true);
+        }
+
+        #endregion
+
         #region + Where +
 
         public virtual IObservable<TSource> Where<TSource>(IObservable<TSource> source, Func<TSource, bool> predicate)

+ 1 - 0
Rx.NET/Source/System.Reactive.Linq/System.Reactive.Linq.csproj

@@ -47,6 +47,7 @@
     <Compile Include="Reactive\Linq\LocalQueryMethodImplementationTypeAttribute.cs" />
     <Compile Include="Reactive\Linq\Observable\Case.cs" />
     <Compile Include="Reactive\Linq\Observable\Collect.cs" />
+    <Compile Include="Reactive\Linq\Observable\OrderBy.cs" />
     <Compile Include="Reactive\Linq\Observable\If.cs" />
     <Compile Include="Reactive\Linq\Observable\For.cs" />
     <Compile Include="Reactive\Linq\Observable\DoWhile.cs" />

+ 607 - 73
Rx.NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs

@@ -1,5 +1,5 @@
 /*
- * WARNING: Auto-generated file (11/4/2013 10:47:23 AM)
+ * WARNING: Auto-generated file (11/4/2013 11:19:06 AM)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
 
@@ -10752,6 +10752,314 @@ namespace System.Reactive.Linq
             );
         }
         
+        /// <summary>
+        /// Sorts the elements of a sequence in ascending order according to a key.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source" /> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <returns>An <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> whose elements are sorted according to a key.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="keySelector" /> is <see langword="null" />.</exception>
+        /// <remarks>
+        /// <para>
+        /// Ordering requires all of the elements in the <paramref name="source" /> sequence to be observed before it can complete, 
+        /// which means that all of the elements must be buffered.  Ordering an observable sequence is similar to performing an aggregation 
+        /// such as <see cref="M:System.Reactive.Linq.Observable.ToList``1(System.IObservable{``0})" /> because time information is lost and notifications are only generated when the <paramref name="source" />
+        /// sequence calls OnCompleted.
+        /// </para>
+        /// <alert type="warn">
+        /// Do not attempt to order an observable sequence that never calls OnCompleted.  There will be no notifications generated, and it 
+        /// may result in an <see cref="T:System.OutOfMemoryException" /> being thrown.
+        /// </alert>
+        /// </remarks>
+        public static IOrderedQbservable<TSource> OrderBy<TSource, TKey>(this IQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            
+            return (IOrderedQbservable<TSource>)source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.OrderBy<TSource, TKey>(default(IQbservable<TSource>), default(Expression<Func<TSource, TKey>>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)),
+#endif
+                    source.Expression,
+                    keySelector
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Sorts the elements of a sequence in ascending order by using a specified comparer.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source" /> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="comparer">An <see cref="T:System.Collections.Generic.IComparer`1" /> to compare keys.</param>
+        /// <returns>An <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> whose elements are sorted according to a key.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" />, <paramref name="keySelector" /> or <paramref name="comparer" /> is <see langword="null" />.</exception>
+        /// <remarks>
+        /// <para>
+        /// Ordering requires all of the elements in the <paramref name="source" /> sequence to be observed before it can complete, 
+        /// which means that all of the elements must be buffered.  Ordering an observable sequence is similar to performing an aggregation 
+        /// such as <see cref="M:System.Reactive.Linq.Observable.ToList``1(System.IObservable{``0})" /> because timing is lost and notifications are only generated when the <paramref name="source" />
+        /// sequence calls OnCompleted.
+        /// </para>
+        /// <alert type="warn">
+        /// Do not attempt to order an observable sequence that never calls OnCompleted.  There will be no notifications generated, and it 
+        /// may result in an <see cref="T:System.OutOfMemoryException" /> being thrown.
+        /// </alert>
+        /// </remarks>
+        public static IOrderedQbservable<TSource> OrderBy<TSource, TKey>(this IQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector, IComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+            
+            return (IOrderedQbservable<TSource>)source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.OrderBy<TSource, TKey>(default(IQbservable<TSource>), default(Expression<Func<TSource, TKey>>), default(IComparer<TKey>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)),
+#endif
+                    source.Expression,
+                    keySelector,
+                    Expression.Constant(comparer, typeof(IComparer<TKey>))
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Sorts the elements of a sequence in ascending order according to the times at which corresponding observable sequences produce their first notification or complete.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source" /> sequence.</typeparam>
+        /// <typeparam name="TOther">The type of the elements in the observable returned by <paramref name="timeSelector" />.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="timeSelector">A function that returns an observable for an element in the <paramref name="source" /> sequence indicating the time at which that element should appear in the ordering.</param>
+        /// <returns>An <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> whose elements are sorted according to the times at which corresponding observable sequences produce their first notification or complete.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="timeSelector" /> is <see langword="null" />.</exception>
+        /// <remarks>
+        /// <para>
+        /// This overload of OrderBy is similar to a particular usage of SelectMany where the elements in the <paramref name="source" /> sequence are returned based on the times 
+        /// at which their corresponding inner sequences produce their first notification or complete.  Any elements in the inner sequences are discarded.
+        /// </para>
+        /// <para>
+        /// The primary benefits of this overload of OrderBy is that it relates to ordering specifically, thus it's more semantic than SelectMany and it allows an author to avoid 
+        /// defining an unused query variable when applying SelectMany merely as an ordering operator.  It also returns <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> so that it may 
+        /// be used in combination with any overloads of ThenBy and ThenByDescending to define queries that order by time and key.
+        /// </para>
+        /// <alert type="info">
+        /// Unlike the other overload of OrderBy, this overload does not buffer any elements and it does not wait for the <paramref name="source" /> sequence to complete before it 
+        /// pushes notifications.  This overload is entirely reactive.
+        /// </alert>
+        /// <para>
+        /// This overload supports using the orderby LINQ query comprehension syntax in C# and Visual Basic by passing an observable sequence as the key.
+        /// </para>
+        /// <example>
+        /// <para>
+        /// The following example shows how to use this overload of OrderBy with the orderby LINQ query comprehension syntax in C#.
+        /// </para>
+        /// <para>
+        /// The result of this query is a sequence of integers from 5 to 1, at 1 second intervals.
+        /// </para>
+        /// <code><![CDATA[IObservable<int> xs = 
+        /// from x in Observable.Range(1, 5)
+        /// orderby Observable.Timer(TimeSpan.FromSeconds(5 - x))
+        /// select x;]]></code>
+        /// </example>
+        /// </remarks>
+        public static IOrderedQbservable<TSource> OrderBy<TSource, TOther>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TOther>>> timeSelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (timeSelector == null)
+                throw new ArgumentNullException("timeSelector");
+            
+            return (IOrderedQbservable<TSource>)source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.OrderBy<TSource, TOther>(default(IQbservable<TSource>), default(Expression<Func<TSource, IObservable<TOther>>>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TOther)),
+#endif
+                    source.Expression,
+                    timeSelector
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Sorts the elements of a sequence in descending order according to a key.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source" /> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <returns>An <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> whose elements are sorted in descending order according to a key.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="keySelector" /> is <see langword="null" />.</exception>
+        /// <remarks>
+        /// <para>
+        /// Ordering requires all of the elements in the <paramref name="source" /> sequence to be observed before it can complete, 
+        /// which means that all of the elements must be buffered.  Ordering an observable sequence is similar to performing an aggregation 
+        /// such as <see cref="M:System.Reactive.Linq.Observable.ToList``1(System.IObservable{``0})" /> because timing is lost and notifications are only generated when the <paramref name="source" />
+        /// sequence calls OnCompleted.
+        /// </para>
+        /// <alert type="warn">
+        /// Do not attempt to order an observable sequence that never calls OnCompleted.  There will be no notifications generated, and it 
+        /// may result in an <see cref="T:System.OutOfMemoryException" /> being thrown.
+        /// </alert>
+        /// </remarks>
+        public static IOrderedQbservable<TSource> OrderByDescending<TSource, TKey>(this IQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            
+            return (IOrderedQbservable<TSource>)source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.OrderByDescending<TSource, TKey>(default(IQbservable<TSource>), default(Expression<Func<TSource, TKey>>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)),
+#endif
+                    source.Expression,
+                    keySelector
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Sorts the elements of a sequence in descending order by using a specified comparer.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source" /> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="comparer">An <see cref="T:System.Collections.Generic.IComparer`1" /> to compare keys.</param>
+        /// <returns>An <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> whose elements are sorted in descending order according to a key.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" />, <paramref name="keySelector" /> or <paramref name="comparer" /> is <see langword="null" />.</exception>
+        /// <remarks>
+        /// <para>
+        /// Ordering requires all of the elements in the <paramref name="source" /> sequence to be observed before it can complete, 
+        /// which means that all of the elements must be buffered.  Ordering an observable sequence is similar to performing an aggregation 
+        /// such as <see cref="M:System.Reactive.Linq.Observable.ToList``1(System.IObservable{``0})" /> because timing is lost and notifications are only generated when the <paramref name="source" />
+        /// sequence calls OnCompleted.
+        /// </para>
+        /// <alert type="warn">
+        /// Do not attempt to order an observable sequence that never calls OnCompleted.  There will be no notifications generated, and it 
+        /// may result in an <see cref="T:System.OutOfMemoryException" /> being thrown.
+        /// </alert>
+        /// </remarks>
+        public static IOrderedQbservable<TSource> OrderByDescending<TSource, TKey>(this IQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector, IComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+            
+            return (IOrderedQbservable<TSource>)source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.OrderByDescending<TSource, TKey>(default(IQbservable<TSource>), default(Expression<Func<TSource, TKey>>), default(IComparer<TKey>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)),
+#endif
+                    source.Expression,
+                    keySelector,
+                    Expression.Constant(comparer, typeof(IComparer<TKey>))
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Sorts the elements of a sequence in descending order according to the times at which corresponding observable sequences produce their first notification or complete.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source" /> sequence.</typeparam>
+        /// <typeparam name="TOther">The type of the elements in the observable returned by <paramref name="timeSelector" />.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="timeSelector">A function that returns an observable for an element in the <paramref name="source" /> sequence indicating the time at which that element should appear in the ordering.</param>
+        /// <returns>An <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> whose elements are sorted in descending order according to the times at which corresponding observable sequences produce their first notification or complete.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="timeSelector" /> is <see langword="null" />.</exception>
+        /// <remarks>
+        /// <para>
+        /// Ordering descending by time requires all of the elements in the <paramref name="source" /> sequence to be observed before it can complete, 
+        /// which means that all of the elements must be buffered.  Ordering an observable sequence descending by time is similar to performing an aggregation 
+        /// such as <see cref="M:System.Reactive.Linq.Observable.ToList``1(System.IObservable{``0})" /> because timing from the <paramref name="source" /> sequence is lost and notifications are only generated when 
+        /// the <paramref name="source" /> sequence calls OnCompleted.
+        /// </para>
+        /// <alert type="warn">
+        /// Do not attempt to order an observable sequence that never calls OnCompleted.  There will be no notifications generated, and it 
+        /// may result in an <see cref="T:System.OutOfMemoryException" /> being thrown.
+        /// </alert>
+        /// <para>
+        /// This overload of OrderByDescending is similar to a particular usage of SelectMany where the elements in the <paramref name="source" /> sequence are returned based on the times 
+        /// at which their corresponding inner sequences produce their first notification or complete.  Any elements in the inner sequences are discarded.
+        /// </para>
+        /// <para>
+        /// The primary benefits of this overload of OrderByDescending is that it relates to ordering specifically, thus it's more semantic than SelectMany and it allows an author to avoid 
+        /// defining an unused query variable when applying SelectMany merely as an ordering operator.  It also returns <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> so that it may 
+        /// be used in combination with any overloads of ThenBy and ThenByDescending to define queries that order by time and key.
+        /// </para>
+        /// <para>
+        /// This overload supports using the orderby LINQ query comprehension syntax in C# and Visual Basic by passing an observable sequence as the key.
+        /// </para>
+        /// <example>
+        /// <para>
+        /// The following example shows how to use this overload of OrderByDescending with the orderby LINQ query comprehension syntax in C#.
+        /// </para>
+        /// <para>
+        /// The result of this query is a sequence of integers from 1 to 5, all of them arriving at 5 seconds from the time of subscription.
+        /// </para>
+        /// <code><![CDATA[IObservable<int> xs = 
+        /// from x in Observable.Range(1, 5)
+        /// orderby Observable.Timer(TimeSpan.FromSeconds(5 - x)) descending
+        /// select x;]]></code>
+        /// </example>
+        /// </remarks>
+        public static IOrderedQbservable<TSource> OrderByDescending<TSource, TOther>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TOther>>> timeSelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (timeSelector == null)
+                throw new ArgumentNullException("timeSelector");
+            
+            return (IOrderedQbservable<TSource>)source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.OrderByDescending<TSource, TOther>(default(IQbservable<TSource>), default(Expression<Func<TSource, IObservable<TOther>>>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TOther)),
+#endif
+                    source.Expression,
+                    timeSelector
+                )
+            );
+        }
+        
         /// <summary>
         /// Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence.
         /// This operator is a specialization of Multicast using a regular <see cref="T:System.Reactive.Subjects.Subject`1" />.
@@ -11787,10 +12095,10 @@ namespace System.Reactive.Linq
         }
         
         /// <summary>
-        /// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to an observable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
+        /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="collectionSelector">A transform function to apply to each element.</param>
@@ -11798,8 +12106,7 @@ namespace System.Reactive.Linq
         /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="collectionSelector" /> or <paramref name="resultSelector" /> is null.</exception>
-        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IEnumerable<TCollection>>> collectionSelector, Expression<Func<TSource, TCollection, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TCollection>>> collectionSelector, Expression<Func<TSource, TCollection, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11812,7 +12119,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IEnumerable<TCollection>>>), default(Expression<Func<TSource, TCollection, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IObservable<TCollection>>>), default(Expression<Func<TSource, TCollection, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult)),
 #endif
@@ -11824,10 +12131,10 @@ namespace System.Reactive.Linq
         }
         
         /// <summary>
-        /// Projects each element of an observable sequence to an enumerable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
+        /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="collectionSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
@@ -11835,8 +12142,7 @@ namespace System.Reactive.Linq
         /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="collectionSelector" /> or <paramref name="resultSelector" /> is null.</exception>
-        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IEnumerable<TCollection>>> collectionSelector, Expression<Func<TSource, int, TCollection, int, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IObservable<TCollection>>> collectionSelector, Expression<Func<TSource, int, TCollection, int, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11849,7 +12155,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IEnumerable<TCollection>>>), default(Expression<Func<TSource, int, TCollection, int, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IObservable<TCollection>>>), default(Expression<Func<TSource, int, TCollection, int, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult)),
 #endif
@@ -11861,10 +12167,10 @@ namespace System.Reactive.Linq
         }
         
         /// <summary>
-        /// Projects each element of an observable sequence to an observable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
+        /// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="collectionSelector">A transform function to apply to each element.</param>
@@ -11872,7 +12178,8 @@ namespace System.Reactive.Linq
         /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="collectionSelector" /> or <paramref name="resultSelector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TCollection>>> collectionSelector, Expression<Func<TSource, TCollection, TResult>> resultSelector)
+        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
+        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IEnumerable<TCollection>>> collectionSelector, Expression<Func<TSource, TCollection, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11885,7 +12192,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IObservable<TCollection>>>), default(Expression<Func<TSource, TCollection, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IEnumerable<TCollection>>>), default(Expression<Func<TSource, TCollection, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult)),
 #endif
@@ -11897,10 +12204,10 @@ namespace System.Reactive.Linq
         }
         
         /// <summary>
-        /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to an enumerable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
+        /// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="collectionSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
@@ -11908,7 +12215,8 @@ namespace System.Reactive.Linq
         /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="collectionSelector" /> or <paramref name="resultSelector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IObservable<TCollection>>> collectionSelector, Expression<Func<TSource, int, TCollection, int, TResult>> resultSelector)
+        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
+        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IEnumerable<TCollection>>> collectionSelector, Expression<Func<TSource, int, TCollection, int, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11921,7 +12229,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IObservable<TCollection>>>), default(Expression<Func<TSource, int, TCollection, int, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IEnumerable<TCollection>>>), default(Expression<Func<TSource, int, TCollection, int, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult)),
 #endif
@@ -12041,18 +12349,19 @@ namespace System.Reactive.Linq
             );
         }
         
+#if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to an enumerable sequence and concatenates the resulting enumerable sequences into one observable sequence.
+        /// Projects each element of an observable sequence to a task with cancellation support and merges all of the task results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TResult">The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
-        /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
+        /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
+        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IEnumerable<TResult>>> selector)
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, CancellationToken, Task<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12063,7 +12372,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IEnumerable<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, CancellationToken, Task<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -12072,19 +12381,21 @@ namespace System.Reactive.Linq
                 )
             );
         }
+#endif
         
+#if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to an enumerable sequence by incorporating the element's index and concatenates the resulting enumerable sequences into one observable sequence.
+        /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support and merges all of the task results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TResult">The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
-        /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
+        /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
+        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IEnumerable<TResult>>> selector)
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, CancellationToken, Task<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12095,7 +12406,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IEnumerable<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, CancellationToken, Task<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -12104,18 +12415,20 @@ namespace System.Reactive.Linq
                 )
             );
         }
+#endif
         
         /// <summary>
-        /// Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
+        /// Projects each element of an observable sequence to an enumerable sequence and concatenates the resulting enumerable sequences into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
         /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TResult>>> selector)
+        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IEnumerable<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12126,7 +12439,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IObservable<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IEnumerable<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -12137,16 +12450,17 @@ namespace System.Reactive.Linq
         }
         
         /// <summary>
-        /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index and merges the resulting observable sequences into one observable sequence.
+        /// Projects each element of an observable sequence to an enumerable sequence by incorporating the element's index and concatenates the resulting enumerable sequences into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
         /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IObservable<TResult>>> selector)
+        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IEnumerable<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12157,7 +12471,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IObservable<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IEnumerable<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -12167,19 +12481,17 @@ namespace System.Reactive.Linq
             );
         }
         
-#if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task and merges all of the task results into one observable sequence.
+        /// Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
-        /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
-        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
+        /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, Task<TResult>>> selector)
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12190,7 +12502,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, Task<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IObservable<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -12199,21 +12511,18 @@ namespace System.Reactive.Linq
                 )
             );
         }
-#endif
         
-#if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task by incorporating the element's index and merges all of the task results into one observable sequence.
+        /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index and merges the resulting observable sequences into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
-        /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
-        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
+        /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, Task<TResult>>> selector)
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IObservable<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12224,7 +12533,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, Task<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IObservable<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -12233,11 +12542,10 @@ namespace System.Reactive.Linq
                 )
             );
         }
-#endif
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task with cancellation support and merges all of the task results into one observable sequence.
+        /// Projects each element of an observable sequence to a task and merges all of the task results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
@@ -12247,7 +12555,7 @@ namespace System.Reactive.Linq
         /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, CancellationToken, Task<TResult>>> selector)
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, Task<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12258,7 +12566,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, CancellationToken, Task<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, Task<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -12271,7 +12579,7 @@ namespace System.Reactive.Linq
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support and merges all of the task results into one observable sequence.
+        /// Projects each element of an observable sequence to a task by incorporating the element's index and merges all of the task results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
@@ -12281,7 +12589,7 @@ namespace System.Reactive.Linq
         /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, CancellationToken, Task<TResult>>> selector)
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, Task<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12292,7 +12600,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, CancellationToken, Task<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, Task<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -12305,7 +12613,7 @@ namespace System.Reactive.Linq
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to a task, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
@@ -12317,7 +12625,7 @@ namespace System.Reactive.Linq
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="taskSelector" /> or <paramref name="resultSelector" /> is null.</exception>
         /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, CancellationToken, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, TTaskResult, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, TTaskResult, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12330,7 +12638,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, CancellationToken, Task<TTaskResult>>>), default(Expression<Func<TSource, TTaskResult, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, Task<TTaskResult>>>), default(Expression<Func<TSource, TTaskResult, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TTaskResult), typeof(TResult)),
 #endif
@@ -12344,7 +12652,7 @@ namespace System.Reactive.Linq
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to a task by incorporating the element's index, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
@@ -12356,7 +12664,7 @@ namespace System.Reactive.Linq
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="taskSelector" /> or <paramref name="resultSelector" /> is null.</exception>
         /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, CancellationToken, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, int, TTaskResult, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, int, TTaskResult, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12369,7 +12677,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, CancellationToken, Task<TTaskResult>>>), default(Expression<Func<TSource, int, TTaskResult, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, Task<TTaskResult>>>), default(Expression<Func<TSource, int, TTaskResult, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TTaskResult), typeof(TResult)),
 #endif
@@ -12383,7 +12691,7 @@ namespace System.Reactive.Linq
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to a task with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
@@ -12395,7 +12703,7 @@ namespace System.Reactive.Linq
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="taskSelector" /> or <paramref name="resultSelector" /> is null.</exception>
         /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, TTaskResult, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, CancellationToken, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, TTaskResult, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12408,7 +12716,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, Task<TTaskResult>>>), default(Expression<Func<TSource, TTaskResult, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, CancellationToken, Task<TTaskResult>>>), default(Expression<Func<TSource, TTaskResult, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TTaskResult), typeof(TResult)),
 #endif
@@ -12422,7 +12730,7 @@ namespace System.Reactive.Linq
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task by incorporating the element's index, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
@@ -12434,7 +12742,7 @@ namespace System.Reactive.Linq
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="taskSelector" /> or <paramref name="resultSelector" /> is null.</exception>
         /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, int, TTaskResult, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, CancellationToken, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, int, TTaskResult, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12447,7 +12755,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, Task<TTaskResult>>>), default(Expression<Func<TSource, int, TTaskResult, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, CancellationToken, Task<TTaskResult>>>), default(Expression<Func<TSource, int, TTaskResult, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TTaskResult), typeof(TResult)),
 #endif
@@ -14923,6 +15231,232 @@ namespace System.Reactive.Linq
             );
         }
         
+        /// <summary>
+        /// Performs a subsequent ordering of the elements in a sequence in ascending order according to a key.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source" /> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <returns>An <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> whose elements are sorted according to a key.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="keySelector" /> is <see langword="null" />.</exception>
+        public static IOrderedQbservable<TSource> ThenBy<TSource, TKey>(this IOrderedQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            
+            return (IOrderedQbservable<TSource>)source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.ThenBy<TSource, TKey>(default(IOrderedQbservable<TSource>), default(Expression<Func<TSource, TKey>>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)),
+#endif
+                    source.Expression,
+                    keySelector
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Performs a subsequent ordering of the elements in a sequence in ascending order by using a specified comparer.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source" /> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="comparer">An <see cref="T:System.Collections.Generic.IComparer`1" /> to compare keys.</param>
+        /// <returns>An <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> whose elements are sorted according to a key.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" />, <paramref name="keySelector" /> or <paramref name="comparer" /> is <see langword="null" />.</exception>
+        public static IOrderedQbservable<TSource> ThenBy<TSource, TKey>(this IOrderedQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector, IComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+            
+            return (IOrderedQbservable<TSource>)source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.ThenBy<TSource, TKey>(default(IOrderedQbservable<TSource>), default(Expression<Func<TSource, TKey>>), default(IComparer<TKey>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)),
+#endif
+                    source.Expression,
+                    keySelector,
+                    Expression.Constant(comparer, typeof(IComparer<TKey>))
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Performs a subsequent ordering of the elements in a sequence in ascending order according to the times at which corresponding observable sequences produce their first notification or complete.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source" /> sequence.</typeparam>
+        /// <typeparam name="TOther">The type of the elements in the observable returned by <paramref name="timeSelector" />.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="timeSelector">A function that returns an observable for an element in the <paramref name="source" /> sequence indicating the time at which that element should appear in the ordering.</param>
+        /// <returns>An <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> whose elements are sorted according to the times at which corresponding observable sequences produce their first notification or complete.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="timeSelector" /> is <see langword="null" />.</exception>
+        /// <remarks>
+        /// <para>
+        /// This overload of ThenBy is similar to a particular usage of SelectMany where the elements in the <paramref name="source" /> sequence are returned based on the times 
+        /// at which their corresponding inner sequences produce their first notification or complete.  Any elements in the inner sequences are discarded.
+        /// </para>
+        /// <alert type="info">
+        /// Unlike the other overload of ThenBy, this overload does not buffer any elements and it does not wait for the <paramref name="source" /> sequence to complete before it 
+        /// pushes notifications.  This overload is entirely reactive.
+        /// </alert>
+        /// <para>
+        /// This overload supports using the orderby LINQ query comprehension syntax in C# and Visual Basic by passing an observable sequence as a subsequent key.
+        /// </para>
+        /// </remarks>
+        public static IOrderedQbservable<TSource> ThenBy<TSource, TOther>(this IOrderedQbservable<TSource> source, Expression<Func<TSource, IObservable<TOther>>> timeSelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (timeSelector == null)
+                throw new ArgumentNullException("timeSelector");
+            
+            return (IOrderedQbservable<TSource>)source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.ThenBy<TSource, TOther>(default(IOrderedQbservable<TSource>), default(Expression<Func<TSource, IObservable<TOther>>>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TOther)),
+#endif
+                    source.Expression,
+                    timeSelector
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Performs a subsequent ordering of the elements in a sequence in descending order according to a key.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source" /> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <returns>An <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> whose elements are sorted in descending order according to a key.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="keySelector" /> is <see langword="null" />.</exception>
+        public static IOrderedQbservable<TSource> ThenByDescending<TSource, TKey>(this IOrderedQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            
+            return (IOrderedQbservable<TSource>)source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.ThenByDescending<TSource, TKey>(default(IOrderedQbservable<TSource>), default(Expression<Func<TSource, TKey>>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)),
+#endif
+                    source.Expression,
+                    keySelector
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Performs a subsequent ordering of the elements in a sequence in descending order by using a specified comparer.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source" /> sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the sorting key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="comparer">An <see cref="T:System.Collections.Generic.IComparer`1" /> to compare keys.</param>
+        /// <returns>An <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> whose elements are sorted in descending order according to a key.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" />, <paramref name="keySelector" /> or <paramref name="comparer" /> is <see langword="null" />.</exception>
+        public static IOrderedQbservable<TSource> ThenByDescending<TSource, TKey>(this IOrderedQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector, IComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+            
+            return (IOrderedQbservable<TSource>)source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.ThenByDescending<TSource, TKey>(default(IOrderedQbservable<TSource>), default(Expression<Func<TSource, TKey>>), default(IComparer<TKey>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)),
+#endif
+                    source.Expression,
+                    keySelector,
+                    Expression.Constant(comparer, typeof(IComparer<TKey>))
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Performs a subsequent ordering of the elements in a sequence in descending order according to the times at which corresponding observable sequences produce their first notification or complete.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the <paramref name="source" /> sequence.</typeparam>
+        /// <typeparam name="TOther">The type of the elements in the observable returned by <paramref name="timeSelector" />.</typeparam>
+        /// <param name="source">An observable sequence of values to order.</param>
+        /// <param name="timeSelector">A function that returns an observable for an element in the <paramref name="source" /> sequence indicating the time at which that element should appear in the ordering.</param>
+        /// <returns>An <see cref="T:System.Reactive.Linq.IOrderedObservable`1" /> whose elements are sorted in descending order according to the times at which corresponding observable sequences produce their first notification or complete.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="timeSelector" /> is <see langword="null" />.</exception>
+        /// <remarks>
+        /// <para>
+        /// Ordering descending by time requires all of the elements in the <paramref name="source" /> sequence to be observed before it can complete, 
+        /// which means that all of the elements must be buffered.  Ordering an observable sequence descending by time is similar to performing an aggregation 
+        /// such as <see cref="M:System.Reactive.Linq.Observable.ToList``1(System.IObservable{``0})" /> because timing from the <paramref name="source" /> sequence is lost and notifications are only generated when 
+        /// the <paramref name="source" /> sequence calls OnCompleted.
+        /// </para>
+        /// <alert type="warn">
+        /// Do not attempt to order an observable sequence that never calls OnCompleted.  There will be no notifications generated, and it 
+        /// may result in an <see cref="T:System.OutOfMemoryException" /> being thrown.
+        /// </alert>
+        /// <para>
+        /// This overload of ThenByDescending is similar to a particular usage of SelectMany where the elements in the <paramref name="source" /> sequence are returned based on the times 
+        /// at which their corresponding inner sequences produce their first notification or complete.  Any elements in the inner sequences are discarded.
+        /// </para>
+        /// <para>
+        /// This overload supports using the orderby LINQ query comprehension syntax in C# and Visual Basic by passing an observable sequence as a subsequent key.
+        /// </para>
+        /// </remarks>
+        public static IOrderedQbservable<TSource> ThenByDescending<TSource, TOther>(this IOrderedQbservable<TSource> source, Expression<Func<TSource, IObservable<TOther>>> timeSelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (timeSelector == null)
+                throw new ArgumentNullException("timeSelector");
+            
+            return (IOrderedQbservable<TSource>)source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.ThenByDescending<TSource, TOther>(default(IOrderedQbservable<TSource>), default(Expression<Func<TSource, IObservable<TOther>>>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TOther)),
+#endif
+                    source.Expression,
+                    timeSelector
+                )
+            );
+        }
+        
         /// <summary>
         /// Ignores elements from an observable sequence which are followed by another element within a specified relative time duration.
         /// </summary>

+ 32 - 0
Rx.NET/Source/Tests.System.Reactive/Dummies/DummyOrderedObservable.cs

@@ -0,0 +1,32 @@
+// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
+
+using System;
+using System.Collections.Generic;
+using System.Reactive.Linq;
+
+namespace ReactiveTests.Dummies
+{
+    class DummyOrderedObservable<T> : IOrderedObservable<T>
+    {
+        public static readonly DummyOrderedObservable<T> Instance = new DummyOrderedObservable<T>();
+
+        DummyOrderedObservable()
+        {
+        }
+
+        public IDisposable Subscribe(IObserver<T> observer)
+        {
+            throw new NotImplementedException();
+        }
+
+        public IOrderedObservable<T> CreateOrderedObservable<TKey>(Func<T, TKey> keySelector, IComparer<TKey> comparer, bool descending)
+        {
+            return descending ? this.OrderByDescending(keySelector, comparer) : this.OrderBy(keySelector, comparer);
+        }
+
+        public IOrderedObservable<T> CreateOrderedObservable<TOther>(Func<T, IObservable<TOther>> timeSelector, bool descending)
+        {
+            return descending ? this.OrderByDescending(timeSelector) : this.OrderBy(timeSelector);
+        }
+    }
+}

+ 1 - 0
Rx.NET/Source/Tests.System.Reactive/Tests.System.Reactive.csproj

@@ -59,6 +59,7 @@
   <ItemGroup>
     <Compile Include="App.cs" />
     <Compile Include="DispatcherHelpers.cs" />
+    <Compile Include="Dummies\DummyOrderedObservable.cs" />
     <Compile Include="Semaphore.cs" />
     <Compile Include="Stress\Core\Disposables\Composite.cs" />
     <Compile Include="Stress\Core\Disposables\Serial.cs" />

+ 449 - 0
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs

@@ -10396,6 +10396,220 @@ namespace ReactiveTests.Tests
 
         #endregion
 
+        #region + OrderBy* +
+
+        [TestMethod]
+        public void OrderBy_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).OrderBy(DummyFunc<int, int>.Instance));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderBy((Func<int, int>)null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderBy(DummyFunc<int, int>.Instance).Subscribe(null));
+        }
+
+        [TestMethod]
+        public void OrderBy_KeyComparer_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).OrderBy(DummyFunc<int, int>.Instance, Comparer<int>.Default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderBy((Func<int, int>)null, Comparer<int>.Default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderBy(DummyFunc<int, int>.Instance, null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderBy(DummyFunc<int, int>.Instance, Comparer<int>.Default).Subscribe(null));
+        }
+
+        [TestMethod]
+        public void OrderBy_Time_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).OrderBy(DummyFunc<int, IObservable<int>>.Instance));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderBy((Func<int, IObservable<int>>)null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderBy(DummyFunc<int, IObservable<int>>.Instance).Subscribe(null));
+        }
+
+        [TestMethod]
+        public void OrderByDescending_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).OrderByDescending(DummyFunc<int, int>.Instance));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderByDescending((Func<int, int>)null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderByDescending(DummyFunc<int, int>.Instance).Subscribe(null));
+        }
+
+        [TestMethod]
+        public void OrderByDescending_KeyComparer_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).OrderByDescending(DummyFunc<int, int>.Instance, Comparer<int>.Default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderByDescending((Func<int, int>)null, Comparer<int>.Default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderByDescending(DummyFunc<int, int>.Instance, null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderByDescending(DummyFunc<int, int>.Instance, Comparer<int>.Default).Subscribe(null));
+        }
+
+        [TestMethod]
+        public void OrderByDescending_Time_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => ((IObservable<int>)null).OrderByDescending(DummyFunc<int, IObservable<int>>.Instance));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderByDescending((Func<int, IObservable<int>>)null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.OrderByDescending(DummyFunc<int, IObservable<int>>.Instance).Subscribe(null));
+        }
+
+        [TestMethod]
+        public void OrderBy_Key()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs =
+                from x in CreateInterval(5, 1, TimeSpan.FromTicks(100), scheduler)
+                orderby x
+                select x;
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(501, 1),
+                OnNext(501, 2),
+                OnNext(501, 3),
+                OnNext(501, 4),
+                OnNext(501, 5),
+                OnCompleted<int>(501));
+        }
+
+        [TestMethod]
+        public void OrderBy_Key_WithKeyComparer()
+        {
+            var scheduler = new TestScheduler();
+            var comparer = new OrderByComparer(reverse: true);
+
+            var xs = CreateInterval(1, 5, TimeSpan.FromTicks(100), scheduler)
+                .OrderBy(x => x, comparer);
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(501, 5),
+                OnNext(501, 4),
+                OnNext(501, 3),
+                OnNext(501, 2),
+                OnNext(501, 1),
+                OnCompleted<int>(501));
+        }
+
+        [TestMethod]
+        public void OrderByDescending_Key()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs =
+                from x in CreateInterval(1, 5, TimeSpan.FromTicks(100), scheduler)
+                orderby x descending
+                select x;
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(501, 5),
+                OnNext(501, 4),
+                OnNext(501, 3),
+                OnNext(501, 2),
+                OnNext(501, 1),
+                OnCompleted<int>(501));
+        }
+
+        [TestMethod]
+        public void OrderByDescending_Key_WithKeyComparer()
+        {
+            var scheduler = new TestScheduler();
+            var comparer = new OrderByComparer(reverse: true);
+
+            var xs = CreateInterval(1, 5, TimeSpan.FromTicks(100), scheduler)
+                .OrderByDescending(x => x, comparer);
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(501, 1),
+                OnNext(501, 2),
+                OnNext(501, 3),
+                OnNext(501, 4),
+                OnNext(501, 5),
+                OnCompleted<int>(501));
+        }
+
+        [TestMethod]
+        public void OrderBy_Time()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs =
+                from x in CreateRange(1, 5, TimeSpan.FromTicks(100), scheduler)
+                orderby Observable.Timer(TimeSpan.FromTicks(100 * (5 - x)), scheduler)
+                select x;
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(102, 5),
+                OnNext(201, 4),
+                OnNext(301, 3),
+                OnNext(401, 2),
+                OnNext(501, 1),
+                OnCompleted<int>(501));
+        }
+
+        [TestMethod]
+        public void OrderByDescending_Time()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs =
+                from x in CreateRange(1, 5, TimeSpan.FromTicks(100), scheduler)
+                orderby Observable.Timer(TimeSpan.FromTicks(100 * (5 - x)), scheduler) descending
+                select x;
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(501, 1),
+                OnNext(501, 2),
+                OnNext(501, 3),
+                OnNext(501, 4),
+                OnNext(501, 5),
+                OnCompleted<int>(501));
+        }
+
+        class OrderByComparer : IComparer<int>
+        {
+            private readonly bool _reverse;
+
+            public OrderByComparer(bool reverse)
+            {
+                _reverse = reverse;
+            }
+
+            public int Compare(int x, int y)
+            {
+                return _reverse ? y.CompareTo(x) : x.CompareTo(y);
+            }
+        }
+
+        static IObservable<int> CreateInterval(int start, int end, TimeSpan period, TestScheduler scheduler)
+        {
+            var ticks = 0L;
+            var range = end >= start
+                    ? Enumerable.Range(start, end - start + 1)
+                    : Enumerable.Range(end, start - end + 1).Reverse();
+
+            return scheduler.CreateColdObservable(
+                range.Select(i => OnNext(ticks += period.Ticks, i))
+                    .Concat(DeferEnumerable(() => new[] { OnCompleted<int>(ticks) }))
+                    .ToArray());
+        }
+
+        static IObservable<int> CreateRange(int start, int end, TimeSpan startTime, TestScheduler scheduler)
+        {
+            var range = end >= start
+                    ? Enumerable.Range(start, end - start + 1)
+                    : Enumerable.Range(end, start - end + 1).Reverse();
+
+            return scheduler.CreateColdObservable(
+                range.Select(i => OnNext(startTime.Ticks, i))
+                    .Concat(new[] { OnCompleted<int>(startTime.Ticks) })
+                    .ToArray());
+        }
+
+        static IEnumerable<T> DeferEnumerable<T>(Func<IEnumerable<T>> factory)
+        {
+            foreach (var value in factory())
+            {
+                yield return value;
+            }
+        }
+
+        #endregion
+
         #region + Select +
 
         [TestMethod]
@@ -19894,6 +20108,241 @@ namespace ReactiveTests.Tests
 
         #endregion
 
+        #region + ThenBy* +
+
+        [TestMethod]
+        public void ThenBy_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => ((IOrderedObservable<int>)null).ThenBy(DummyFunc<int, int>.Instance));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenBy((Func<int, int>)null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenBy(DummyFunc<int, int>.Instance).Subscribe(null));
+        }
+
+        [TestMethod]
+        public void ThenBy_KeyComparer_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => ((IOrderedObservable<int>)null).ThenBy(DummyFunc<int, int>.Instance, Comparer<int>.Default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenBy((Func<int, int>)null, Comparer<int>.Default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenBy(DummyFunc<int, int>.Instance, null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenBy(DummyFunc<int, int>.Instance, Comparer<int>.Default).Subscribe(null));
+        }
+
+        [TestMethod]
+        public void ThenBy_Time_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => ((IOrderedObservable<int>)null).ThenBy(DummyFunc<int, IObservable<int>>.Instance));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenBy((Func<int, IObservable<int>>)null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenBy(DummyFunc<int, IObservable<int>>.Instance).Subscribe(null));
+        }
+
+        [TestMethod]
+        public void ThenByDescending_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => ((IOrderedObservable<int>)null).ThenByDescending(DummyFunc<int, int>.Instance));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenByDescending((Func<int, int>)null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenByDescending(DummyFunc<int, int>.Instance).Subscribe(null));
+        }
+
+        [TestMethod]
+        public void ThenByDescending_KeyComparer_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => ((IOrderedObservable<int>)null).ThenByDescending(DummyFunc<int, int>.Instance, Comparer<int>.Default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenByDescending((Func<int, int>)null, Comparer<int>.Default));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenByDescending(DummyFunc<int, int>.Instance, null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenByDescending(DummyFunc<int, int>.Instance, Comparer<int>.Default).Subscribe(null));
+        }
+
+        [TestMethod]
+        public void ThenByDescending_Time_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => ((IOrderedObservable<int>)null).ThenByDescending(DummyFunc<int, IObservable<int>>.Instance));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenByDescending((Func<int, IObservable<int>>)null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyOrderedObservable<int>.Instance.ThenByDescending(DummyFunc<int, IObservable<int>>.Instance).Subscribe(null));
+        }
+
+        [TestMethod]
+        public void OrderBy_Key_ThenBy_Key()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs =
+                from x in CreateInterval(3, 1, TimeSpan.FromTicks(100), scheduler)
+                from y in CreateInterval(8, 6, TimeSpan.FromTicks(100), scheduler)
+                orderby x, y
+                select x * y;
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(601, 6),
+                OnNext(601, 7),
+                OnNext(601, 8),
+                OnNext(601, 12),
+                OnNext(601, 14),
+                OnNext(601, 16),
+                OnNext(601, 18),
+                OnNext(601, 21),
+                OnNext(601, 24),
+                OnCompleted<int>(601));
+        }
+
+        [TestMethod]
+        public void OrderBy_Key_ThenBy_Key_WithKeyComparer()
+        {
+            var scheduler = new TestScheduler();
+            var comparer = new OrderByComparer(reverse: true);
+
+            var xs =
+                (from x in CreateInterval(3, 1, TimeSpan.FromTicks(100), scheduler)
+                 from y in CreateInterval(6, 8, TimeSpan.FromTicks(100), scheduler)
+                 select new { x, y })
+                 .OrderBy(pair => pair.x)
+                 .ThenBy(pair => pair.y, comparer)
+                 .Select(pair => pair.x * pair.y);
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(601, 8),
+                OnNext(601, 7),
+                OnNext(601, 6),
+                OnNext(601, 16),
+                OnNext(601, 14),
+                OnNext(601, 12),
+                OnNext(601, 24),
+                OnNext(601, 21),
+                OnNext(601, 18),
+                OnCompleted<int>(601));
+        }
+
+        [TestMethod]
+        public void OrderBy_Key_ThenByDescending_Key()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs =
+                from x in CreateInterval(3, 1, TimeSpan.FromTicks(100), scheduler)
+                from y in CreateInterval(6, 8, TimeSpan.FromTicks(100), scheduler)
+                orderby x, y descending
+                select x * y;
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(601, 8),
+                OnNext(601, 7),
+                OnNext(601, 6),
+                OnNext(601, 16),
+                OnNext(601, 14),
+                OnNext(601, 12),
+                OnNext(601, 24),
+                OnNext(601, 21),
+                OnNext(601, 18),
+                OnCompleted<int>(601));
+        }
+
+        [TestMethod]
+        public void OrderBy_Key_ThenByDescending_Key_WithKeyComparer()
+        {
+            var scheduler = new TestScheduler();
+            var comparer = new OrderByComparer(reverse: true);
+
+            var xs =
+                (from x in CreateInterval(3, 1, TimeSpan.FromTicks(100), scheduler)
+                 from y in CreateInterval(6, 8, TimeSpan.FromTicks(100), scheduler)
+                 select new { x, y })
+                 .OrderBy(pair => pair.x)
+                 .ThenByDescending(pair => pair.y, comparer)
+                 .Select(pair => pair.x * pair.y);
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(601, 6),
+                OnNext(601, 7),
+                OnNext(601, 8),
+                OnNext(601, 12),
+                OnNext(601, 14),
+                OnNext(601, 16),
+                OnNext(601, 18),
+                OnNext(601, 21),
+                OnNext(601, 24),
+                OnCompleted<int>(601));
+        }
+
+        [TestMethod]
+        public void OrderBy_Time_ThenBy_Time()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs =
+                from x in CreateRange(1, 5, TimeSpan.FromTicks(100), scheduler)
+                orderby Observable.Timer(TimeSpan.FromTicks(100 * (5 - x)), scheduler),
+                        Observable.Timer(TimeSpan.FromTicks(x == 3 ? 300 : 50), scheduler)
+                select x;
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(152, 5),
+                OnNext(251, 4),
+                OnNext(451, 2),
+                OnNext(551, 1),
+                OnNext(601, 3),
+                OnCompleted<int>(601));
+        }
+
+        [TestMethod]
+        public void OrderBy_Time_ThenByDescending_Time()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs =
+                from x in CreateRange(1, 5, TimeSpan.FromTicks(100), scheduler)
+                orderby Observable.Timer(TimeSpan.FromTicks(100 * (5 - x)), scheduler),
+                        Observable.Timer(TimeSpan.FromTicks(x == 3 ? 300 : 50), scheduler) descending
+                select x;
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(601, 3),
+                OnNext(601, 1),
+                OnNext(601, 2),
+                OnNext(601, 4),
+                OnNext(601, 5),
+                OnCompleted<int>(601));
+        }
+
+        [TestMethod]
+        public void OrderBy_Time_ThenByDescending_Key()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs =
+                from x in CreateRange(1, 5, TimeSpan.FromTicks(100), scheduler)
+                orderby Observable.Timer(TimeSpan.FromTicks(100 * (5 - x)), scheduler),
+                        x descending
+                select x;
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(501, 5),
+                OnNext(501, 4),
+                OnNext(501, 3),
+                OnNext(501, 2),
+                OnNext(501, 1),
+                OnCompleted<int>(501));
+        }
+
+        [TestMethod]
+        public void OrderByDescending_Key_ThenBy_Time()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs =
+                from x in CreateInterval(1, 5, TimeSpan.FromTicks(100), scheduler)
+                orderby x descending, Observable.Timer(TimeSpan.FromTicks(50 * x), scheduler)
+                select x;
+
+            scheduler.Start(() => xs, 0, 0, 1000).Messages.AssertEqual(
+                OnNext(551, 1),
+                OnNext(601, 2),
+                OnNext(651, 3),
+                OnNext(701, 4),
+                OnNext(751, 5),
+                OnCompleted<int>(751));
+        }
+
+        #endregion
+
         #region + Where +
 
         [TestMethod]

+ 3 - 1
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/QbservableTest.cs

@@ -1790,7 +1790,7 @@ namespace ReactiveTests.Tests
             var pars = string.Join(", ", pss.Select(p => (Attribute.IsDefined(p, typeof(ParamArrayAttribute)) ? "params " : "") + GetTypeName(p.ParameterType, correct) + " " + p.Name).ToArray());
             if (Attribute.IsDefined(m, typeof(ExtensionAttribute)))
             {
-                if (pars.StartsWith("IQbservable") || pars.StartsWith("IQueryable"))
+                if (pars.StartsWith("IQbservable") || pars.StartsWith("IOrderedQbservable") || pars.StartsWith("IQueryable"))
                     pars = "this " + pars;
             }
 
@@ -1811,6 +1811,8 @@ namespace ReactiveTests.Tests
                 var name = len >= 0 ? t.Name.Substring(0, len) : t.Name;
                 if (correct && name == "IQbservable")
                     name = "IObservable";
+                if (correct && name == "IOrderedQbservable")
+                    name = "IOrderedObservable";
                 if (correct && name == "IQueryable")
                     name = "IEnumerable";
 

+ 19 - 6
Rx.NET/tools/HomoIcon/Program.cs

@@ -29,7 +29,7 @@ namespace HomoIconize
             Process(root, "System.Reactive.Linq", "System.Reactive.Providers", @"Reactive\Linq\Qbservable.Generated.cs", "System.Reactive.Linq.Observable", "Qbservable", true);
             Console.WriteLine();
 
-            Process(root, "System.Reactive.Experimental", "System.Reactive.Experimental", @"Reactive\Linq\QbservableEx.Generated.cs", "System.Reactive.Linq.ObservableEx", "QbservableEx");            
+            Process(root, "System.Reactive.Experimental", "System.Reactive.Experimental", @"Reactive\Linq\QbservableEx.Generated.cs", "System.Reactive.Linq.ObservableEx", "QbservableEx");
             Console.WriteLine();
         }
 
@@ -79,6 +79,11 @@ namespace HomoIconize
         {
         }
 
+        // Prototype interface to break dependencies. Only used for ToString2 ultimately.
+        interface IOrderedQbservable<T>
+        {
+        }
+
         static void Generate(string input, string xml, string output, string sourceTypeName, string targetTypeName, bool includeAsync)
         {
             var docs = XDocument.Load(xml).Root.Element("members").Elements("member").ToDictionary(m => m.Attribute("name").Value, m => m);
@@ -88,6 +93,7 @@ namespace HomoIconize
             var asm = Assembly.LoadFrom(input);
             var t = asm.GetType(sourceTypeName);
             _qbs = typeof(IQbservable<>); //asm.GetType("System.Reactive.Linq.IQbservable`1");
+            _oqbs = typeof(IOrderedQbservable<>);
 
             Console.WriteLine("Checking {0}...", output);
             var attr = File.GetAttributes(output);
@@ -132,7 +138,8 @@ namespace HomoIconize
             }
         }
 
-        static Type _qbs;
+        private const string OrderedObservableFullName = "System.Reactive.Linq.IOrderedObservable`1";
+        static Type _qbs, _oqbs;
 
         static void Generate(Type t, IDictionary<string, XElement> docs, string typeName, bool includeAsync)
         {
@@ -214,7 +221,7 @@ using System.Runtime.Remoting.Lifetime;
                     var d = ret.GetGenericTypeDefinition();
                     if (d.Name.StartsWith("IConnectableObservable") || d.Name.StartsWith("ListObservable"))
                         continue;
-                    if (d != typeof(IObservable<>) && d != typeof(IEnumerable<>))
+                    if (d != typeof(IObservable<>) && d != typeof(IEnumerable<>) && d.FullName != OrderedObservableFullName)
                         throw new InvalidOperationException("Invalid return type for " + m.Name);
                 }
                 else
@@ -229,7 +236,7 @@ using System.Runtime.Remoting.Lifetime;
                     if (f.ParameterType.IsGenericType)
                     {
                         var d = f.ParameterType.GetGenericTypeDefinition();
-                        if (d == typeof(IObservable<>)) // Check - e.g. Amb    || d == typeof(IEnumerable<>))
+                        if (d == typeof(IObservable<>) || d.FullName == OrderedObservableFullName) // Check - e.g. Amb    || d == typeof(IEnumerable<>))
                             hasProvider = false;
                     }
                 }
@@ -265,7 +272,7 @@ using System.Runtime.Remoting.Lifetime;
                     }
                     else
                     {
-                        var isObs = new Func<Type, bool>(tt => tt.IsGenericType && tt.GetGenericTypeDefinition() == typeof(IObservable<>));
+                        var isObs = new Func<Type, bool>(tt => tt.IsGenericType && (tt.GetGenericTypeDefinition() == typeof(IObservable<>) || tt.FullName == OrderedObservableFullName));
                         var isEnm = new Func<Type, bool>(tt => tt.IsGenericType && tt.GetGenericTypeDefinition() == typeof(IEnumerable<>));
                         if (isObs(pt) || pt.IsArray && isObs(pt.GetElementType()) || isEnm(pt) || pt.IsArray && isEnm(pt.GetElementType()))
                             args.Add("GetSourceExpression(" + q.Name + ")");
@@ -288,7 +295,9 @@ using System.Runtime.Remoting.Lifetime;
                 var requiresQueryProvider = ret.GetGenericTypeDefinition() == typeof(IQueryable<>);
                 if (requiresQueryProvider)
                     factory = "((IQueryProvider)" + factory + ")";
-                
+                else if (!ret.IsGenericType || ret.GetGenericTypeDefinition() != _qbs)
+                    factory = "(" + ret.ToString2() + ")" + factory;
+
                 var genArgs = m.GetGenericArguments().Select(a => a.ToString2()).ToList();
                 var g = genArgs.Count > 0 ? "<" + string.Join(", ", genArgs) + ">" : "";
                 var name = m.Name;
@@ -812,6 +821,10 @@ using System.Runtime.Remoting.Lifetime;
                 {
                     return _qbs.MakeGenericType(type.GetGenericArguments());
                 }
+                else if (g.FullName == OrderedObservableFullName)
+                {
+                    return _oqbs.MakeGenericType(type.GetGenericArguments());
+                }
                 else if (g == typeof(IEnumerable<>))
                 {
                     return typeof(IQueryable<>).MakeGenericType(type.GetGenericArguments());

BIN
Rx.NET/tools/HomoIcon/bin/Debug/HomoIcon.exe


BIN
Rx.NET/tools/HomoIcon/bin/Debug/HomoIcon.pdb