Преглед изворни кода

Some improvements to Append.

Bart De Smet пре 8 година
родитељ
комит
db9f608b1f

+ 69 - 7
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/Operators/Append.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq
 {
@@ -24,7 +25,20 @@ namespace System.Reactive.Linq
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return Create<TSource>(observer => source.SubscribeSafeAsync(AsyncObserver.Append(observer, value, scheduler)));
+            return Create<TSource>(async observer =>
+            {
+                var d = new CompositeAsyncDisposable();
+
+                var (sink, disposable) = AsyncObserver.Append(observer, value, scheduler);
+
+                await d.AddAsync(disposable).ConfigureAwait(false);
+
+                var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
+
+                await d.AddAsync(subscription).ConfigureAwait(false);
+
+                return d;
+            });
         }
 
         public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, params TSource[] values)
@@ -46,7 +60,20 @@ namespace System.Reactive.Linq
             if (values == null)
                 throw new ArgumentNullException(nameof(values));
 
-            return Create<TSource>(async observer => await source.SubscribeSafeAsync(AsyncObserver.Append(observer, scheduler, values)));
+            return Create<TSource>(async observer =>
+            {
+                var d = new CompositeAsyncDisposable();
+
+                var (sink, disposable) = AsyncObserver.Append(observer, scheduler, values);
+
+                await d.AddAsync(disposable).ConfigureAwait(false);
+
+                var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
+
+                await d.AddAsync(subscription).ConfigureAwait(false);
+
+                return d;
+            });
         }
 
         public static IAsyncObservable<TSource> Append<TSource>(this IAsyncObservable<TSource> source, IEnumerable<TSource> values)
@@ -68,7 +95,20 @@ namespace System.Reactive.Linq
             if (values == null)
                 throw new ArgumentNullException(nameof(values));
 
-            return Create<TSource>(async observer => await source.SubscribeSafeAsync(AsyncObserver.Append(observer, scheduler, values)));
+            return Create<TSource>(async observer =>
+            {
+                var d = new CompositeAsyncDisposable();
+
+                var (sink, disposable) = AsyncObserver.Append(observer, scheduler, values);
+
+                await d.AddAsync(disposable).ConfigureAwait(false);
+
+                var subscription = await source.SubscribeSafeAsync(sink).ConfigureAwait(false);
+
+                await d.AddAsync(subscription).ConfigureAwait(false);
+
+                return d;
+            });
         }
     }
 
@@ -90,14 +130,36 @@ namespace System.Reactive.Linq
             );
         }
 
-        public static IAsyncObserver<TSource> Append<TSource>(IAsyncObserver<TSource> observer, TSource value, IAsyncScheduler scheduler)
+        public static (IAsyncObserver<TSource>, IAsyncDisposable) Append<TSource>(IAsyncObserver<TSource> observer, TSource value, IAsyncScheduler scheduler)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            throw new NotImplementedException();
+            var d = new SingleAssignmentAsyncDisposable();
+
+            return
+                (
+                    Create<TSource>(
+                        observer.OnNextAsync,
+                        observer.OnErrorAsync,
+                        async () =>
+                        {
+                            var task = await scheduler.ScheduleAsync(async ct =>
+                            {
+                                if (!ct.IsCancellationRequested)
+                                {
+                                    await observer.OnNextAsync(value);
+                                    await observer.OnCompletedAsync();
+                                }
+                            }).ConfigureAwait(false);
+
+                            await d.AssignAsync(task).ConfigureAwait(false);
+                        }
+                    ),
+                    d
+                );
         }
 
         public static IAsyncObserver<TSource> Append<TSource>(IAsyncObserver<TSource> observer, params TSource[] values)
@@ -122,7 +184,7 @@ namespace System.Reactive.Linq
             );
         }
 
-        public static IAsyncObserver<TSource> Append<TSource>(IAsyncObserver<TSource> observer, IAsyncScheduler scheduler, params TSource[] values)
+        public static (IAsyncObserver<TSource>, IAsyncDisposable) Append<TSource>(IAsyncObserver<TSource> observer, IAsyncScheduler scheduler, params TSource[] values)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));
@@ -156,7 +218,7 @@ namespace System.Reactive.Linq
             );
         }
 
-        public static IAsyncObserver<TSource> Append<TSource>(IAsyncObserver<TSource> observer, IAsyncScheduler scheduler, IEnumerable<TSource> values)
+        public static (IAsyncObserver<TSource>, IAsyncDisposable) Append<TSource>(IAsyncObserver<TSource> observer, IAsyncScheduler scheduler, IEnumerable<TSource> values)
         {
             if (observer == null)
                 throw new ArgumentNullException(nameof(observer));