|
@@ -38,7 +38,34 @@ namespace System.Reactive.Linq
|
|
|
if (predicate == null)
|
|
if (predicate == null)
|
|
|
throw new ArgumentNullException(nameof(predicate));
|
|
throw new ArgumentNullException(nameof(predicate));
|
|
|
|
|
|
|
|
- return All<TSource>(observer, x => Task.FromResult(predicate(x)));
|
|
|
|
|
|
|
+ return Create<TSource>(
|
|
|
|
|
+ async x =>
|
|
|
|
|
+ {
|
|
|
|
|
+ var b = default(bool);
|
|
|
|
|
+
|
|
|
|
|
+ try
|
|
|
|
|
+ {
|
|
|
|
|
+ b = predicate(x);
|
|
|
|
|
+ }
|
|
|
|
|
+ catch (Exception ex)
|
|
|
|
|
+ {
|
|
|
|
|
+ await observer.OnErrorAsync(ex).ConfigureAwait(false);
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if (!b)
|
|
|
|
|
+ {
|
|
|
|
|
+ await observer.OnNextAsync(false).ConfigureAwait(false);
|
|
|
|
|
+ await observer.OnCompletedAsync().ConfigureAwait(false);
|
|
|
|
|
+ }
|
|
|
|
|
+ },
|
|
|
|
|
+ observer.OnErrorAsync,
|
|
|
|
|
+ async () =>
|
|
|
|
|
+ {
|
|
|
|
|
+ await observer.OnNextAsync(true).ConfigureAwait(false);
|
|
|
|
|
+ await observer.OnCompletedAsync().ConfigureAwait(false);
|
|
|
|
|
+ }
|
|
|
|
|
+ );
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
public static IAsyncObserver<TSource> All<TSource>(IAsyncObserver<bool> observer, Func<TSource, Task<bool>> predicate)
|
|
public static IAsyncObserver<TSource> All<TSource>(IAsyncObserver<bool> observer, Func<TSource, Task<bool>> predicate)
|