Jelajahi Sumber

Moving some functionality to System.Reactive.Async.Core.

Bart De Smet 8 tahun lalu
induk
melakukan
c91384933b

+ 1 - 0
AsyncRx.NET/System.Reactive.Async.Core/System.Reactive.Async.Core.csproj

@@ -5,6 +5,7 @@
   </PropertyGroup>
 
   <ItemGroup>
+    <ProjectReference Include="..\System.Reactive.Async.Disposables\System.Reactive.Async.Disposables.csproj" />
     <ProjectReference Include="..\System.Reactive.Async.Interfaces\System.Reactive.Async.Interfaces.csproj" />
     <ProjectReference Include="..\System.Reactive.Bcl\System.Reactive.Bcl.csproj" />
   </ItemGroup>

+ 2 - 2
AsyncRx.NET/System.Reactive.Async.Linq/System/AsyncObservableExtensions.cs → AsyncRx.NET/System.Reactive.Async.Core/System/AsyncObservableExtensions.cs

@@ -2,7 +2,7 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
-using System.Reactive.Linq;
+using System.Reactive;
 using System.Threading.Tasks;
 
 namespace System
@@ -20,7 +20,7 @@ namespace System
             if (onCompletedAsync == null)
                 throw new ArgumentNullException(nameof(onCompletedAsync));
 
-            return source.SubscribeAsync(AsyncObserver.Create(onNextAsync, onErrorAsync, onCompletedAsync));
+            return source.SubscribeAsync(new AsyncObserver<T>(onNextAsync, onErrorAsync, onCompletedAsync));
         }
     }
 }

+ 29 - 0
AsyncRx.NET/System.Reactive.Async.Core/System/Reactive/AsyncObservable.cs

@@ -0,0 +1,29 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading.Tasks;
+
+namespace System.Reactive
+{
+    public class AsyncObservable<T> : AsyncObservableBase<T>
+    {
+        private readonly Func<IAsyncObserver<T>, Task<IAsyncDisposable>> _subscribeAsync;
+
+        public AsyncObservable(Func<IAsyncObserver<T>, Task<IAsyncDisposable>> subscribeAsync)
+        {
+            if (subscribeAsync == null)
+                throw new ArgumentNullException(nameof(subscribeAsync));
+
+            _subscribeAsync = subscribeAsync;
+        }
+
+        protected override Task<IAsyncDisposable> SubscribeAsyncCore(IAsyncObserver<T> observer)
+        {
+            if (observer == null)
+                throw new ArgumentNullException(nameof(observer));
+
+            return _subscribeAsync(observer);
+        }
+    }
+}

+ 1 - 1
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/AsyncObservableBase.cs → AsyncRx.NET/System.Reactive.Async.Core/System/Reactive/AsyncObservableBase.cs

@@ -4,7 +4,7 @@
 
 using System.Threading.Tasks;
 
-namespace System.Reactive.Linq
+namespace System.Reactive
 {
     public abstract class AsyncObservableBase<T> : IAsyncObservable<T>
     {

+ 35 - 0
AsyncRx.NET/System.Reactive.Async.Core/System/Reactive/AsyncObserver.cs

@@ -0,0 +1,35 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Threading.Tasks;
+
+namespace System.Reactive
+{
+    public class AsyncObserver<T> : AsyncObserverBase<T>
+    {
+        private readonly Func<T, Task> _onNextAsync;
+        private readonly Func<Exception, Task> _onErrorAsync;
+        private readonly Func<Task> _onCompletedAsync;
+
+        public AsyncObserver(Func<T, Task> onNextAsync, Func<Exception, Task> onErrorAsync, Func<Task> onCompletedAsync)
+        {
+            if (onNextAsync == null)
+                throw new ArgumentNullException(nameof(onNextAsync));
+            if (onErrorAsync == null)
+                throw new ArgumentNullException(nameof(onErrorAsync));
+            if (onCompletedAsync == null)
+                throw new ArgumentNullException(nameof(onCompletedAsync));
+
+            _onNextAsync = onNextAsync;
+            _onErrorAsync = onErrorAsync;
+            _onCompletedAsync = onCompletedAsync;
+        }
+
+        protected override Task OnCompletedAsyncCore() => _onCompletedAsync();
+
+        protected override Task OnErrorAsyncCore(Exception error) => _onErrorAsync(error ?? throw new ArgumentNullException(nameof(error)));
+
+        protected override Task OnNextAsyncCore(T value) => _onNextAsync(value);
+    }
+}

+ 1 - 1
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/AsyncObserverBase.cs → AsyncRx.NET/System.Reactive.Async.Core/System/Reactive/AsyncObserverBase.cs

@@ -5,7 +5,7 @@
 using System.Threading;
 using System.Threading.Tasks;
 
-namespace System.Reactive.Linq
+namespace System.Reactive
 {
     public abstract class AsyncObserverBase<T> : IAsyncObserver<T>
     {

+ 1 - 13
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/AsyncObservable.cs

@@ -14,7 +14,7 @@ namespace System.Reactive.Linq
             if (subscribeAsync == null)
                 throw new ArgumentNullException(nameof(subscribeAsync));
 
-            return new AnonymousAsyncObservable<T>(subscribeAsync);
+            return new AsyncObservable<T>(subscribeAsync);
         }
 
         public static Task<IAsyncDisposable> SubscribeSafeAsync<T>(this IAsyncObservable<T> source, IAsyncObserver<T> observer)
@@ -40,17 +40,5 @@ namespace System.Reactive.Linq
                 }
             }
         }
-
-        private sealed class AnonymousAsyncObservable<T> : AsyncObservableBase<T>
-        {
-            private readonly Func<IAsyncObserver<T>, Task<IAsyncDisposable>> _subscribeAsync;
-
-            public AnonymousAsyncObservable(Func<IAsyncObserver<T>, Task<IAsyncDisposable>> subscribeAsync)
-            {
-                _subscribeAsync = subscribeAsync;
-            }
-
-            protected override Task<IAsyncDisposable> SubscribeAsyncCore(IAsyncObserver<T> observer) => _subscribeAsync(observer);
-        }
     }
 }

+ 3 - 28
AsyncRx.NET/System.Reactive.Async.Linq/System/Reactive/Linq/AsyncObserver.cs

@@ -2,7 +2,6 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
-using System.Runtime.ExceptionServices;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -14,13 +13,9 @@ namespace System.Reactive.Linq
             if (onNextAsync == null)
                 throw new ArgumentNullException(nameof(onNextAsync));
 
-            return new AnonymousAsyncObserver<T>(
+            return new AsyncObserver<T>(
                 onNextAsync,
-                ex =>
-                {
-                    ExceptionDispatchInfo.Capture(ex).Throw();
-                    return Task.CompletedTask;
-                },
+                ex => Task.FromException(ex),
                 () => Task.CompletedTask
             );
         }
@@ -34,27 +29,7 @@ namespace System.Reactive.Linq
             if (onCompletedAsync == null)
                 throw new ArgumentNullException(nameof(onCompletedAsync));
 
-            return new AnonymousAsyncObserver<T>(onNextAsync, onErrorAsync, onCompletedAsync);
-        }
-
-        private sealed class AnonymousAsyncObserver<T> : AsyncObserverBase<T>
-        {
-            private readonly Func<T, Task> _onNextAsync;
-            private readonly Func<Exception, Task> _onErrorAsync;
-            private readonly Func<Task> _onCompletedAsync;
-
-            public AnonymousAsyncObserver(Func<T, Task> onNextAsync, Func<Exception, Task> onErrorAsync, Func<Task> onCompletedAsync)
-            {
-                _onNextAsync = onNextAsync;
-                _onErrorAsync = onErrorAsync;
-                _onCompletedAsync = onCompletedAsync;
-            }
-
-            protected override Task OnCompletedAsyncCore() => _onCompletedAsync();
-
-            protected override Task OnErrorAsyncCore(Exception error) => _onErrorAsync(error);
-
-            protected override Task OnNextAsyncCore(T value) => _onNextAsync(value);
+            return new AsyncObserver<T>(onNextAsync, onErrorAsync, onCompletedAsync);
         }
     }
 }