Prechádzať zdrojové kódy

Allocating fresh sinks for Concat and Catch.

Bart De Smet 8 rokov pred
rodič
commit
0d099748c3

+ 13 - 0
AsyncRx.NET/Playground/Program.cs

@@ -24,6 +24,7 @@ namespace Playground
         {
             await BufferTimeHoppingAsync();
             await BufferTimeSlidingAsync();
+            await ConcatAsync();
             await MergeAsync();
             await RangeAsync();
             await ReturnAsync();
@@ -54,6 +55,18 @@ namespace Playground
                     .SubscribeAsync(Print<string>()); // TODO: Use ForEachAsync.
         }
 
+        static async Task ConcatAsync()
+        {
+            await
+                AsyncObservable.Concat(
+                    AsyncObservable.Range(0, 5),
+                    AsyncObservable.Range(5, 5),
+                    AsyncObservable.Range(10, 5),
+                    AsyncObservable.Range(15, 5)
+                )
+                .SubscribeAsync(Print<int>()); // TODO: Use ForEachAsync.
+        }
+
         static async Task MergeAsync()
         {
             var subject = new SequentialSimpleAsyncSubject<IAsyncObservable<int>>();

+ 27 - 28
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Catch.cs

@@ -180,39 +180,38 @@ namespace System.Reactive.Linq
 
             var innerSubscription = new SerialAsyncDisposable();
 
-            IAsyncObserver<TSource> sink = null;
-
-            sink = Create<TSource>(
-                observer.OnNextAsync,
-                async ex =>
-                {
-                    var handler = default(IAsyncObservable<TSource>);
-
-                    try
+            IAsyncObserver<TSource> GetSink() =>
+                Create<TSource>(
+                    observer.OnNextAsync,
+                    async ex =>
                     {
-                        if (handlers.MoveNext())
+                        var handler = default(IAsyncObservable<TSource>);
+
+                        try
                         {
-                            handler = handlers.Current;
+                            if (handlers.MoveNext())
+                            {
+                                handler = handlers.Current;
+                            }
+                        }
+                        catch (Exception err)
+                        {
+                            await observer.OnErrorAsync(err).ConfigureAwait(false);
+                            return;
                         }
-                    }
-                    catch (Exception err)
-                    {
-                        await observer.OnErrorAsync(err).ConfigureAwait(false);
-                        return;
-                    }
 
-                    if (handler == null)
-                    {
-                        await observer.OnErrorAsync(ex).ConfigureAwait(false); // REVIEW: Is Throw behavior right here?
-                        return;
-                    }
+                        if (handler == null)
+                        {
+                            await observer.OnErrorAsync(ex).ConfigureAwait(false); // REVIEW: Is Throw behavior right here?
+                            return;
+                        }
 
-                    var handlerSubscription = await handler.SubscribeSafeAsync(sink).ConfigureAwait(false);
+                        var handlerSubscription = await handler.SubscribeSafeAsync(GetSink()).ConfigureAwait(false);
 
-                    await innerSubscription.AssignAsync(handlerSubscription).ConfigureAwait(false);
-                },
-                observer.OnCompletedAsync
-            );
+                        await innerSubscription.AssignAsync(handlerSubscription).ConfigureAwait(false);
+                    },
+                    observer.OnCompletedAsync
+                );
 
             var disposeEnumerator = AsyncDisposable.Create(() =>
             {
@@ -222,7 +221,7 @@ namespace System.Reactive.Linq
 
             var subscription = StableCompositeAsyncDisposable.Create(innerSubscription, disposeEnumerator);
 
-            return (sink, subscription);
+            return (GetSink(), subscription);
         }
     }
 }

+ 27 - 28
AsyncRx.NET/System.Reactive.Async/System/Reactive/Linq/Operators/Concat.cs

@@ -90,39 +90,38 @@ namespace System.Reactive.Linq
 
             var innerSubscription = new SerialAsyncDisposable();
 
-            IAsyncObserver<TSource> sink = null;
-
-            sink = Create<TSource>(
-                observer.OnNextAsync,
-                observer.OnErrorAsync,
-                async () =>
-                {
-                    var next = default(IAsyncObservable<TSource>);
-
-                    try
+            IAsyncObserver<TSource> GetSink() =>
+                Create<TSource>(
+                    observer.OnNextAsync,
+                    observer.OnErrorAsync,
+                    async () =>
                     {
-                        if (handlers.MoveNext())
+                        var next = default(IAsyncObservable<TSource>);
+
+                        try
                         {
-                            next = handlers.Current;
+                            if (handlers.MoveNext())
+                            {
+                                next = handlers.Current;
+                            }
+                        }
+                        catch (Exception err)
+                        {
+                            await observer.OnErrorAsync(err).ConfigureAwait(false);
+                            return;
                         }
-                    }
-                    catch (Exception err)
-                    {
-                        await observer.OnErrorAsync(err).ConfigureAwait(false);
-                        return;
-                    }
 
-                    if (next == null)
-                    {
-                        await observer.OnCompletedAsync().ConfigureAwait(false); // REVIEW: Is Empty behavior right here?
-                        return;
-                    }
+                        if (next == null)
+                        {
+                            await observer.OnCompletedAsync().ConfigureAwait(false); // REVIEW: Is Empty behavior right here?
+                            return;
+                        }
 
-                    var nextSubscription = await next.SubscribeSafeAsync(sink).ConfigureAwait(false);
+                        var nextSubscription = await next.SubscribeSafeAsync(GetSink()).ConfigureAwait(false);
 
-                    await innerSubscription.AssignAsync(nextSubscription).ConfigureAwait(false);
-                }
-            );
+                        await innerSubscription.AssignAsync(nextSubscription).ConfigureAwait(false);
+                    }
+                );
 
             var disposeEnumerator = AsyncDisposable.Create(() =>
             {
@@ -132,7 +131,7 @@ namespace System.Reactive.Linq
 
             var subscription = StableCompositeAsyncDisposable.Create(innerSubscription, disposeEnumerator);
 
-            return (sink, subscription);
+            return (GetSink(), subscription);
         }
     }
 }