|
|
@@ -81,6 +81,78 @@ namespace System.Reactive.Linq
|
|
|
return StableCompositeAsyncDisposable.Create(subscription, inner);
|
|
|
});
|
|
|
}
|
|
|
+
|
|
|
+ public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, IAsyncObservable<TResult>> selector)
|
|
|
+ {
|
|
|
+ if (source == null)
|
|
|
+ throw new ArgumentNullException(nameof(source));
|
|
|
+ if (selector == null)
|
|
|
+ throw new ArgumentNullException(nameof(selector));
|
|
|
+
|
|
|
+ return Create<TResult>(async observer =>
|
|
|
+ {
|
|
|
+ var (sink, inner) = AsyncObserver.SelectMany(observer, selector);
|
|
|
+
|
|
|
+ var subscription = await source.SubscribeAsync(sink);
|
|
|
+
|
|
|
+ return StableCompositeAsyncDisposable.Create(subscription, inner);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public static IAsyncObservable<TResult> SelectMany<TSource, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, Task<IAsyncObservable<TResult>>> selector)
|
|
|
+ {
|
|
|
+ if (source == null)
|
|
|
+ throw new ArgumentNullException(nameof(source));
|
|
|
+ if (selector == null)
|
|
|
+ throw new ArgumentNullException(nameof(selector));
|
|
|
+
|
|
|
+ return Create<TResult>(async observer =>
|
|
|
+ {
|
|
|
+ var (sink, inner) = AsyncObserver.SelectMany(observer, selector);
|
|
|
+
|
|
|
+ var subscription = await source.SubscribeAsync(sink);
|
|
|
+
|
|
|
+ return StableCompositeAsyncDisposable.Create(subscription, inner);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
|
|
|
+ {
|
|
|
+ if (source == null)
|
|
|
+ throw new ArgumentNullException(nameof(source));
|
|
|
+ if (collectionSelector == null)
|
|
|
+ throw new ArgumentNullException(nameof(collectionSelector));
|
|
|
+ if (resultSelector == null)
|
|
|
+ throw new ArgumentNullException(nameof(resultSelector));
|
|
|
+
|
|
|
+ return Create<TResult>(async observer =>
|
|
|
+ {
|
|
|
+ var (sink, inner) = AsyncObserver.SelectMany(observer, collectionSelector, resultSelector);
|
|
|
+
|
|
|
+ var subscription = await source.SubscribeAsync(sink);
|
|
|
+
|
|
|
+ return StableCompositeAsyncDisposable.Create(subscription, inner);
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ public static IAsyncObservable<TResult> SelectMany<TSource, TCollection, TResult>(this IAsyncObservable<TSource> source, Func<TSource, int, Task<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, int, TCollection, int, Task<TResult>> resultSelector)
|
|
|
+ {
|
|
|
+ if (source == null)
|
|
|
+ throw new ArgumentNullException(nameof(source));
|
|
|
+ if (collectionSelector == null)
|
|
|
+ throw new ArgumentNullException(nameof(collectionSelector));
|
|
|
+ if (resultSelector == null)
|
|
|
+ throw new ArgumentNullException(nameof(resultSelector));
|
|
|
+
|
|
|
+ return Create<TResult>(async observer =>
|
|
|
+ {
|
|
|
+ var (sink, inner) = AsyncObserver.SelectMany(observer, collectionSelector, resultSelector);
|
|
|
+
|
|
|
+ var subscription = await source.SubscribeAsync(sink);
|
|
|
+
|
|
|
+ return StableCompositeAsyncDisposable.Create(subscription, inner);
|
|
|
+ });
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
partial class AsyncObserver
|
|
|
@@ -216,5 +288,56 @@ namespace System.Reactive.Linq
|
|
|
disposable
|
|
|
);
|
|
|
}
|
|
|
+
|
|
|
+ public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, IAsyncObservable<TResult>> selector)
|
|
|
+ {
|
|
|
+ if (observer == null)
|
|
|
+ throw new ArgumentNullException(nameof(observer));
|
|
|
+ if (selector == null)
|
|
|
+ throw new ArgumentNullException(nameof(selector));
|
|
|
+
|
|
|
+ return SelectMany<TSource, TResult, TResult>(observer, (x, i) => Task.FromResult(selector(x, i)), (x, i, y, j) => Task.FromResult(y));
|
|
|
+ }
|
|
|
+
|
|
|
+ public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, Task<IAsyncObservable<TResult>>> selector)
|
|
|
+ {
|
|
|
+ if (observer == null)
|
|
|
+ throw new ArgumentNullException(nameof(observer));
|
|
|
+ if (selector == null)
|
|
|
+ throw new ArgumentNullException(nameof(selector));
|
|
|
+
|
|
|
+ return SelectMany<TSource, TResult, TResult>(observer, selector, (x, i, y, j) => Task.FromResult(y));
|
|
|
+ }
|
|
|
+
|
|
|
+ public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, IAsyncObservable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
|
|
|
+ {
|
|
|
+ if (observer == null)
|
|
|
+ throw new ArgumentNullException(nameof(observer));
|
|
|
+ if (collectionSelector == null)
|
|
|
+ throw new ArgumentNullException(nameof(collectionSelector));
|
|
|
+ if (resultSelector == null)
|
|
|
+ throw new ArgumentNullException(nameof(resultSelector));
|
|
|
+
|
|
|
+ return SelectMany<TSource, TCollection, TResult>(observer, (x, i) => Task.FromResult(collectionSelector(x, i)), (x, i, y, j) => Task.FromResult(resultSelector(x, i, y, j)));
|
|
|
+ }
|
|
|
+
|
|
|
+ public static (IAsyncObserver<TSource>, IAsyncDisposable) SelectMany<TSource, TCollection, TResult>(IAsyncObserver<TResult> observer, Func<TSource, int, Task<IAsyncObservable<TCollection>>> collectionSelector, Func<TSource, int, TCollection, int, Task<TResult>> resultSelector)
|
|
|
+ {
|
|
|
+ if (observer == null)
|
|
|
+ throw new ArgumentNullException(nameof(observer));
|
|
|
+ if (collectionSelector == null)
|
|
|
+ throw new ArgumentNullException(nameof(collectionSelector));
|
|
|
+ if (resultSelector == null)
|
|
|
+ throw new ArgumentNullException(nameof(resultSelector));
|
|
|
+
|
|
|
+ Func<(TSource item, int i), Task<IAsyncObservable<(TCollection item, int i)>>> collectionSelectorWithIndex = async t => (await collectionSelector(t.item, t.i).ConfigureAwait(false)).Select((item, i) => (item, i));
|
|
|
+ Func<(TSource item, int i), (TCollection item, int i), Task<TResult>> resultSelectorWithIndex = (outer, inner) => resultSelector(outer.item, outer.i, inner.item, inner.i);
|
|
|
+
|
|
|
+ var (outerObserverWithIndex, disposable) = SelectMany(observer, collectionSelectorWithIndex, resultSelectorWithIndex);
|
|
|
+
|
|
|
+ var outerObserver = Select<TSource, (TSource item, int i)>(outerObserverWithIndex, (item, i) => (item, i));
|
|
|
+
|
|
|
+ return (outerObserver, disposable);
|
|
|
+ }
|
|
|
}
|
|
|
}
|