Browse Source

Adding Publish overloads with initial value.

Bart De Smet 8 years ago
parent
commit
3f8103bb52

+ 29 - 1
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Publish.cs

@@ -7,7 +7,7 @@ using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
 {
-    // REVIEW: Expose Publish using ConcurrentSimpleAsyncSubject<T> underneath.
+    // REVIEW: Expose Publish using ConcurrentSimpleAsyncSubject<T> or ConcurrentBehaviorAsyncSubject<T> underneath.
 
     partial class AsyncObservable
     {
@@ -19,6 +19,14 @@ namespace System.Reactive.Linq
             return Multicast(source, new SequentialSimpleAsyncSubject<TSource>());
         }
 
+        public static IConnectableAsyncObservable<TSource> Publish<TSource>(this IAsyncObservable<TSource> source, TSource value)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return Multicast(source, new SequentialBehaviorAsyncSubject<TSource>(value));
+        }
+
         public static IAsyncObservable<TResult> Publish<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector)
         {
             if (source == null)
@@ -29,6 +37,16 @@ namespace System.Reactive.Linq
             return Multicast(source, () => new SequentialSimpleAsyncSubject<TSource>(), selector);
         }
 
+        public static IAsyncObservable<TResult> Publish<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, IAsyncObservable<TResult>> selector, TSource value)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Multicast(source, () => new SequentialBehaviorAsyncSubject<TSource>(value), selector);
+        }
+
         public static IAsyncObservable<TResult> Publish<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, Task<IAsyncObservable<TResult>>> selector)
         {
             if (source == null)
@@ -38,5 +56,15 @@ namespace System.Reactive.Linq
 
             return Multicast(source, () => Task.FromResult<IAsyncSubject<TSource, TSource>>(new SequentialSimpleAsyncSubject<TSource>()), selector);
         }
+
+        public static IAsyncObservable<TResult> Publish<TSource, TResult>(this IAsyncObservable<TSource> source, Func<IAsyncObservable<TSource>, Task<IAsyncObservable<TResult>>> selector, TSource value)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+            return Multicast(source, () => Task.FromResult<IAsyncSubject<TSource, TSource>>(new SequentialBehaviorAsyncSubject<TSource>(value)), selector);
+        }
     }
 }