Sfoglia il codice sorgente

Capture the concept of a "safe observer" (#597)

Capture the concept of a "safe observer" since it has been used and copied multiple times in the repo. ISafeObserver implements IDisposable and IObserver and can hold onto a resource. Implementors must at least dispose the resource on disposal of the SafeObserver, inheriting classes may choose to dispose the resource on other occasions as well. So far, AnonymousSafeObserver, SafeObserver and ObserverWithToken work like that are have been changed to implement ISafeObserver.
Daniel C. Weber 7 anni fa
parent
commit
da201773e9

+ 1 - 1
Rx.NET/Source/src/System.Reactive/AnonymousObserver.cs

@@ -84,6 +84,6 @@ namespace System.Reactive
         /// </summary>
         /// </summary>
         protected override void OnCompletedCore() => _onCompleted();
         protected override void OnCompletedCore() => _onCompleted();
 
 
-        internal IObserver<T> MakeSafe(IDisposable disposable) => new AnonymousSafeObserver<T>(_onNext, _onError, _onCompleted, disposable);
+        internal ISafeObserver<T> MakeSafe() => new AnonymousSafeObserver<T>(_onNext, _onError, _onCompleted);
     }
     }
 }
 }

+ 17 - 15
Rx.NET/Source/src/System.Reactive/AnonymousSafeObserver.cs

@@ -2,6 +2,7 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 // See the LICENSE file in the project root for more information. 
 
 
+using System.Reactive.Disposables;
 using System.Threading;
 using System.Threading;
 
 
 namespace System.Reactive
 namespace System.Reactive
@@ -18,21 +19,20 @@ namespace System.Reactive
     /// that accept delegates for the On* handlers. By doing the fusion, we make the call stack depth shorter which
     /// that accept delegates for the On* handlers. By doing the fusion, we make the call stack depth shorter which
     /// helps debugging and some performance.
     /// helps debugging and some performance.
     /// </summary>
     /// </summary>
-    internal sealed class AnonymousSafeObserver<T> : IObserver<T>
+    internal sealed class AnonymousSafeObserver<T> : ISafeObserver<T>
     {
     {
         private readonly Action<T> _onNext;
         private readonly Action<T> _onNext;
         private readonly Action<Exception> _onError;
         private readonly Action<Exception> _onError;
         private readonly Action _onCompleted;
         private readonly Action _onCompleted;
-        private readonly IDisposable _disposable;
+        private IDisposable _disposable;
 
 
         private int isStopped;
         private int isStopped;
 
 
-        public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted, IDisposable disposable)
+        public AnonymousSafeObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
         {
         {
             _onNext = onNext;
             _onNext = onNext;
             _onError = onError;
             _onError = onError;
             _onCompleted = onCompleted;
             _onCompleted = onCompleted;
-            _disposable = disposable;
         }
         }
 
 
         public void OnNext(T value)
         public void OnNext(T value)
@@ -48,7 +48,7 @@ namespace System.Reactive
                 finally
                 finally
                 {
                 {
                     if (!__noError)
                     if (!__noError)
-                        _disposable.Dispose();
+                        Dispose();
                 }
                 }
             }
             }
         }
         }
@@ -57,14 +57,10 @@ namespace System.Reactive
         {
         {
             if (Interlocked.Exchange(ref isStopped, 1) == 0)
             if (Interlocked.Exchange(ref isStopped, 1) == 0)
             {
             {
-                try
+                using (this)
                 {
                 {
                     _onError(error);
                     _onError(error);
                 }
                 }
-                finally
-                {
-                    _disposable.Dispose();
-                }
             }
             }
         }
         }
 
 
@@ -72,15 +68,21 @@ namespace System.Reactive
         {
         {
             if (Interlocked.Exchange(ref isStopped, 1) == 0)
             if (Interlocked.Exchange(ref isStopped, 1) == 0)
             {
             {
-                try
+                using (this)
                 {
                 {
                     _onCompleted();
                     _onCompleted();
                 }
                 }
-                finally
-                {
-                    _disposable.Dispose();
-                }
             }
             }
         }
         }
+
+        public void SetResource(IDisposable resource)
+        {
+            Disposable.SetSingle(ref _disposable, resource);
+        }
+
+        public void Dispose()
+        {
+            Disposable.TryDispose(ref _disposable);
+        }
     }
     }
 }
 }

+ 16 - 0
Rx.NET/Source/src/System.Reactive/Internal/ISafeObserver.cs

@@ -0,0 +1,16 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+namespace System.Reactive
+{
+    /// <summary>
+    /// Base interface for observers that can dispose of a resource on a terminal notification
+    /// or when disposed itself.
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    internal interface ISafeObserver<in T> : IObserver<T>, IDisposable
+    {
+        void SetResource(IDisposable resource);
+    }
+}

+ 14 - 17
Rx.NET/Source/src/System.Reactive/Internal/ObserverWithToken.cs

@@ -15,7 +15,7 @@ namespace System.Reactive
     /// to be disposed upon termination.
     /// to be disposed upon termination.
     /// </summary>
     /// </summary>
     /// <typeparam name="T">The element type of the sequence.</typeparam>
     /// <typeparam name="T">The element type of the sequence.</typeparam>
-    internal sealed class ObserverWithToken<T> : IObserver<T>
+    internal sealed class ObserverWithToken<T> : ISafeObserver<T>
     {
     {
         readonly IObserver<T> _downstream;
         readonly IObserver<T> _downstream;
 
 
@@ -23,41 +23,38 @@ namespace System.Reactive
 
 
         public ObserverWithToken(IObserver<T> downstream)
         public ObserverWithToken(IObserver<T> downstream)
         {
         {
-            this._downstream = downstream;
-        }
-
-        internal void SetTokenDisposable(IDisposable d)
-        {
-            Disposable.SetSingle(ref _tokenDisposable, d);
+            _downstream = downstream;
         }
         }
 
 
         public void OnCompleted()
         public void OnCompleted()
         {
         {
-            try
+            using (this)
             {
             {
                 _downstream.OnCompleted();
                 _downstream.OnCompleted();
             }
             }
-            finally
-            {
-                Disposable.TryDispose(ref _tokenDisposable);    
-            }
         }
         }
 
 
         public void OnError(Exception error)
         public void OnError(Exception error)
         {
         {
-            try
+            using (this)
             {
             {
                 _downstream.OnError(error);
                 _downstream.OnError(error);
             }
             }
-            finally
-            {
-                Disposable.TryDispose(ref _tokenDisposable);
-            }
         }
         }
 
 
         public void OnNext(T value)
         public void OnNext(T value)
         {
         {
             _downstream.OnNext(value);
             _downstream.OnNext(value);
         }
         }
+
+        public void SetResource(IDisposable resource)
+        {
+            Disposable.SetSingle(ref _tokenDisposable, resource);
+        }
+
+        public void Dispose()
+        {
+            Disposable.TryDispose(ref _tokenDisposable);
+        }
     }
     }
 }
 }

+ 7 - 7
Rx.NET/Source/src/System.Reactive/Internal/Producer.cs

@@ -46,7 +46,9 @@ namespace System.Reactive
             //
             //
             if (enableSafeguard)
             if (enableSafeguard)
             {
             {
-                observer = SafeObserver<TSource>.Create(observer, subscription);
+                var safeObserver = SafeObserver<TSource>.Create(observer);
+                safeObserver.SetResource(subscription);
+                observer = safeObserver;
             }
             }
 
 
             if (CurrentThreadScheduler.IsScheduleRequired)
             if (CurrentThreadScheduler.IsScheduleRequired)
@@ -90,7 +92,7 @@ namespace System.Reactive
 
 
         public IDisposable SubscribeRaw(IObserver<TTarget> observer, bool enableSafeguard)
         public IDisposable SubscribeRaw(IObserver<TTarget> observer, bool enableSafeguard)
         {
         {
-            SingleAssignmentDisposable subscription = null;
+            ISafeObserver<TTarget> safeObserver = null;
 
 
             //
             //
             // See AutoDetachObserver.cs for more information on the safeguarding requirement and
             // See AutoDetachObserver.cs for more information on the safeguarding requirement and
@@ -98,14 +100,12 @@ namespace System.Reactive
             //
             //
             if (enableSafeguard)
             if (enableSafeguard)
             {
             {
-                subscription = new SingleAssignmentDisposable();
-                observer = SafeObserver<TTarget>.Create(observer, subscription);
+                observer = safeObserver = SafeObserver<TTarget>.Create(observer);
             }
             }
 
 
             var sink = CreateSink(observer);
             var sink = CreateSink(observer);
 
 
-            if (subscription != null)
-                subscription.Disposable = sink;
+            safeObserver?.SetResource(sink);
 
 
             if (CurrentThreadScheduler.IsScheduleRequired)
             if (CurrentThreadScheduler.IsScheduleRequired)
             {
             {
@@ -118,7 +118,7 @@ namespace System.Reactive
                 Run(sink);
                 Run(sink);
             }
             }
 
 
-            return (IDisposable)subscription ?? sink;
+            return sink;
         }
         }
 
 
         /// <summary>
         /// <summary>

+ 22 - 18
Rx.NET/Source/src/System.Reactive/Internal/SafeObserver.cs

@@ -2,6 +2,8 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 // See the LICENSE file in the project root for more information. 
 
 
+using System.Reactive.Disposables;
+
 namespace System.Reactive
 namespace System.Reactive
 {
 {
     //
     //
@@ -9,27 +11,27 @@ namespace System.Reactive
     // its implementation aspects.
     // its implementation aspects.
     //
     //
 
 
-    internal sealed class SafeObserver<TSource> : IObserver<TSource>
+    internal sealed class SafeObserver<TSource> : ISafeObserver<TSource>
     {
     {
         private readonly IObserver<TSource> _observer;
         private readonly IObserver<TSource> _observer;
-        private readonly IDisposable _disposable;
 
 
-        public static IObserver<TSource> Create(IObserver<TSource> observer, IDisposable disposable)
+        private IDisposable _disposable;
+
+        public static ISafeObserver<TSource> Create(IObserver<TSource> observer)
         {
         {
             if (observer is AnonymousObserver<TSource> a)
             if (observer is AnonymousObserver<TSource> a)
             {
             {
-                return a.MakeSafe(disposable);
+                return a.MakeSafe();
             }
             }
             else
             else
             {
             {
-                return new SafeObserver<TSource>(observer, disposable);
+                return new SafeObserver<TSource>(observer);
             }
             }
         }
         }
 
 
-        private SafeObserver(IObserver<TSource> observer, IDisposable disposable)
+        private SafeObserver(IObserver<TSource> observer)
         {
         {
             _observer = observer;
             _observer = observer;
-            _disposable = disposable;
         }
         }
 
 
         public void OnNext(TSource value)
         public void OnNext(TSource value)
@@ -44,33 +46,35 @@ namespace System.Reactive
             {
             {
                 if (!__noError)
                 if (!__noError)
                 {
                 {
-                    _disposable.Dispose();
+                    Dispose();
                 }
                 }
             }
             }
         }
         }
 
 
         public void OnError(Exception error)
         public void OnError(Exception error)
         {
         {
-            try
+            using (this)
             {
             {
                 _observer.OnError(error);
                 _observer.OnError(error);
             }
             }
-            finally
-            {
-                _disposable.Dispose();
-            }
         }
         }
 
 
         public void OnCompleted()
         public void OnCompleted()
         {
         {
-            try
+            using (this)
             {
             {
                 _observer.OnCompleted();
                 _observer.OnCompleted();
             }
             }
-            finally
-            {
-                _disposable.Dispose();
-            }
+        }
+
+        public void SetResource(IDisposable resource)
+        {
+            Disposable.SetSingle(ref _disposable, resource);
+        }
+
+        public void Dispose()
+        {
+            Disposable.TryDispose(ref _disposable);
         }
         }
     }
     }
 }
 }

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Observable.Extensions.cs

@@ -266,7 +266,7 @@ namespace System
                     //
                     //
                     var d = source.Subscribe/*Unsafe*/(consumer);
                     var d = source.Subscribe/*Unsafe*/(consumer);
 
 
-                    consumer.SetTokenDisposable(token.Register(state => ((IDisposable)state).Dispose(), d));
+                    consumer.SetResource(token.Register(state => ((IDisposable)state).Dispose(), d));
                 }
                 }
             }
             }
             else
             else

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Platforms/Desktop/Linq/QueryLanguage.Remoting.cs

@@ -92,7 +92,7 @@ namespace System.Reactive.Linq
                 //
                 //
                 var d = remotableObservable.Subscribe/*Unsafe*/(new RemotableObserver<T>(consumer));
                 var d = remotableObservable.Subscribe/*Unsafe*/(new RemotableObserver<T>(consumer));
 
 
-                consumer.SetTokenDisposable(d);
+                consumer.SetResource(d);
 
 
                 return d;
                 return d;
             }
             }