瀏覽代碼

Added GroupBy and GroupByUntil overloads with a capacity parameter.

davedev 12 年之前
父節點
當前提交
e00c2a74bf

+ 2 - 1
Rx.NET/Source/System.Reactive.Core/Reactive/Internal/ConcurrentDictionary.cs

@@ -8,7 +8,6 @@
 
 #if NO_CDS_COLLECTIONS
 
-using System;
 using System.Collections.Generic;
 using System.Collections.ObjectModel;
 using System.Diagnostics.CodeAnalysis;
@@ -99,6 +98,8 @@ namespace System.Collections.Concurrent
 
         public ConcurrentDictionary(IEqualityComparer<TKey> comparer) : this(DefaultConcurrencyLevel, DEFAULT_CAPACITY, true, comparer) { }
 
+        public ConcurrentDictionary(int capacity, IEqualityComparer<TKey> comparer) : this(DefaultConcurrencyLevel, capacity, true, comparer) { }
+
         internal ConcurrentDictionary(int concurrencyLevel, int capacity, bool growLockArray, IEqualityComparer<TKey> comparer)
         {
             if (concurrencyLevel < 1)

+ 11 - 3
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/IQueryLanguage.cs

@@ -385,7 +385,7 @@ namespace System.Reactive.Linq
         #endregion
 
         #region * Conversions *
-        
+
         IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer);
         IDisposable Subscribe<TSource>(IEnumerable<TSource> source, IObserver<TSource> observer, IScheduler scheduler);
         IEnumerable<TSource> ToEnumerable<TSource>(IObservable<TSource> source);
@@ -502,7 +502,7 @@ namespace System.Reactive.Linq
         IObservable<TEventArgs> FromEvent<TEventArgs>(Action<Action<TEventArgs>> addHandler, Action<Action<TEventArgs>> removeHandler, IScheduler scheduler);
         IObservable<Unit> FromEvent(Action<Action> addHandler, Action<Action> removeHandler);
         IObservable<Unit> FromEvent(Action<Action> addHandler, Action<Action> removeHandler, IScheduler scheduler);
-        
+
         #endregion
 
         #region * Imperative *
@@ -682,10 +682,18 @@ namespace System.Reactive.Linq
         IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer);
         IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector);
         IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer);
+        IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity);
+        IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer);
+        IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity);
+        IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer);
         IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer);
         IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector);
-        IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector,Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer);
+        IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer);
         IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector);
+        IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer);
+        IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity);
+        IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer);
+        IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity);
         IObservable<TResult> GroupJoin<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, IObservable<TRight>, TResult> resultSelector);
         IObservable<TResult> Join<TLeft, TRight, TLeftDuration, TRightDuration, TResult>(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector, Func<TRight, IObservable<TRightDuration>> rightDurationSelector, Func<TLeft, TRight, TResult> resultSelector);
         IObservable<TResult> OfType<TResult>(IObservable<object> source);

+ 249 - 1
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.StandardSequenceOperators.cs

@@ -241,6 +241,112 @@ namespace System.Reactive.Linq
             return s_impl.GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
         }
 
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+
+            return s_impl.GroupBy<TSource, TKey>(source, keySelector, capacity);
+        }
+
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <param name="comparer">An equality comparer to compare keys with.</param>
+        /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="comparer"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+
+            return s_impl.GroupBy<TSource, TKey>(source, keySelector, capacity, comparer);
+        }
+
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity and selects the resulting elements by using a specified function.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (elementSelector == null)
+                throw new ArgumentNullException("elementSelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+
+            return s_impl.GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity);
+        }
+
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <param name="comparer">An equality comparer to compare keys with.</param>
+        /// <returns>A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="comparer"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (elementSelector == null)
+                throw new ArgumentNullException("elementSelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+
+            return s_impl.GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
+        }
+
         #endregion
 
         #region + GroupByUntil +
@@ -329,7 +435,7 @@ namespace System.Reactive.Linq
         /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
         /// </returns>
         /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
-        public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector,Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
+        public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
         {
             if (source == null)
                 throw new ArgumentNullException("source");
@@ -371,6 +477,148 @@ namespace System.Reactive.Linq
             return s_impl.GroupByUntil<TSource, TKey, TDuration>(source, keySelector, durationSelector);
         }
 
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer and selects the resulting elements by using a specified function.
+        /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+        /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+        /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <param name="comparer">An equality comparer to compare keys with.</param>
+        /// <returns>
+        /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+        /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encountered.
+        /// </returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (elementSelector == null)
+                throw new ArgumentNullException("elementSelector");
+            if (durationSelector == null)
+                throw new ArgumentNullException("durationSelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+
+            return s_impl.GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
+        }
+
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and selects the resulting elements by using a specified function.
+        /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+        /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TElement">The type of the elements within the groups computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="elementSelector">A function to map each source element to an element in an observable group.</param>
+        /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <returns>
+        /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+        /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
+        /// </returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="elementSelector"/> or <paramref name="durationSelector"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (elementSelector == null)
+                throw new ArgumentNullException("elementSelector");
+            if (durationSelector == null)
+                throw new ArgumentNullException("durationSelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+
+            return s_impl.GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity);
+        }
+
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function and comparer.
+        /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+        /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <param name="comparer">An equality comparer to compare keys with.</param>
+        /// <returns>
+        /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+        /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
+        /// </returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="durationSelector"/> or <paramref name="comparer"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (durationSelector == null)
+                throw new ArgumentNullException("durationSelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+            if (comparer == null)
+                throw new ArgumentNullException("comparer");
+
+            return s_impl.GroupByUntil<TSource, TKey, TDuration>(source, keySelector, durationSelector, capacity, comparer);
+        }
+
+        /// <summary>
+        /// Groups the elements of an observable sequence with the specified initial capacity according to a specified key selector function.
+        /// A duration selector function is used to control the lifetime of groups. When a group expires, it receives an OnCompleted notification. When a new element with the same
+        /// key value as a reclaimed group occurs, the group will be reborn with a new lifetime request.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TKey">The type of the grouping key computed for each element in the source sequence.</typeparam>
+        /// <typeparam name="TDuration">The type of the elements in the duration sequences obtained for each group to denote its lifetime.</typeparam>
+        /// <param name="source">An observable sequence whose elements to group.</param>
+        /// <param name="keySelector">A function to extract the key for each element.</param>
+        /// <param name="durationSelector">A function to signal the expiration of a group.</param>
+        /// <param name="capacity">The initial number of elements that the underlying dictionary can contain.</param>
+        /// <returns>
+        /// A sequence of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value.
+        /// If a group's lifetime expires, a new group with the same key value can be created once an element with such a key value is encoutered.
+        /// </returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="keySelector"/> or <paramref name="durationSelector"/> is null.</exception>
+        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 0.</exception>
+        public static IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity)
+        {
+            if (source == null)
+                throw new ArgumentNullException("source");
+            if (keySelector == null)
+                throw new ArgumentNullException("keySelector");
+            if (durationSelector == null)
+                throw new ArgumentNullException("durationSelector");
+            if (capacity < 0)
+                throw new ArgumentOutOfRangeException("capacity");
+
+            return s_impl.GroupByUntil<TSource, TKey, TDuration>(source, keySelector, durationSelector, capacity);
+        }
+
         #endregion
 
         #region + GroupJoin +

+ 12 - 4
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupBy.cs

@@ -1,9 +1,7 @@
 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 
 #if !NO_PERF
-using System;
 using System.Collections.Generic;
-using System.Linq;
 using System.Reactive.Disposables;
 using System.Reactive.Subjects;
 
@@ -14,13 +12,15 @@ namespace System.Reactive.Linq.Observαble
         private readonly IObservable<TSource> _source;
         private readonly Func<TSource, TKey> _keySelector;
         private readonly Func<TSource, TElement> _elementSelector;
+        private readonly int? _capacity;
         private readonly IEqualityComparer<TKey> _comparer;
 
-        public GroupBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
+        public GroupBy(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer)
         {
             _source = source;
             _keySelector = keySelector;
             _elementSelector = elementSelector;
+            _capacity = capacity;
             _comparer = comparer;
         }
 
@@ -49,7 +49,15 @@ namespace System.Reactive.Linq.Observαble
                 : base(observer, cancel)
             {
                 _parent = parent;
-                _map = new Dictionary<TKey, ISubject<TElement>>(_parent._comparer);
+
+                if (_parent._capacity.HasValue)
+                {
+                    _map = new Dictionary<TKey, ISubject<TElement>>(_parent._capacity.Value, _parent._comparer);
+                }
+                else
+                {
+                    _map = new Dictionary<TKey, ISubject<TElement>>(_parent._comparer);
+                }
             }
 
             public void OnNext(TSource value)

+ 43 - 8
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/GroupByUntil.cs

@@ -1,7 +1,6 @@
 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 
- #if !NO_PERF
-using System;
+#if !NO_PERF
 using System.Collections.Generic;
 using System.Linq;
 using System.Reactive.Disposables;
@@ -15,14 +14,16 @@ namespace System.Reactive.Linq.Observαble
         private readonly Func<TSource, TKey> _keySelector;
         private readonly Func<TSource, TElement> _elementSelector;
         private readonly Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> _durationSelector;
+        private readonly int? _capacity;
         private readonly IEqualityComparer<TKey> _comparer;
 
-        public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
+        public GroupByUntil(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer)
         {
             _source = source;
             _keySelector = keySelector;
             _elementSelector = elementSelector;
             _durationSelector = durationSelector;
+            _capacity = capacity;
             _comparer = comparer;
         }
 
@@ -52,7 +53,7 @@ namespace System.Reactive.Linq.Observαble
                 : base(observer, cancel)
             {
                 _parent = parent;
-                _map = new Map<TKey, ISubject<TElement>>(_parent._comparer);
+                _map = new Map<TKey, ISubject<TElement>>(_parent._capacity, _parent._comparer);
                 _nullGate = new object();
             }
 
@@ -272,11 +273,38 @@ namespace System.Reactive.Linq.Observαble
 #if !NO_CDS
     class Map<TKey, TValue>
     {
+#if !NO_CDS_COLLECTIONS
+        // Taken from Rx\NET\Source\System.Reactive.Core\Reactive\Internal\ConcurrentDictionary.cs
+
+        // The default concurrency level is DEFAULT_CONCURRENCY_MULTIPLIER * #CPUs. The higher the
+        // DEFAULT_CONCURRENCY_MULTIPLIER, the more concurrent writes can take place without interference
+        // and blocking, but also the more expensive operations that require all locks become (e.g. table
+        // resizing, ToArray, Count, etc). According to brief benchmarks that we ran, 4 seems like a good
+        // compromise.
+        private const int DEFAULT_CONCURRENCY_MULTIPLIER = 4;
+
+        private static int DefaultConcurrencyLevel
+        {
+            get { return DEFAULT_CONCURRENCY_MULTIPLIER * Environment.ProcessorCount; }
+        }
+#endif
+
         private readonly System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue> _map;
 
-        public Map(IEqualityComparer<TKey> comparer)
+        public Map(int? capacity, IEqualityComparer<TKey> comparer)
         {
-            _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer);
+            if (capacity.HasValue)
+            {
+#if NO_CDS_COLLECTIONS
+                _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(capacity.Value, comparer);
+#else
+                _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(DefaultConcurrencyLevel, capacity.Value, comparer);
+#endif
+            }
+            else
+            {
+                _map = new System.Collections.Concurrent.ConcurrentDictionary<TKey, TValue>(comparer);
+            }
         }
 
         public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)
@@ -327,9 +355,16 @@ namespace System.Reactive.Linq.Observαble
     {
         private readonly Dictionary<TKey, TValue> _map;
 
-        public Map(IEqualityComparer<TKey> comparer)
+        public Map(int? capacity, IEqualityComparer<TKey> comparer)
         {
-            _map = new Dictionary<TKey, TValue>(comparer);
+            if (capacity.HasValue)
+            {
+                _map = new Dictionary<TKey, TValue>(capacity.Value, comparer);
+            }
+            else
+            {
+                _map = new Dictionary<TKey, TValue>(comparer);
+            }
         }
 
         public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)

+ 57 - 15
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs

@@ -153,30 +153,50 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
         {
-            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, EqualityComparer<TKey>.Default);
+            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, null, EqualityComparer<TKey>.Default);
         }
 
         public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
         {
-            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, comparer);
+            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, null, comparer);
         }
 
         public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
         {
-            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, EqualityComparer<TKey>.Default);
+            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, null, EqualityComparer<TKey>.Default);
         }
 
         public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
         {
-            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
+            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, null, comparer);
         }
 
-        private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
+        public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
+        {
+            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, EqualityComparer<TKey>.Default);
+        }
+
+        public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, capacity, comparer);
+        }
+
+        public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
+        {
+            return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, capacity, EqualityComparer<TKey>.Default);
+        }
+
+        public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
+        }
+
+        private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer)
         {
 #if !NO_PERF
-            return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
+            return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
 #else
-            return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, _ => Observable.Never<Unit>(), comparer);
+            return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, _ => Observable.Never<Unit>(), capacity, comparer);
 #endif
         }
 
@@ -186,32 +206,54 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
         {
-            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer);
+            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, null, comparer);
         }
 
         public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector)
         {
-            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, EqualityComparer<TKey>.Default);
+            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, null, EqualityComparer<TKey>.Default);
         }
 
         public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
         {
-            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, comparer);
+            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, null, comparer);
         }
 
         public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector)
         {
-            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, EqualityComparer<TKey>.Default);
+            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, null, EqualityComparer<TKey>.Default);
         }
 
-        private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
+        public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
+        }
+
+        public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity)
+        {
+            return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, EqualityComparer<TKey>.Default);
+        }
+
+        public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+        {
+            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, capacity, comparer);
+        }
+
+        public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity)
+        {
+            return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, capacity, EqualityComparer<TKey>.Default);
+        }
+
+        private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer)
         {
 #if !NO_PERF
-            return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer);
+            return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
 #else
             return new AnonymousObservable<IGroupedObservable<TKey, TElement>>(observer =>
             {
-                var map = new Dictionary<TKey, ISubject<TElement>>(comparer);
+                var map = capacity.HasValue
+                        ? new Dictionary<TKey, ISubject<TElement>>(capacity.Value, comparer)
+                        : new Dictionary<TKey, ISubject<TElement>>(comparer);
 
                 var groupDisposable = new CompositeDisposable();
                 var refCountDisposable = new RefCountDisposable(groupDisposable);
@@ -934,7 +976,7 @@ namespace System.Reactive.Linq
             return source.Select(selector).Merge();
 #endif
         }
-        
+
         private static IObservable<TResult> SelectMany_<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
         {
 #if !NO_PERF

文件差異過大導致無法顯示
+ 3554 - 5
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/ObservableStandardQueryOperatorTest.cs


部分文件因文件數量過多而無法顯示