Ver Fonte

Improve blocking First & Last operators (#590)

David Karnok há 7 anos atrás
pai
commit
8f67a9597d

+ 115 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstLastBlocking.cs

@@ -0,0 +1,115 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
+using System.Collections.Generic;
+using System.Reactive.Disposables;
+using System.Text;
+using System.Threading;
+
+namespace System.Reactive.Linq.ObservableImpl
+{
+
+    internal abstract class BaseBlocking<T> : CountdownEvent, IObserver<T>
+    {
+        protected IDisposable _upstream;
+
+        internal T _value;
+        internal bool _hasValue;
+        internal Exception _error;
+
+        int once;
+
+        internal BaseBlocking() : base(1) { }
+
+        internal void SetUpstream(IDisposable d)
+        {
+            Disposable.SetSingle(ref _upstream, d);
+        }
+
+        protected void Unblock()
+        {
+            if (Interlocked.CompareExchange(ref once, 1, 0) == 0)
+            {
+                Signal();
+            }
+        }
+
+        public abstract void OnCompleted();
+        public virtual void OnError(Exception error)
+        {
+            _value = default;
+            _error = error;
+            Unblock();
+        }
+        public abstract void OnNext(T value);
+
+        public new void Dispose()
+        {
+            base.Dispose();
+            if (!Disposable.GetIsDisposed(ref _upstream))
+            {
+                Disposable.TryDispose(ref _upstream);
+            }
+        }
+    }
+
+    internal sealed class FirstBlocking<T> : BaseBlocking<T>
+    {
+        internal FirstBlocking() : base() { }
+
+        public override void OnCompleted()
+        {
+            Unblock();
+            if (!Disposable.GetIsDisposed(ref _upstream))
+            {
+                Disposable.TryDispose(ref _upstream);
+            }
+        }
+
+        public override void OnError(Exception error)
+        {
+            base.OnError(error);  
+            if (!Disposable.GetIsDisposed(ref _upstream))
+            {
+                Disposable.TryDispose(ref _upstream);
+            }
+        }
+
+        public override void OnNext(T value)
+        {
+            if (!_hasValue)
+            {
+                _value = value;
+                _hasValue = true;
+                Disposable.TryDispose(ref _upstream);
+                Unblock();
+            }
+        }
+    }
+
+    internal sealed class LastBlocking<T> : BaseBlocking<T>
+    {
+        internal LastBlocking() : base() { }
+
+        public override void OnCompleted()
+        {
+            Unblock();
+            Disposable.TryDispose(ref _upstream);
+        }
+
+        public override void OnError(Exception error)
+        {
+            base.OnError(error);
+            Disposable.TryDispose(ref _upstream);
+        }
+
+        public override void OnNext(T value)
+        {
+            _value = value;
+            _hasValue = true;
+        }
+
+    }
+}

+ 31 - 64
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Blocking.cs

@@ -67,45 +67,26 @@ namespace System.Reactive.Linq
 
 
         private static TSource FirstOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
         private static TSource FirstOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
         {
         {
-            var value = default(TSource);
-            var seenValue = false;
-            var ex = default(Exception);
-
-            using (var evt = new WaitAndSetOnce())
+            using (var consumer = new FirstBlocking<TSource>())
             {
             {
-                //
-                // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
-                //
-                using (source.Subscribe/*Unsafe*/(new AnonymousObserver<TSource>(
-                    v =>
-                    {
-                        if (!seenValue)
-                        {
-                            value = v;
-                        }
-                        seenValue = true;
-                        evt.Set();
-                    },
-                    e =>
-                    {
-                        ex = e;
-                        evt.Set();
-                    },
-                    () =>
-                    {
-                        evt.Set();
-                    })))
+                using (var d = source.Subscribe(consumer))
                 {
                 {
-                    evt.WaitOne();
-                }
-            }
+                    consumer.SetUpstream(d);
 
 
-            ex.ThrowIfNotNull();
+                    if (consumer.CurrentCount != 0)
+                    {
+                        consumer.Wait();
+                    }
+                }
 
 
-            if (throwOnEmpty && !seenValue)
-                throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
+                consumer._error.ThrowIfNotNull();
 
 
-            return value;
+                if (throwOnEmpty && !consumer._hasValue)
+                {
+                    throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
+                }
+                return consumer._value;
+            }
         }
         }
 
 
         #endregion
         #endregion
@@ -182,41 +163,27 @@ namespace System.Reactive.Linq
 
 
         private static TSource LastOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
         private static TSource LastOrDefaultInternal<TSource>(IObservable<TSource> source, bool throwOnEmpty)
         {
         {
-            var value = default(TSource);
-            var seenValue = false;
-            var ex = default(Exception);
-
-            using (var evt = new WaitAndSetOnce())
+            using (var consumer = new LastBlocking<TSource>())
             {
             {
-                //
-                // [OK] Use of unsafe Subscribe: fine to throw to our caller, behavior indistinguishable from going through the sink.
-                //
-                using (source.Subscribe/*Unsafe*/(new AnonymousObserver<TSource>(
-                    v =>
-                    {
-                        seenValue = true;
-                        value = v;
-                    },
-                    e =>
-                    {
-                        ex = e;
-                        evt.Set();
-                    },
-                    () =>
-                    {
-                        evt.Set();
-                    })))
+
+                using (var d = source.Subscribe(consumer))
                 {
                 {
-                    evt.WaitOne();
-                }
-            }
+                    consumer.SetUpstream(d);
 
 
-            ex.ThrowIfNotNull();
+                    if (consumer.CurrentCount != 0)
+                    {
+                        consumer.Wait();
+                    }
+                }
 
 
-            if (throwOnEmpty && !seenValue)
-                throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
+                consumer._error.ThrowIfNotNull();
 
 
-            return value;
+                if (throwOnEmpty && !consumer._hasValue)
+                {
+                    throw new InvalidOperationException(Strings_Linq.NO_ELEMENTS);
+                }
+                return consumer._value;
+            }
         }
         }
 
 
         #endregion
         #endregion