浏览代码

Merged commit e00c2a7 - pull request from davedev: Added GroupBy and GroupByUntil overloads with a capacity parameter.

Donna Malayeri 12 年之前
父节点
当前提交
60f15465e2

+ 2 - 0
Rx.NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs

@@ -99,6 +99,8 @@ namespace System.Collections.Concurrent
 
         public ConcurrentDictionary(IEqualityComparer<TKey> comparer) : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true, comparer) { }
 
+        public ConcurrentDictionary(int capacity, IEqualityComparer<TKey> comparer) : this(DefaultConcurrencyLevel, capacity, true, comparer) { }
+
         internal ConcurrentDictionary(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer)
         {
             if (concurrencyLevel < 1)

+ 1 - 1
Rx.NET/Source/System.Reactive.Experimental/Reactive/Linq/QbservableEx.Generated.cs

@@ -1,5 +1,5 @@
 /*
- * WARNING: Auto-generated file (11/1/2013 5:54:10 PM)
+ * WARNING: Auto-generated file (11/4/2013 10:47:23 AM)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
 

+ 11 - 3
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs

@@ -385,7 +385,7 @@ namespace System.Reactive.Linq
         #endregion
 
         #region * Conversions *
-        
+
         IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer);
         IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler);
         IEnumerable<TSource> ToEnumerable<TSource>(IObservable<TSource> source);
@@ -502,7 +502,7 @@ namespace System.Reactive.Linq
         IObservable<TEventArgs> FromEvent<TEventArgs>(Action<Action<TEventArgs>> addHandler, Action<Action<TEventArgs>> removeHandler, IScheduler scheduler);
         IObservable<Unit> FromEvent(Action<Action> addHandler, Action<Action> removeHandler);
         IObservable<Unit> FromEvent(Action<Action> addHandler, Action<Action> removeHandler, IScheduler scheduler);
-        
+
         #endregion
 
         #region * Imperative *
@@ -682,10 +682,18 @@ namespace System.Reactive.Linq
         IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer);
         IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector);
         IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer);
+        IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity);
+        IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer);
+        IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity);
+        IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer);
         IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer);
         IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector);
-        IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector,Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer);
+        IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer);
         IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector);
+        IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer);
+        IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity);
+        IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer);
+        IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity);
         IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IObservable<TRight>, TResult> resultSelector);
         IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, TRight, TResult> resultSelector);
         IObservable<TResult> OfType<TResult>(IObservable<object> source);

+ 249 - 1
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs

@@ -241,6 +241,112 @@ namespace System.Reactive.Linq
             return s_impl.GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
         }
 
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+
+            return s_impl.GroupBy<TSource, TKey>(source, keySelector, capacity);
+        }
+
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <param name="comparer">An equality comparer to compare keys with.</param>
+        /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="comparer"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+
+            return s_impl.GroupBy<TSource, TKey>(source, keySelector, capacity, comparer);
+        }
+
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity and selects the resulting elements by using a specified function.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (elementSelector == null)
+                throw new ArgumentNullException("elementSelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+
+            return s_impl.GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity);
+        }
+
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <param name="comparer">An equality comparer to compare keys with.</param>
+        /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="comparer"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (elementSelector == null)
+                throw new ArgumentNullException("elementSelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+
+            return s_impl.GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
+        }
+
         #endregion
 
         #region + GroupByUntil +
@@ -329,7 +435,7 @@ namespace System.Reactive.Linq
         /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
         /// </returns>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
-        public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector,Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
+        public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -371,6 +477,148 @@ namespace System.Reactive.Linq
             return s_impl.GroupByUntil<TSource, TKey, TDuration>(source, keySelector, durationSelector);
         }
 
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
+        /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+        /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+        /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <param name="comparer">An equality comparer to compare keys with.</param>
+        /// <returns>
+        /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+        /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encountered.
+        /// </returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (elementSelector == null)
+                throw new ArgumentNullException("elementSelector");
+            if (durationSelector == null)
+                throw new ArgumentNullException("durationSelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+
+            return s_impl.GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
+        }
+
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and selects the resulting elements by using a specified function.
+        /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+        /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+        /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <returns>
+        /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+        /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
+        /// </returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="durationSelector"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (elementSelector == null)
+                throw new ArgumentNullException("elementSelector");
+            if (durationSelector == null)
+                throw new ArgumentNullException("durationSelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+
+            return s_impl.GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity);
+        }
+
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer.
+        /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+        /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <param name="comparer">An equality comparer to compare keys with.</param>
+        /// <returns>
+        /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+        /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
+        /// </returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (durationSelector == null)
+                throw new ArgumentNullException("durationSelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+
+            return s_impl.GroupByUntil<TSource, TKey, TDuration>(source, keySelector, durationSelector, capacity, comparer);
+        }
+
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function.
+        /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+        /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <returns>
+        /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+        /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
+        /// </returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="durationSelector"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (durationSelector == null)
+                throw new ArgumentNullException("durationSelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+
+            return s_impl.GroupByUntil<TSource, TKey, TDuration>(source, keySelector, durationSelector, capacity);
+        }
+
         #endregion
 
         #region + GroupJoin +

+ 12 - 2
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs

@@ -14,13 +14,15 @@ namespace System.Reactive.Linq.ObservableImpl
         private readonly IObservable<TSource> _source;
         private readonly Func<TSource, TKey> _keySelector;
         private readonly Func<TSource, TElement> _elementSelector;
+        private readonly int? _capacity;
         private readonly IEqualityComparer<TKey> _comparer;
 
-        public GroupBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
+        public GroupBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer)
         {
             _source = source;
             _keySelector = keySelector;
             _elementSelector = elementSelector;
+            _capacity = capacity;
             _comparer = comparer;
         }
 
@@ -49,7 +51,15 @@ namespace System.Reactive.Linq.ObservableImpl
                 : base(observer, cancel)
             {
                 _parent = parent;
-                _map = new Dictionary<TKey, ISubject<TElement>>(_parent._comparer);
+
+                if (_parent._capacity.HasValue)
+                {
+                    _map = new Dictionary<TKey, ISubject<TElement>>(_parent._capacity.Value, _parent._comparer);
+                }
+                else
+                {
+                    _map = new Dictionary<TKey, ISubject<TElement>>(_parent._comparer);
+                }
             }
 
             public void OnNext(TSource value)

+ 43 - 7
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs

@@ -1,6 +1,6 @@
 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 
- #if !NO_PERF
+#if !NO_PERF
 using System;
 using System.Collections.Generic;
 using System.Linq;
@@ -15,14 +15,16 @@ namespace System.Reactive.Linq.ObservableImpl
         private readonly Func<TSource, TKey> _keySelector;
         private readonly Func<TSource, TElement> _elementSelector;
         private readonly Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> _durationSelector;
+        private readonly int? _capacity;
         private readonly IEqualityComparer<TKey> _comparer;
 
-        public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
+        public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer)
         {
             _source = source;
             _keySelector = keySelector;
             _elementSelector = elementSelector;
             _durationSelector = durationSelector;
+            _capacity = capacity;
             _comparer = comparer;
         }
 
@@ -52,7 +54,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 : base(observer, cancel)
             {
                 _parent = parent;
-                _map = new Map<TKey, ISubject<TElement>>(_parent._comparer);
+                _map = new Map<TKey, ISubject<TElement>>(_parent._capacity, _parent._comparer);
                 _nullGate = new object();
             }
 
@@ -272,11 +274,38 @@ namespace System.Reactive.Linq.ObservableImpl
 #if !NO_CDS
     class Map<TKey, TValue>
     {
+#if !NO_CDS_COLLECTIONS
+        // Taken from Rx\NET\Source\System.Reactive.Core\Reactive\Internal\ConcurrentDictionary.cs
+
+        // The default concurrency level is DEFAULT_CONCURRENCY_MULTIPLIER * #CPUs. The higher the
+        // DEFAULT_CONCURRENCY_MULTIPLIER, the more concurrent writes can take place without interference
+        // and blocking, but also the more expensive operations that require all locks become (e.g. table
+        // resizing, ToArray, Count, etc). According to brief benchmarks that we ran, 4 seems like a good
+        // compromise.
+        private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4;
+
+        private static int DefaultConcurrencyLevel
+        {
+            get { return DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount; }
+        }
+#endif
+
         private readonly System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue> _map;
 
-        public Map(IEqualityComparer<TKey> comparer)
+        public Map(int? capacity, IEqualityComparer<TKey> comparer)
         {
-            _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer);
+            if (capacity.HasValue)
+            {
+#if NO_CDS_COLLECTIONS
+                _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(capacity.Value, comparer);
+#else
+                _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(DefaultConcurrencyLevel, capacity.Value, comparer);
+#endif
+            }
+            else
+            {
+                _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer);
+            }
         }
 
         public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)
@@ -327,9 +356,16 @@ namespace System.Reactive.Linq.ObservableImpl
     {
         private readonly Dictionary<TKey, TValue> _map;
 
-        public Map(IEqualityComparer<TKey> comparer)
+        public Map(int? capacity, IEqualityComparer<TKey> comparer)
         {
-            _map = new Dictionary<TKey, TValue>(comparer);
+            if (capacity.HasValue)
+            {
+                _map = new Dictionary<TKey, TValue>(capacity.Value, comparer);
+            }
+            else
+            {
+                _map = new Dictionary<TKey, TValue>(comparer);
+            }
         }
 
         public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)

+ 56 - 14
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs

@@ -153,30 +153,50 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
         {
-            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, EqualityComparer<TKey>.Default);
+            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, null, EqualityComparer<TKey>.Default);
         }
 
         public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
         {
-            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, comparer);
+            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, null, comparer);
         }
 
         public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
         {
-            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, EqualityComparer<TKey>.Default);
+            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, null, EqualityComparer<TKey>.Default);
         }
 
         public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
         {
-            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
+            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, null, comparer);
         }
 
-        private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
+        public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
+        {
+            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, EqualityComparer<TKey>.Default);
+        }
+
+        public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, capacity, comparer);
+        }
+
+        public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
+        {
+            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, capacity, EqualityComparer<TKey>.Default);
+        }
+
+        public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
+        }
+
+        private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer)
         {
 #if !NO_PERF
-            return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
+            return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
 #else
-            return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, _ => Observable.Never<Unit>(), comparer);
+            return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, _ => Observable.Never<Unit>(), capacity, comparer);
 #endif
         }
 
@@ -186,32 +206,54 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
         {
-            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer);
+            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, null, comparer);
         }
 
         public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector)
         {
-            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, EqualityComparer<TKey>.Default);
+            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, null, EqualityComparer<TKey>.Default);
         }
 
         public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
         {
-            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, comparer);
+            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, null, comparer);
         }
 
         public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector)
         {
-            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, EqualityComparer<TKey>.Default);
+            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, null, EqualityComparer<TKey>.Default);
+        }
+
+        public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
+        }
+
+        public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity)
+        {
+            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, EqualityComparer<TKey>.Default);
+        }
+
+        public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, capacity, comparer);
+        }
+
+        public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity)
+        {
+            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, capacity, EqualityComparer<TKey>.Default);
         }
 
-        private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
+        private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer)
         {
 #if !NO_PERF
-            return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer);
+            return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
 #else
             return new AnonymousObservable<IGroupedObservable<TKey, TElement>>(observer =>
             {
-                var map = new Dictionary<TKey, ISubject<TElement>>(comparer);
+                var map = capacity.HasValue
+                        ? new Dictionary<TKey, ISubject<TElement>>(capacity.Value, comparer)
+                        : new Dictionary<TKey, ISubject<TElement>>(comparer);
 
                 var groupDisposable = new CompositeDisposable();
                 var refCountDisposable = new RefCountDisposable(groupDisposable);

+ 429 - 73
Rx.NET/Source/System.Reactive.Providers/Reactive/Linq/Qbservable.Generated.cs

@@ -1,5 +1,5 @@
 /*
- * WARNING: Auto-generated file (11/1/2013 5:54:10 PM)
+ * WARNING: Auto-generated file (11/4/2013 10:47:23 AM)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
 
@@ -7340,6 +7340,80 @@ namespace System.Reactive.Linq
             );
         }
         
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="keySelector" /> is null.</exception>
+        /// <exception cref="T:System.ArgumentOutOfRangeException">
+        /// <paramref name="capacity" /> is less than 0.</exception>
+        public static IQbservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector, int capacity)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            
+            return source.Provider.CreateQuery<IGroupedObservable<TKey, TSource>>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.GroupBy<TSource, TKey>(default(IQbservable<TSource>), default(Expression<Func<TSource, TKey>>), default(int))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)),
+#endif
+                    source.Expression,
+                    keySelector,
+                    Expression.Constant(capacity, typeof(int))
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <param name="comparer">An equality comparer to compare keys with.</param>
+        /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="keySelector" /> or <paramref name="comparer" /> is null.</exception>
+        /// <exception cref="T:System.ArgumentOutOfRangeException">
+        /// <paramref name="capacity" /> is less than 0.</exception>
+        public static IQbservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+            
+            return source.Provider.CreateQuery<IGroupedObservable<TKey, TSource>>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.GroupBy<TSource, TKey>(default(IQbservable<TSource>), default(Expression<Func<TSource, TKey>>), default(int), default(IEqualityComparer<TKey>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)),
+#endif
+                    source.Expression,
+                    keySelector,
+                    Expression.Constant(capacity, typeof(int)),
+                    Expression.Constant(comparer, typeof(IEqualityComparer<TKey>))
+                )
+            );
+        }
+        
         /// <summary>
         /// Groups the elements of an observable sequence according to a specified key selector function and comparer.
         /// </summary>
@@ -7411,6 +7485,90 @@ namespace System.Reactive.Linq
             );
         }
         
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity and selects the resulting elements by using a specified function.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="keySelector" /> or <paramref name="elementSelector" /> is null.</exception>
+        /// <exception cref="T:System.ArgumentOutOfRangeException">
+        /// <paramref name="capacity" /> is less than 0.</exception>
+        public static IQbservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector, Expression<Func<TSource, TElement>> elementSelector, int capacity)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (elementSelector == null)
+                throw new ArgumentNullException("elementSelector");
+            
+            return source.Provider.CreateQuery<IGroupedObservable<TKey, TElement>>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.GroupBy<TSource, TKey, TElement>(default(IQbservable<TSource>), default(Expression<Func<TSource, TKey>>), default(Expression<Func<TSource, TElement>>), default(int))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey), typeof(TElement)),
+#endif
+                    source.Expression,
+                    keySelector,
+                    elementSelector,
+                    Expression.Constant(capacity, typeof(int))
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <param name="comparer">An equality comparer to compare keys with.</param>
+        /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="keySelector" /> or <paramref name="elementSelector" /> or <paramref name="comparer" /> is null.</exception>
+        /// <exception cref="T:System.ArgumentOutOfRangeException">
+        /// <paramref name="capacity" /> is less than 0.</exception>
+        public static IQbservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector, Expression<Func<TSource, TElement>> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (elementSelector == null)
+                throw new ArgumentNullException("elementSelector");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+            
+            return source.Provider.CreateQuery<IGroupedObservable<TKey, TElement>>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.GroupBy<TSource, TKey, TElement>(default(IQbservable<TSource>), default(Expression<Func<TSource, TKey>>), default(Expression<Func<TSource, TElement>>), default(int), default(IEqualityComparer<TKey>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey), typeof(TElement)),
+#endif
+                    source.Expression,
+                    keySelector,
+                    elementSelector,
+                    Expression.Constant(capacity, typeof(int)),
+                    Expression.Constant(comparer, typeof(IEqualityComparer<TKey>))
+                )
+            );
+        }
+        
         /// <summary>
         /// Groups the elements of an observable sequence according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
         /// </summary>
@@ -7492,6 +7650,100 @@ namespace System.Reactive.Linq
             );
         }
         
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function.
+        /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+        /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <returns>
+        /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+        /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
+        /// </returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="keySelector" /> or <paramref name="durationSelector" /> is null.</exception>
+        /// <exception cref="T:System.ArgumentOutOfRangeException">
+        /// <paramref name="capacity" /> is less than 0.</exception>
+        public static IQbservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector, Expression<Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>>> durationSelector, int capacity)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (durationSelector == null)
+                throw new ArgumentNullException("durationSelector");
+            
+            return source.Provider.CreateQuery<IGroupedObservable<TKey, TSource>>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.GroupByUntil<TSource, TKey, TDuration>(default(IQbservable<TSource>), default(Expression<Func<TSource, TKey>>), default(Expression<Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>>>), default(int))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey), typeof(TDuration)),
+#endif
+                    source.Expression,
+                    keySelector,
+                    durationSelector,
+                    Expression.Constant(capacity, typeof(int))
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer.
+        /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+        /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <param name="comparer">An equality comparer to compare keys with.</param>
+        /// <returns>
+        /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+        /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
+        /// </returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="keySelector" /> or <paramref name="durationSelector" /> or <paramref name="comparer" /> is null.</exception>
+        /// <exception cref="T:System.ArgumentOutOfRangeException">
+        /// <paramref name="capacity" /> is less than 0.</exception>
+        public static IQbservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector, Expression<Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (durationSelector == null)
+                throw new ArgumentNullException("durationSelector");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+            
+            return source.Provider.CreateQuery<IGroupedObservable<TKey, TSource>>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.GroupByUntil<TSource, TKey, TDuration>(default(IQbservable<TSource>), default(Expression<Func<TSource, TKey>>), default(Expression<Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>>>), default(int), default(IEqualityComparer<TKey>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey), typeof(TDuration)),
+#endif
+                    source.Expression,
+                    keySelector,
+                    durationSelector,
+                    Expression.Constant(capacity, typeof(int)),
+                    Expression.Constant(comparer, typeof(IEqualityComparer<TKey>))
+                )
+            );
+        }
+        
         /// <summary>
         /// Groups the elements of an observable sequence according to a specified key selector function and comparer.
         /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
@@ -7583,6 +7835,110 @@ namespace System.Reactive.Linq
             );
         }
         
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and selects the resulting elements by using a specified function.
+        /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+        /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+        /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <returns>
+        /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+        /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
+        /// </returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="keySelector" /> or <paramref name="elementSelector" /> or <paramref name="durationSelector" /> is null.</exception>
+        /// <exception cref="T:System.ArgumentOutOfRangeException">
+        /// <paramref name="capacity" /> is less than 0.</exception>
+        public static IQbservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector, Expression<Func<TSource, TElement>> elementSelector, Expression<Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>>> durationSelector, int capacity)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (elementSelector == null)
+                throw new ArgumentNullException("elementSelector");
+            if (durationSelector == null)
+                throw new ArgumentNullException("durationSelector");
+            
+            return source.Provider.CreateQuery<IGroupedObservable<TKey, TElement>>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.GroupByUntil<TSource, TKey, TElement, TDuration>(default(IQbservable<TSource>), default(Expression<Func<TSource, TKey>>), default(Expression<Func<TSource, TElement>>), default(Expression<Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>>>), default(int))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey), typeof(TElement), typeof(TDuration)),
+#endif
+                    source.Expression,
+                    keySelector,
+                    elementSelector,
+                    durationSelector,
+                    Expression.Constant(capacity, typeof(int))
+                )
+            );
+        }
+        
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
+        /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+        /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+        /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <param name="comparer">An equality comparer to compare keys with.</param>
+        /// <returns>
+        /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+        /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encountered.
+        /// </returns>
+        /// <exception cref="T:System.ArgumentNullException">
+        /// <paramref name="source" /> or <paramref name="keySelector" /> or <paramref name="elementSelector" /> or <paramref name="durationSelector" /> or <paramref name="comparer" /> is null.</exception>
+        /// <exception cref="T:System.ArgumentOutOfRangeException">
+        /// <paramref name="capacity" /> is less than 0.</exception>
+        public static IQbservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IQbservable<TSource> source, Expression<Func<TSource, TKey>> keySelector, Expression<Func<TSource, TElement>> elementSelector, Expression<Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (elementSelector == null)
+                throw new ArgumentNullException("elementSelector");
+            if (durationSelector == null)
+                throw new ArgumentNullException("durationSelector");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+            
+            return source.Provider.CreateQuery<IGroupedObservable<TKey, TElement>>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.GroupByUntil<TSource, TKey, TElement, TDuration>(default(IQbservable<TSource>), default(Expression<Func<TSource, TKey>>), default(Expression<Func<TSource, TElement>>), default(Expression<Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>>>), default(int), default(IEqualityComparer<TKey>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey), typeof(TElement), typeof(TDuration)),
+#endif
+                    source.Expression,
+                    keySelector,
+                    elementSelector,
+                    durationSelector,
+                    Expression.Constant(capacity, typeof(int)),
+                    Expression.Constant(comparer, typeof(IEqualityComparer<TKey>))
+                )
+            );
+        }
+        
         /// <summary>
         /// Groups the elements of an observable sequence according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
         /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
@@ -11431,10 +11787,10 @@ namespace System.Reactive.Linq
         }
         
         /// <summary>
-        /// Projects each element of an observable sequence to an observable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
+        /// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="collectionSelector">A transform function to apply to each element.</param>
@@ -11442,7 +11798,8 @@ namespace System.Reactive.Linq
         /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="collectionSelector" /> or <paramref name="resultSelector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TCollection>>> collectionSelector, Expression<Func<TSource, TCollection, TResult>> resultSelector)
+        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
+        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IEnumerable<TCollection>>> collectionSelector, Expression<Func<TSource, TCollection, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11455,7 +11812,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IObservable<TCollection>>>), default(Expression<Func<TSource, TCollection, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IEnumerable<TCollection>>>), default(Expression<Func<TSource, TCollection, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult)),
 #endif
@@ -11467,10 +11824,10 @@ namespace System.Reactive.Linq
         }
         
         /// <summary>
-        /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to an enumerable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
+        /// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="collectionSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
@@ -11478,7 +11835,8 @@ namespace System.Reactive.Linq
         /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="collectionSelector" /> or <paramref name="resultSelector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IObservable<TCollection>>> collectionSelector, Expression<Func<TSource, int, TCollection, int, TResult>> resultSelector)
+        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
+        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IEnumerable<TCollection>>> collectionSelector, Expression<Func<TSource, int, TCollection, int, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11491,7 +11849,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IObservable<TCollection>>>), default(Expression<Func<TSource, int, TCollection, int, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IEnumerable<TCollection>>>), default(Expression<Func<TSource, int, TCollection, int, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult)),
 #endif
@@ -11503,10 +11861,10 @@ namespace System.Reactive.Linq
         }
         
         /// <summary>
-        /// Projects each element of an observable sequence to an enumerable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to an observable sequence, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
+        /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="collectionSelector">A transform function to apply to each element.</param>
@@ -11514,8 +11872,7 @@ namespace System.Reactive.Linq
         /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="collectionSelector" /> or <paramref name="resultSelector" /> is null.</exception>
-        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IEnumerable<TCollection>>> collectionSelector, Expression<Func<TSource, TCollection, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TCollection>>> collectionSelector, Expression<Func<TSource, TCollection, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11528,7 +11885,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IEnumerable<TCollection>>>), default(Expression<Func<TSource, TCollection, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IObservable<TCollection>>>), default(Expression<Func<TSource, TCollection, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult)),
 #endif
@@ -11540,10 +11897,10 @@ namespace System.Reactive.Linq
         }
         
         /// <summary>
-        /// Projects each element of an observable sequence to an enumerable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index, invokes the result selector for the source element and each of the corresponding inner sequence's elements, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TCollection">The type of the elements in the projected intermediate enumerable sequences.</typeparam>
+        /// <typeparam name="TCollection">The type of the elements in the projected intermediate sequences.</typeparam>
         /// <typeparam name="TResult">The type of the elements in the result sequence, obtained by using the selector to combine source sequence elements with their corresponding intermediate sequence elements.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="collectionSelector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
@@ -11551,8 +11908,7 @@ namespace System.Reactive.Linq
         /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function collectionSelector on each element of the input sequence and then mapping each of those sequence elements and their corresponding source element to a result element.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="collectionSelector" /> or <paramref name="resultSelector" /> is null.</exception>
-        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IEnumerable<TCollection>>> collectionSelector, Expression<Func<TSource, int, TCollection, int, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TCollection, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IObservable<TCollection>>> collectionSelector, Expression<Func<TSource, int, TCollection, int, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11565,7 +11921,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IEnumerable<TCollection>>>), default(Expression<Func<TSource, int, TCollection, int, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TCollection, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IObservable<TCollection>>>), default(Expression<Func<TSource, int, TCollection, int, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TCollection), typeof(TResult)),
 #endif
@@ -11686,16 +12042,17 @@ namespace System.Reactive.Linq
         }
         
         /// <summary>
-        /// Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
+        /// Projects each element of an observable sequence to an enumerable sequence and concatenates the resulting enumerable sequences into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
         /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TResult>>> selector)
+        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IEnumerable<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11706,7 +12063,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IObservable<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IEnumerable<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -11717,16 +12074,17 @@ namespace System.Reactive.Linq
         }
         
         /// <summary>
-        /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index and merges the resulting observable sequences into one observable sequence.
+        /// Projects each element of an observable sequence to an enumerable sequence by incorporating the element's index and concatenates the resulting enumerable sequences into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
         /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IObservable<TResult>>> selector)
+        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IEnumerable<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11737,7 +12095,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IObservable<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IEnumerable<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -11747,19 +12105,17 @@ namespace System.Reactive.Linq
             );
         }
         
-#if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task and merges all of the task results into one observable sequence.
+        /// Projects each element of an observable sequence to an observable sequence and merges the resulting observable sequences into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
-        /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
-        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
+        /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, Task<TResult>>> selector)
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IObservable<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11770,7 +12126,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, Task<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IObservable<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -11779,21 +12135,18 @@ namespace System.Reactive.Linq
                 )
             );
         }
-#endif
         
-#if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task by incorporating the element's index and merges all of the task results into one observable sequence.
+        /// Projects each element of an observable sequence to an observable sequence by incorporating the element's index and merges the resulting observable sequences into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the elements in the projected inner sequences and the elements in the merged result sequence.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
-        /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
-        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
+        /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, Task<TResult>>> selector)
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IObservable<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11804,7 +12157,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, Task<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IObservable<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -11813,11 +12166,10 @@ namespace System.Reactive.Linq
                 )
             );
         }
-#endif
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task with cancellation support and merges all of the task results into one observable sequence.
+        /// Projects each element of an observable sequence to a task and merges all of the task results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
@@ -11827,7 +12179,7 @@ namespace System.Reactive.Linq
         /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, CancellationToken, Task<TResult>>> selector)
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, Task<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11838,7 +12190,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, CancellationToken, Task<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, Task<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -11851,7 +12203,7 @@ namespace System.Reactive.Linq
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support and merges all of the task results into one observable sequence.
+        /// Projects each element of an observable sequence to a task by incorporating the element's index and merges all of the task results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
@@ -11861,7 +12213,7 @@ namespace System.Reactive.Linq
         /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, CancellationToken, Task<TResult>>> selector)
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, Task<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11872,7 +12224,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, CancellationToken, Task<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, Task<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -11883,18 +12235,19 @@ namespace System.Reactive.Linq
         }
 #endif
         
+#if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to an enumerable sequence and concatenates the resulting enumerable sequences into one observable sequence.
+        /// Projects each element of an observable sequence to a task with cancellation support and merges all of the task results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TResult">The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element.</param>
-        /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
+        /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
+        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, IEnumerable<TResult>>> selector)
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, CancellationToken, Task<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11905,7 +12258,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, IEnumerable<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, CancellationToken, Task<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -11914,19 +12267,21 @@ namespace System.Reactive.Linq
                 )
             );
         }
+#endif
         
+#if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to an enumerable sequence by incorporating the element's index and concatenates the resulting enumerable sequences into one observable sequence.
+        /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support and merges all of the task results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
-        /// <typeparam name="TResult">The type of the elements in the projected inner enumerable sequences and the elements in the merged result sequence.</typeparam>
+        /// <typeparam name="TResult">The type of the result produced by the projected tasks and the elements in the merged result sequence.</typeparam>
         /// <param name="source">An observable sequence of elements to project.</param>
         /// <param name="selector">A transform function to apply to each element; the second parameter of the function represents the index of the source element.</param>
-        /// <returns>An observable sequence whose elements are the result of invoking the one-to-many transform function on each element of the input sequence.</returns>
+        /// <returns>An observable sequence whose elements are the result of the tasks executed for each element of the input sequence.</returns>
+        /// <remarks>This overload supports composition of observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="selector" /> is null.</exception>
-        /// <remarks>The projected sequences are enumerated synchonously within the OnNext call of the source sequence. In order to do a concurrent, non-blocking merge, change the selector to return an observable sequence obtained using the <see cref="M:System.Reactive.Linq.Observable.ToObservable``1(System.Collections.Generic.IEnumerable{``0})" /> conversion.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, IEnumerable<TResult>>> selector)
+        public static IQbservable<TResult> SelectMany<TSource, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, CancellationToken, Task<TResult>>> selector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11937,7 +12292,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, IEnumerable<TResult>>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, CancellationToken, Task<TResult>>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TResult)),
 #endif
@@ -11946,10 +12301,11 @@ namespace System.Reactive.Linq
                 )
             );
         }
+#endif
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to a task with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
@@ -11961,7 +12317,7 @@ namespace System.Reactive.Linq
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="taskSelector" /> or <paramref name="resultSelector" /> is null.</exception>
         /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, TTaskResult, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, CancellationToken, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, TTaskResult, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -11974,7 +12330,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, Task<TTaskResult>>>), default(Expression<Func<TSource, TTaskResult, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, CancellationToken, Task<TTaskResult>>>), default(Expression<Func<TSource, TTaskResult, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TTaskResult), typeof(TResult)),
 #endif
@@ -11988,7 +12344,7 @@ namespace System.Reactive.Linq
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task by incorporating the element's index, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
@@ -12000,7 +12356,7 @@ namespace System.Reactive.Linq
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="taskSelector" /> or <paramref name="resultSelector" /> is null.</exception>
         /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, int, TTaskResult, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, CancellationToken, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, int, TTaskResult, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12013,7 +12369,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, Task<TTaskResult>>>), default(Expression<Func<TSource, int, TTaskResult, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, CancellationToken, Task<TTaskResult>>>), default(Expression<Func<TSource, int, TTaskResult, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TTaskResult), typeof(TResult)),
 #endif
@@ -12027,7 +12383,7 @@ namespace System.Reactive.Linq
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to a task, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
@@ -12039,7 +12395,7 @@ namespace System.Reactive.Linq
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="taskSelector" /> or <paramref name="resultSelector" /> is null.</exception>
         /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, CancellationToken, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, TTaskResult, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, TTaskResult, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12052,7 +12408,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, CancellationToken, Task<TTaskResult>>>), default(Expression<Func<TSource, TTaskResult, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, Task<TTaskResult>>>), default(Expression<Func<TSource, TTaskResult, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TTaskResult), typeof(TResult)),
 #endif
@@ -12066,7 +12422,7 @@ namespace System.Reactive.Linq
         
 #if !NO_TPL
         /// <summary>
-        /// Projects each element of an observable sequence to a task by incorporating the element's index with cancellation support, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
+        /// Projects each element of an observable sequence to a task by incorporating the element's index, invokes the result selector for the source element and the task result, and merges the results into one observable sequence.
         /// </summary>
         /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
         /// <typeparam name="TTaskResult">The type of the results produced by the projected intermediate tasks.</typeparam>
@@ -12078,7 +12434,7 @@ namespace System.Reactive.Linq
         /// <exception cref="T:System.ArgumentNullException">
         /// <paramref name="source" /> or <paramref name="taskSelector" /> or <paramref name="resultSelector" /> is null.</exception>
         /// <remarks>This overload supports using LINQ query comprehension syntax in C# and Visual Basic to compose observable sequences and tasks, without requiring manual conversion of the tasks to observable sequences using <see cref="M:System.Reactive.Threading.Tasks.TaskObservableExtensions.ToObservable``1(System.Threading.Tasks.Task{``0})" />.</remarks>
-        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, CancellationToken, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, int, TTaskResult, TResult>> resultSelector)
+        public static IQbservable<TResult> SelectMany<TSource, TTaskResult, TResult>(this IQbservable<TSource> source, Expression<Func<TSource, int, Task<TTaskResult>>> taskSelector, Expression<Func<TSource, int, TTaskResult, TResult>> resultSelector)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -12091,7 +12447,7 @@ namespace System.Reactive.Linq
                 Expression.Call(
                     null,
 #if CRIPPLED_REFLECTION
-                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, CancellationToken, Task<TTaskResult>>>), default(Expression<Func<TSource, int, TTaskResult, TResult>>))),
+                    InfoOf(() => Qbservable.SelectMany<TSource, TTaskResult, TResult>(default(IQbservable<TSource>), default(Expression<Func<TSource, int, Task<TTaskResult>>>), default(Expression<Func<TSource, int, TTaskResult, TResult>>))),
 #else
                     ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TTaskResult), typeof(TResult)),
 #endif

文件差异内容过多而无法显示
+ 3506 - 4
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs


部分文件因为文件数量过多而无法显示