|
|
@@ -22,7 +22,10 @@ namespace System.Reactive.Linq
|
|
|
if (durationSelector == null)
|
|
|
throw new ArgumentNullException(nameof(durationSelector));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, durationSelector),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
|
|
|
@@ -36,7 +39,10 @@ namespace System.Reactive.Linq
|
|
|
if (comparer == null)
|
|
|
throw new ArgumentNullException(nameof(comparer));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector, comparer)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, durationSelector, comparer),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector, state.comparer)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
|
|
|
@@ -50,7 +56,10 @@ namespace System.Reactive.Linq
|
|
|
if (capacity < 0)
|
|
|
throw new ArgumentOutOfRangeException(nameof(capacity));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector, capacity)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, durationSelector, capacity),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector, state.capacity)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
|
|
|
@@ -66,7 +75,10 @@ namespace System.Reactive.Linq
|
|
|
if (comparer == null)
|
|
|
throw new ArgumentNullException(nameof(comparer));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector, capacity, comparer)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, durationSelector, capacity, comparer),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector, state.capacity, state.comparer)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
|
|
|
@@ -80,7 +92,10 @@ namespace System.Reactive.Linq
|
|
|
if (durationSelector == null)
|
|
|
throw new ArgumentNullException(nameof(durationSelector));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, elementSelector, durationSelector),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
|
|
|
@@ -96,7 +111,10 @@ namespace System.Reactive.Linq
|
|
|
if (comparer == null)
|
|
|
throw new ArgumentNullException(nameof(comparer));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector, comparer)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, elementSelector, durationSelector, comparer),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector, state.comparer)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
|
|
|
@@ -112,7 +130,10 @@ namespace System.Reactive.Linq
|
|
|
if (capacity < 0)
|
|
|
throw new ArgumentOutOfRangeException(nameof(capacity));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector, capacity)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, elementSelector, durationSelector, capacity),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector, state.capacity)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
|
|
|
@@ -130,7 +151,10 @@ namespace System.Reactive.Linq
|
|
|
if (comparer == null)
|
|
|
throw new ArgumentNullException(nameof(comparer));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector, capacity, comparer)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, elementSelector, durationSelector, capacity, comparer),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector, state.capacity, state.comparer)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector)
|
|
|
@@ -142,7 +166,10 @@ namespace System.Reactive.Linq
|
|
|
if (durationSelector == null)
|
|
|
throw new ArgumentNullException(nameof(durationSelector));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, durationSelector),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
|
|
|
@@ -156,7 +183,10 @@ namespace System.Reactive.Linq
|
|
|
if (comparer == null)
|
|
|
throw new ArgumentNullException(nameof(comparer));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector, comparer)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, durationSelector, comparer),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector, state.comparer)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity)
|
|
|
@@ -170,7 +200,10 @@ namespace System.Reactive.Linq
|
|
|
if (capacity < 0)
|
|
|
throw new ArgumentOutOfRangeException(nameof(capacity));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector, capacity)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, durationSelector, capacity),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector, state.capacity)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<IGroupedAsyncObservable<TKey, TSource>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
|
|
|
@@ -186,7 +219,10 @@ namespace System.Reactive.Linq
|
|
|
if (comparer == null)
|
|
|
throw new ArgumentNullException(nameof(comparer));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TSource>>(observer => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, durationSelector, capacity, comparer)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TSource>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, durationSelector, capacity, comparer),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TSource, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.durationSelector, state.capacity, state.comparer)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector)
|
|
|
@@ -200,7 +236,10 @@ namespace System.Reactive.Linq
|
|
|
if (durationSelector == null)
|
|
|
throw new ArgumentNullException(nameof(durationSelector));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, elementSelector, durationSelector),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
|
|
|
@@ -216,7 +255,10 @@ namespace System.Reactive.Linq
|
|
|
if (comparer == null)
|
|
|
throw new ArgumentNullException(nameof(comparer));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector, comparer)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, elementSelector, durationSelector, comparer),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector, state.comparer)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity)
|
|
|
@@ -232,7 +274,10 @@ namespace System.Reactive.Linq
|
|
|
if (capacity < 0)
|
|
|
throw new ArgumentOutOfRangeException(nameof(capacity));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector, capacity)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, elementSelector, durationSelector, capacity),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector, state.capacity)));
|
|
|
}
|
|
|
|
|
|
public static IAsyncObservable<IGroupedAsyncObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(this IAsyncObservable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, Func<TSource, ValueTask<TElement>> elementSelector, Func<IGroupedAsyncObservable<TKey, TElement>, IAsyncObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
|
|
|
@@ -250,7 +295,10 @@ namespace System.Reactive.Linq
|
|
|
if (comparer == null)
|
|
|
throw new ArgumentNullException(nameof(comparer));
|
|
|
|
|
|
- return Create<IGroupedAsyncObservable<TKey, TElement>>(observer => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, keySelector, elementSelector, durationSelector, capacity, comparer)));
|
|
|
+ return CreateAsyncObservable<IGroupedAsyncObservable<TKey, TElement>>.From(
|
|
|
+ source,
|
|
|
+ (keySelector, elementSelector, durationSelector, capacity, comparer),
|
|
|
+ static (source, state, observer) => GroupByUntilCore<TSource, TKey, TElement, TDuration>(source, observer, (o, d) => AsyncObserver.GroupByUntil(o, d, state.keySelector, state.elementSelector, state.durationSelector, state.capacity, state.comparer)));
|
|
|
}
|
|
|
|
|
|
private static async ValueTask<IAsyncDisposable> GroupByUntilCore<TSource, TKey, TElement, TDuration>(IAsyncObservable<TSource> source, IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>> observer, Func<IAsyncObserver<IGroupedAsyncObservable<TKey, TElement>>, IAsyncDisposable, ValueTask<(IAsyncObserver<TSource>, IAsyncDisposable)>> createObserver)
|