|
|
@@ -3,6 +3,7 @@
|
|
|
// See the LICENSE file in the project root for more information.
|
|
|
|
|
|
using System.Reactive.Disposables;
|
|
|
+using System.Threading;
|
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
namespace System.Reactive.Linq
|
|
|
@@ -17,6 +18,8 @@ namespace System.Reactive.Linq
|
|
|
return Defer(() => Task.FromResult(observableFactory()));
|
|
|
}
|
|
|
|
|
|
+ public static IAsyncObservable<TSource> DeferAsync<TSource>(Func<Task<IAsyncObservable<TSource>>> observableFactory) => Defer(observableFactory);
|
|
|
+
|
|
|
public static IAsyncObservable<TSource> Defer<TSource>(Func<Task<IAsyncObservable<TSource>>> observableFactory)
|
|
|
{
|
|
|
if (observableFactory == null)
|
|
|
@@ -39,5 +42,15 @@ namespace System.Reactive.Linq
|
|
|
return await source.SubscribeSafeAsync(observer).ConfigureAwait(false);
|
|
|
});
|
|
|
}
|
|
|
+
|
|
|
+ public static IAsyncObservable<TSource> DeferAsync<TSource>(Func<CancellationToken, Task<IAsyncObservable<TSource>>> observableFactory) => DeferAsync(observableFactory);
|
|
|
+
|
|
|
+ public static IAsyncObservable<TSource> Defer<TSource>(Func<CancellationToken, Task<IAsyncObservable<TSource>>> observableFactory)
|
|
|
+ {
|
|
|
+ if (observableFactory == null)
|
|
|
+ throw new ArgumentNullException(nameof(observableFactory));
|
|
|
+
|
|
|
+ return StartAsync(observableFactory).Merge();
|
|
|
+ }
|
|
|
}
|
|
|
}
|