Răsfoiți Sursa

Simplified BaseBlocking, FirstBlocking, and LastBlocking
Switched BaseBlocking<T> to derive from ManualResetEventSlim, which is sufficient for the task at hand

Aaron Schiff 6 ani în urmă
părinte
comite
14bf2d3cd6

+ 8 - 61
Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstLastBlocking.cs

@@ -2,105 +2,52 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 // See the LICENSE file in the project root for more information. 
 
 
-using System.Reactive.Disposables;
 using System.Threading;
 using System.Threading;
 
 
 namespace System.Reactive.Linq.ObservableImpl
 namespace System.Reactive.Linq.ObservableImpl
 {
 {
-    internal abstract class BaseBlocking<T> : CountdownEvent, IObserver<T>
+    internal abstract class BaseBlocking<T> : ManualResetEventSlim, IObserver<T>
     {
     {
-        protected IDisposable _upstream;
-
         internal T _value;
         internal T _value;
         internal bool _hasValue;
         internal bool _hasValue;
         internal Exception _error;
         internal Exception _error;
-        private int _once;
-
-        internal BaseBlocking() : base(1) { }
 
 
-        internal void SetUpstream(IDisposable d)
-        {
-            Disposable.SetSingle(ref _upstream, d);
-        }
+        internal BaseBlocking() { }
 
 
-        protected void Unblock()
+        public void OnCompleted()
         {
         {
-            if (Interlocked.CompareExchange(ref _once, 1, 0) == 0)
-            {
-                Signal();
-            }
+            Set();
         }
         }
 
 
-        public abstract void OnCompleted();
-        public virtual void OnError(Exception error)
+        public void OnError(Exception error)
         {
         {
             _value = default;
             _value = default;
             _error = error;
             _error = error;
-            Unblock();
+            Set();
         }
         }
-        public abstract void OnNext(T value);
 
 
-        public new void Dispose()
-        {
-            base.Dispose();
-            if (!Disposable.GetIsDisposed(ref _upstream))
-            {
-                Disposable.TryDispose(ref _upstream);
-            }
-        }
+        public abstract void OnNext(T value);
     }
     }
 
 
     internal sealed class FirstBlocking<T> : BaseBlocking<T>
     internal sealed class FirstBlocking<T> : BaseBlocking<T>
     {
     {
-        public override void OnCompleted()
-        {
-            Unblock();
-            if (!Disposable.GetIsDisposed(ref _upstream))
-            {
-                Disposable.TryDispose(ref _upstream);
-            }
-        }
-
-        public override void OnError(Exception error)
-        {
-            base.OnError(error);
-            if (!Disposable.GetIsDisposed(ref _upstream))
-            {
-                Disposable.TryDispose(ref _upstream);
-            }
-        }
-
         public override void OnNext(T value)
         public override void OnNext(T value)
         {
         {
             if (!_hasValue)
             if (!_hasValue)
             {
             {
                 _value = value;
                 _value = value;
                 _hasValue = true;
                 _hasValue = true;
-                Disposable.TryDispose(ref _upstream);
-                Unblock();
+                Set();
             }
             }
         }
         }
     }
     }
 
 
     internal sealed class LastBlocking<T> : BaseBlocking<T>
     internal sealed class LastBlocking<T> : BaseBlocking<T>
     {
     {
-        public override void OnCompleted()
-        {
-            Unblock();
-            Disposable.TryDispose(ref _upstream);
-        }
-
-        public override void OnError(Exception error)
-        {
-            base.OnError(error);
-            Disposable.TryDispose(ref _upstream);
-        }
-
         public override void OnNext(T value)
         public override void OnNext(T value)
         {
         {
             _value = value;
             _value = value;
             _hasValue = true;
             _hasValue = true;
         }
         }
-
     }
     }
 }
 }

+ 4 - 14
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Blocking.cs

@@ -69,14 +69,9 @@ namespace System.Reactive.Linq
         {
         {
             using (var consumer = new FirstBlocking<TSource>())
             using (var consumer = new FirstBlocking<TSource>())
             {
             {
-                using (var d = source.Subscribe(consumer))
+                using (source.Subscribe(consumer))
                 {
                 {
-                    consumer.SetUpstream(d);
-
-                    if (consumer.CurrentCount != 0)
-                    {
-                        consumer.Wait();
-                    }
+                    consumer.Wait();
                 }
                 }
 
 
                 consumer._error.ThrowIfNotNull();
                 consumer._error.ThrowIfNotNull();
@@ -166,14 +161,9 @@ namespace System.Reactive.Linq
             using (var consumer = new LastBlocking<TSource>())
             using (var consumer = new LastBlocking<TSource>())
             {
             {
 
 
-                using (var d = source.Subscribe(consumer))
+                using (source.Subscribe(consumer))
                 {
                 {
-                    consumer.SetUpstream(d);
-
-                    if (consumer.CurrentCount != 0)
-                    {
-                        consumer.Wait();
-                    }
+                    consumer.Wait();
                 }
                 }
 
 
                 consumer._error.ThrowIfNotNull();
                 consumer._error.ThrowIfNotNull();