Browse Source

Inline AnonymousObserver use at places (#592)

David Karnok 7 years ago
parent
commit
4fe9ad296f

+ 63 - 0
Rx.NET/Source/src/System.Reactive/Internal/ObserverWithToken.cs

@@ -0,0 +1,63 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
+using System.Collections.Generic;
+using System.Reactive.Disposables;
+using System.Text;
+
+namespace System.Reactive
+{
+    /// <summary>
+    /// Wraps another IObserver, relays signals to it
+    /// and hosts an external IDisposable
+    /// to be disposed upon termination.
+    /// </summary>
+    /// <typeparam name="T">The element type of the sequence.</typeparam>
+    internal sealed class ObserverWithToken<T> : IObserver<T>
+    {
+        readonly IObserver<T> _downstream;
+
+        IDisposable _tokenDisposable;
+
+        public ObserverWithToken(IObserver<T> downstream)
+        {
+            this._downstream = downstream;
+        }
+
+        internal void SetTokenDisposable(IDisposable d)
+        {
+            Disposable.SetSingle(ref _tokenDisposable, d);
+        }
+
+        public void OnCompleted()
+        {
+            try
+            {
+                _downstream.OnCompleted();
+            }
+            finally
+            {
+                Disposable.TryDispose(ref _tokenDisposable);    
+            }
+        }
+
+        public void OnError(Exception error)
+        {
+            try
+            {
+                _downstream.OnError(error);
+            }
+            finally
+            {
+                Disposable.TryDispose(ref _tokenDisposable);
+            }
+        }
+
+        public void OnNext(T value)
+        {
+            _downstream.OnNext(value);
+        }
+    }
+}

+ 8 - 2
Rx.NET/Source/src/System.Reactive/Notification.cs

@@ -336,10 +336,16 @@ namespace System.Reactive
 #endif
         internal sealed class OnCompletedNotification : Notification<T>
         {
+            /// <summary>
+            /// Complete notifications are stateless thus only one instance
+            /// can ever exist per type <see cref="T"/>.
+            /// </summary>
+            internal static readonly Notification<T> Instance = new OnCompletedNotification();
+
             /// <summary>
             /// Constructs a notification of the end of a sequence.
             /// </summary>
-            public OnCompletedNotification()
+            private OnCompletedNotification()
             {
             }
 
@@ -628,7 +634,7 @@ namespace System.Reactive
         /// <returns>The OnCompleted notification.</returns>
         public static Notification<T> CreateOnCompleted<T>()
         {
-            return new Notification<T>.OnCompletedNotification();
+            return Notification<T>.OnCompletedNotification.Instance;
         }
     }
 }

+ 4 - 20
Rx.NET/Source/src/System.Reactive/Observable.Extensions.cs

@@ -259,30 +259,14 @@ namespace System
             {
                 if (!token.IsCancellationRequested)
                 {
-                    var r = new SingleAssignmentDisposable();
+                    var consumer = new ObserverWithToken<T>(observer);
 
                     //
                     // [OK] Use of unsafe Subscribe: exception during Subscribe doesn't orphan CancellationTokenRegistration.
                     //
-                    var d = source.Subscribe/*Unsafe*/(
-                        observer.OnNext,
-                        ex =>
-                        {
-                            using (r)
-                            {
-                                observer.OnError(ex);
-                            }
-                        },
-                        () =>
-                        {
-                            using (r)
-                            {
-                                observer.OnCompleted();
-                            }
-                        }
-                    );
-
-                    r.Disposable = token.Register(d.Dispose);
+                    var d = source.Subscribe/*Unsafe*/(consumer);
+
+                    consumer.SetTokenDisposable(token.Register(state => ((IDisposable)state).Dispose(), d));
                 }
             }
             else

+ 5 - 27
Rx.NET/Source/src/System.Reactive/Platforms/Desktop/Linq/QueryLanguage.Remoting.cs

@@ -85,36 +85,14 @@ namespace System.Reactive.Linq
 
             public IDisposable Subscribe(IObserver<T> observer)
             {
-                var d = new SingleAssignmentDisposable();
-
-                var o = Observer.Create<T>(
-                    observer.OnNext,
-                    ex =>
-                    {
-                        //
-                        // Make call to the remote subscription, causing lease renewal to be stopped.
-                        //
-                        using (d)
-                        {
-                            observer.OnError(ex);
-                        }
-                    },
-                    () =>
-                    {
-                        //
-                        // Make call to the remote subscription, causing lease renewal to be stopped.
-                        //
-                        using (d)
-                        {
-                            observer.OnCompleted();
-                        }
-                    }
-                );
+                var consumer = new ObserverWithToken<T>(observer);
 
                 //
                 // [OK] Use of unsafe Subscribe: non-pretentious transparent wrapping through remoting; exception coming from the remote object is not re-routed.
                 //
-                d.Disposable = remotableObservable.Subscribe/*Unsafe*/(new RemotableObserver<T>(o));
+                var d = remotableObservable.Subscribe/*Unsafe*/(new RemotableObserver<T>(consumer));
+
+                consumer.SetTokenDisposable(d);
 
                 return d;
             }
@@ -254,4 +232,4 @@ namespace System.Reactive.Linq
         #endregion
     }
 }
-#endif
+#endif