Ver Fonte

Use HalfSerializer extensions (#572)

Daniel C. Weber há 7 anos atrás
pai
commit
fa0d583cb2

+ 13 - 13
Rx.NET/Source/src/System.Reactive/Internal/HalfSerializer.cs

@@ -27,26 +27,26 @@ namespace System.Reactive
         /// Use a full SerializedObserver wrapper for merging multiple sequences.
         /// Use a full SerializedObserver wrapper for merging multiple sequences.
         /// </summary>
         /// </summary>
         /// <typeparam name="T">The element type of the observer.</typeparam>
         /// <typeparam name="T">The element type of the observer.</typeparam>
-        /// <param name="observer">The observer to signal events in a serialized fashion.</param>
+        /// <param name="sink">The observer to signal events in a serialized fashion.</param>
         /// <param name="item">The item to signal.</param>
         /// <param name="item">The item to signal.</param>
         /// <param name="wip">Indicates there is an emission going on currently.</param>
         /// <param name="wip">Indicates there is an emission going on currently.</param>
         /// <param name="error">The field containing an error or terminal indicator.</param>
         /// <param name="error">The field containing an error or terminal indicator.</param>
-        public static void OnNext<T>(IObserver<T> observer, T item, ref int wip, ref Exception error)
+        public static void ForwardOnNext<T>(ISink<T> sink, T item, ref int wip, ref Exception error)
         {
         {
             if (Interlocked.CompareExchange(ref wip, 1, 0) == 0)
             if (Interlocked.CompareExchange(ref wip, 1, 0) == 0)
             {
             {
-                observer.OnNext(item);
+                sink.ForwardOnNext(item);
                 if (Interlocked.Decrement(ref wip) != 0)
                 if (Interlocked.Decrement(ref wip) != 0)
                 {
                 {
                     var ex = error;
                     var ex = error;
                     if (ex != ExceptionHelper.Terminated)
                     if (ex != ExceptionHelper.Terminated)
                     {
                     {
                         error = ExceptionHelper.Terminated;
                         error = ExceptionHelper.Terminated;
-                        observer.OnError(ex);
+                        sink.ForwardOnError(ex);
                     }
                     }
                     else
                     else
                     {
                     {
-                        observer.OnCompleted();
+                        sink.ForwardOnCompleted();
                     }
                     }
                 }
                 }
             }
             }
@@ -55,23 +55,23 @@ namespace System.Reactive
         /// <summary>
         /// <summary>
         /// Signals the given exception to the observer. If there is a concurrent
         /// Signals the given exception to the observer. If there is a concurrent
         /// OnNext emission is happening, saves the exception into the given field
         /// OnNext emission is happening, saves the exception into the given field
-        /// otherwise to be picked up by <see cref="OnNext{T}(IObserver{T}, T, ref int, ref Exception)"/>.
+        /// otherwise to be picked up by <see cref="ForwardOnNext{T}"/>.
         /// This method can be called concurrently with itself and the other methods of this
         /// This method can be called concurrently with itself and the other methods of this
         /// helper class but only one terminal signal may actually win.
         /// helper class but only one terminal signal may actually win.
         /// </summary>
         /// </summary>
         /// <typeparam name="T">The element type of the observer.</typeparam>
         /// <typeparam name="T">The element type of the observer.</typeparam>
-        /// <param name="observer">The observer to signal events in a serialized fashion.</param>
+        /// <param name="sink">The observer to signal events in a serialized fashion.</param>
         /// <param name="ex">The exception to signal sooner or later.</param>
         /// <param name="ex">The exception to signal sooner or later.</param>
         /// <param name="wip">Indicates there is an emission going on currently.</param>
         /// <param name="wip">Indicates there is an emission going on currently.</param>
         /// <param name="error">The field containing an error or terminal indicator.</param>
         /// <param name="error">The field containing an error or terminal indicator.</param>
-        public static void OnError<T>(IObserver<T> observer, Exception ex, ref int wip, ref Exception error)
+        public static void ForwardOnError<T>(ISink<T> sink, Exception ex, ref int wip, ref Exception error)
         {
         {
             if (ExceptionHelper.TrySetException(ref error, ex))
             if (ExceptionHelper.TrySetException(ref error, ex))
             {
             {
                 if (Interlocked.Increment(ref wip) == 1)
                 if (Interlocked.Increment(ref wip) == 1)
                 {
                 {
                     error = ExceptionHelper.Terminated;
                     error = ExceptionHelper.Terminated;
-                    observer.OnError(ex);
+                    sink.ForwardOnError(ex);
                 }
                 }
             }
             }
         }
         }
@@ -79,22 +79,22 @@ namespace System.Reactive
         /// <summary>
         /// <summary>
         /// Signals OnCompleted on the observer. If there is a concurrent
         /// Signals OnCompleted on the observer. If there is a concurrent
         /// OnNext emission happening, the error field will host a special
         /// OnNext emission happening, the error field will host a special
-        /// terminal exception signal to be picked up by <see cref="OnNext{T}(IObserver{T}, T, ref int, ref Exception)"/> once it finishes with OnNext and signal the
+        /// terminal exception signal to be picked up by <see cref="ForwardOnNext{T}"/> once it finishes with OnNext and signal the
         /// OnCompleted as well.
         /// OnCompleted as well.
         /// This method can be called concurrently with itself and the other methods of this
         /// This method can be called concurrently with itself and the other methods of this
         /// helper class but only one terminal signal may actually win.
         /// helper class but only one terminal signal may actually win.
         /// </summary>
         /// </summary>
         /// <typeparam name="T">The element type of the observer.</typeparam>
         /// <typeparam name="T">The element type of the observer.</typeparam>
-        /// <param name="observer">The observer to signal events in a serialized fashion.</param>
+        /// <param name="sink">The observer to signal events in a serialized fashion.</param>
         /// <param name="wip">Indicates there is an emission going on currently.</param>
         /// <param name="wip">Indicates there is an emission going on currently.</param>
         /// <param name="error">The field containing an error or terminal indicator.</param>
         /// <param name="error">The field containing an error or terminal indicator.</param>
-        public static void OnCompleted<T>(IObserver<T> observer, ref int wip, ref Exception error)
+        public static void ForwardOnCompleted<T>(ISink<T> sink, ref int wip, ref Exception error)
         {
         {
             if (ExceptionHelper.TrySetException(ref error, ExceptionHelper.Terminated))
             if (ExceptionHelper.TrySetException(ref error, ExceptionHelper.Terminated))
             {
             {
                 if (Interlocked.Increment(ref wip) == 1)
                 if (Interlocked.Increment(ref wip) == 1)
                 {
                 {
-                    observer.OnCompleted();
+                    sink.ForwardOnCompleted();
                 }
                 }
             }
             }
         }
         }

+ 11 - 4
Rx.NET/Source/src/System.Reactive/Internal/Sink.cs

@@ -7,7 +7,14 @@ using System.Threading;
 
 
 namespace System.Reactive
 namespace System.Reactive
 {
 {
-    internal abstract class Sink<TTarget> : IDisposable
+    internal interface ISink<in TTarget>
+    {
+        void ForwardOnNext(TTarget value);
+        void ForwardOnCompleted();
+        void ForwardOnError(Exception error);
+    }
+
+    internal abstract class Sink<TTarget> : ISink<TTarget>, IDisposable
     {
     {
         private IDisposable _cancel;
         private IDisposable _cancel;
         private volatile IObserver<TTarget> _observer;
         private volatile IObserver<TTarget> _observer;
@@ -29,18 +36,18 @@ namespace System.Reactive
             Disposable.TryDispose(ref _cancel);
             Disposable.TryDispose(ref _cancel);
         }
         }
 
 
-        protected void ForwardOnNext(TTarget value)
+        public void ForwardOnNext(TTarget value)
         {
         {
             _observer.OnNext(value);
             _observer.OnNext(value);
         }
         }
 
 
-        protected void ForwardOnCompleted()
+        public void ForwardOnCompleted()
         {
         {
             _observer.OnCompleted();
             _observer.OnCompleted();
             Dispose();
             Dispose();
         }
         }
 
 
-        protected void ForwardOnError(Exception error)
+        public void ForwardOnError(Exception error)
         {
         {
             _observer.OnError(error);
             _observer.OnError(error);
             Dispose();
             Dispose();

+ 4 - 33
Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs

@@ -39,7 +39,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public IDisposable Run(SkipUntil<TSource, TOther> parent)
             public IDisposable Run(SkipUntil<TSource, TOther> parent)
             {
             {
                 Disposable.TrySetSingle(ref _otherDisposable, parent._other.Subscribe(new OtherObserver(this)));
                 Disposable.TrySetSingle(ref _otherDisposable, parent._other.Subscribe(new OtherObserver(this)));
-
                 Disposable.TrySetSingle(ref _mainDisposable, parent._source.Subscribe(this));
                 Disposable.TrySetSingle(ref _mainDisposable, parent._source.Subscribe(this));
 
 
                 return this;
                 return this;
@@ -70,48 +69,20 @@ namespace System.Reactive.Linq.ObservableImpl
             public override void OnNext(TSource value)
             public override void OnNext(TSource value)
             {
             {
                 if (_forward)
                 if (_forward)
-                {
-                    if (Interlocked.CompareExchange(ref _halfSerializer, 1, 0) == 0)
-                    {
-                        ForwardOnNext(value);
-                        if (Interlocked.Decrement(ref _halfSerializer) != 0)
-                        {
-                            var ex = _error;
-                            _error = SkipUntilTerminalException.Instance;
-                            ForwardOnError(ex);
-                        }
-                    }
-                }
+                    HalfSerializer.ForwardOnNext(this, value, ref _halfSerializer, ref _error);
             }
             }
 
 
             public override void OnError(Exception ex)
             public override void OnError(Exception ex)
             {
             {
-                if (Interlocked.CompareExchange(ref _error, ex, null) == null)
-                {
-                    if (Interlocked.Increment(ref _halfSerializer) == 1)
-                    {
-                        _error = SkipUntilTerminalException.Instance;
-                        ForwardOnError(ex);
-                    }
-                }
+                HalfSerializer.ForwardOnError(this, ex, ref _halfSerializer, ref _error);
             }
             }
 
 
             public override void OnCompleted()
             public override void OnCompleted()
             {
             {
                 if (_forward)
                 if (_forward)
-                {
-                    if (Interlocked.CompareExchange(ref _error, SkipUntilTerminalException.Instance, null) == null)
-                    {
-                        if (Interlocked.Increment(ref _halfSerializer) == 1)
-                        {
-                            ForwardOnCompleted();
-                        }
-                    }
-                }
+                    HalfSerializer.ForwardOnCompleted(this, ref _halfSerializer, ref _error);
                 else
                 else
-                {
                     DisposeMain();
                     DisposeMain();
-                }
             }
             }
 
 
             void OtherComplete()
             void OtherComplete()
@@ -143,7 +114,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 public void OnError(Exception error)
                 public void OnError(Exception error)
                 {
                 {
-                    _parent.OnError(error);
+                    HalfSerializer.ForwardOnError(_parent, error, ref _parent._halfSerializer, ref _parent._error);
                 }
                 }
 
 
                 public void OnNext(TOther value)
                 public void OnNext(TOther value)

+ 5 - 34
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs

@@ -59,46 +59,17 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
             public override void OnNext(TSource value)
             public override void OnNext(TSource value)
             {
             {
-                if (Interlocked.CompareExchange(ref _halfSerializer, 1, 0) == 0)
-                {
-                    ForwardOnNext(value);
-                    if (Interlocked.Decrement(ref _halfSerializer) != 0)
-                    {
-                        var ex = _error;
-                        if (ex != TakeUntilTerminalException.Instance)
-                        {
-                            _error = TakeUntilTerminalException.Instance;
-                            ForwardOnError(ex);
-                        }
-                        else
-                        {
-                            ForwardOnCompleted();
-                        }
-                    }
-                }
+                HalfSerializer.ForwardOnNext(this, value, ref _halfSerializer, ref _error);
             }
             }
 
 
             public override void OnError(Exception ex)
             public override void OnError(Exception ex)
             {
             {
-                if (Interlocked.CompareExchange(ref _error, ex, null) == null)
-                {
-                    if (Interlocked.Increment(ref _halfSerializer) == 1)
-                    {
-                        _error = TakeUntilTerminalException.Instance;
-                        ForwardOnError(ex);
-                    }
-                }
+                HalfSerializer.ForwardOnError(this, ex, ref _halfSerializer, ref _error);
             }
             }
 
 
             public override void OnCompleted()
             public override void OnCompleted()
             {
             {
-                if (Interlocked.CompareExchange(ref _error, TakeUntilTerminalException.Instance, null) == null)
-                {
-                    if (Interlocked.Increment(ref _halfSerializer) == 1)
-                    {
-                        ForwardOnCompleted();
-                    }
-                }
+                HalfSerializer.ForwardOnCompleted(this, ref _halfSerializer, ref _error);
             }
             }
 
 
             sealed class OtherObserver : IObserver<TOther>
             sealed class OtherObserver : IObserver<TOther>
@@ -118,12 +89,12 @@ namespace System.Reactive.Linq.ObservableImpl
 
 
                 public void OnError(Exception error)
                 public void OnError(Exception error)
                 {
                 {
-                    _parent.OnError(error);
+                    HalfSerializer.ForwardOnError(_parent, error, ref _parent._halfSerializer, ref _parent._error);
                 }
                 }
 
 
                 public void OnNext(TOther value)
                 public void OnNext(TOther value)
                 {
                 {
-                    _parent.OnCompleted();
+                    HalfSerializer.ForwardOnCompleted(_parent, ref _parent._halfSerializer, ref _parent._error);
                 }
                 }
             }
             }
 
 

+ 28 - 31
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Internal/HalfSerializerTest.cs

@@ -9,19 +9,16 @@ using System.Collections.Generic;
 
 
 namespace ReactiveTests.Tests
 namespace ReactiveTests.Tests
 {
 {
-    
     public class HalfSerializerTest
     public class HalfSerializerTest
     {
     {
         int wip;
         int wip;
-
         Exception error;
         Exception error;
-
         Consumer consumer = new Consumer();
         Consumer consumer = new Consumer();
 
 
         [Fact]
         [Fact]
         public void HalfSerializer_OnNext()
         public void HalfSerializer_OnNext()
         {
         {
-            HalfSerializer.OnNext(consumer, 1, ref wip, ref error);
+            HalfSerializer.ForwardOnNext(consumer, 1, ref wip, ref error);
 
 
             Assert.Equal(0, wip);
             Assert.Equal(0, wip);
             Assert.Null(error);
             Assert.Null(error);
@@ -37,12 +34,12 @@ namespace ReactiveTests.Tests
         {
         {
             var ex = new InvalidOperationException();
             var ex = new InvalidOperationException();
 
 
-            HalfSerializer.OnError(consumer, ex, ref wip, ref error);
+            HalfSerializer.ForwardOnError(consumer, ex, ref wip, ref error);
 
 
             Assert.Equal(1, wip);
             Assert.Equal(1, wip);
             Assert.Equal(error, ExceptionHelper.Terminated);
             Assert.Equal(error, ExceptionHelper.Terminated);
 
 
-            HalfSerializer.OnNext(consumer, 2, ref wip, ref error);
+            HalfSerializer.ForwardOnNext(consumer, 2, ref wip, ref error);
 
 
             Assert.Equal(0, consumer.items.Count);
             Assert.Equal(0, consumer.items.Count);
             Assert.Equal(0, consumer.done);
             Assert.Equal(0, consumer.done);
@@ -54,15 +51,15 @@ namespace ReactiveTests.Tests
         {
         {
             var ex = new InvalidOperationException();
             var ex = new InvalidOperationException();
 
 
-            HalfSerializer.OnError(consumer, ex, ref wip, ref error);
+            HalfSerializer.ForwardOnError(consumer, ex, ref wip, ref error);
 
 
             Assert.Equal(1, wip);
             Assert.Equal(1, wip);
             Assert.Equal(error, ExceptionHelper.Terminated);
             Assert.Equal(error, ExceptionHelper.Terminated);
 
 
-            HalfSerializer.OnNext(consumer, 2, ref wip, ref error);
+            HalfSerializer.ForwardOnNext(consumer, 2, ref wip, ref error);
             var ex2 = new NotSupportedException();
             var ex2 = new NotSupportedException();
-            HalfSerializer.OnError(consumer, ex2, ref wip, ref error);
-            HalfSerializer.OnCompleted(consumer, ref wip, ref error);
+            HalfSerializer.ForwardOnError(consumer, ex2, ref wip, ref error);
+            HalfSerializer.ForwardOnCompleted(consumer, ref wip, ref error);
 
 
             Assert.Equal(0, consumer.items.Count);
             Assert.Equal(0, consumer.items.Count);
             Assert.Equal(0, consumer.done);
             Assert.Equal(0, consumer.done);
@@ -72,12 +69,12 @@ namespace ReactiveTests.Tests
         [Fact]
         [Fact]
         public void HalfSerializer_OnCompleted()
         public void HalfSerializer_OnCompleted()
         {
         {
-            HalfSerializer.OnCompleted(consumer, ref wip, ref error);
+            HalfSerializer.ForwardOnCompleted(consumer, ref wip, ref error);
 
 
             Assert.Equal(1, wip);
             Assert.Equal(1, wip);
             Assert.Equal(error, ExceptionHelper.Terminated);
             Assert.Equal(error, ExceptionHelper.Terminated);
 
 
-            HalfSerializer.OnNext(consumer, 2, ref wip, ref error);
+            HalfSerializer.ForwardOnNext(consumer, 2, ref wip, ref error);
 
 
             Assert.Equal(0, consumer.items.Count);
             Assert.Equal(0, consumer.items.Count);
             Assert.Equal(1, consumer.done);
             Assert.Equal(1, consumer.done);
@@ -87,15 +84,15 @@ namespace ReactiveTests.Tests
         [Fact]
         [Fact]
         public void HalfSerializer_OnCompleted_Ignore_Further_Events()
         public void HalfSerializer_OnCompleted_Ignore_Further_Events()
         {
         {
-            HalfSerializer.OnCompleted(consumer, ref wip, ref error);
+            HalfSerializer.ForwardOnCompleted(consumer, ref wip, ref error);
 
 
             Assert.Equal(1, wip);
             Assert.Equal(1, wip);
             Assert.Equal(error, ExceptionHelper.Terminated);
             Assert.Equal(error, ExceptionHelper.Terminated);
 
 
-            HalfSerializer.OnNext(consumer, 2, ref wip, ref error);
+            HalfSerializer.ForwardOnNext(consumer, 2, ref wip, ref error);
             var ex2 = new NotSupportedException();
             var ex2 = new NotSupportedException();
-            HalfSerializer.OnError(consumer, ex2, ref wip, ref error);
-            HalfSerializer.OnCompleted(consumer, ref wip, ref error);
+            HalfSerializer.ForwardOnError(consumer, ex2, ref wip, ref error);
+            HalfSerializer.ForwardOnCompleted(consumer, ref wip, ref error);
 
 
             Assert.Equal(0, consumer.items.Count);
             Assert.Equal(0, consumer.items.Count);
             Assert.Equal(1, consumer.done);
             Assert.Equal(1, consumer.done);
@@ -108,7 +105,7 @@ namespace ReactiveTests.Tests
         {
         {
             var c = new ReentrantConsumer(this, true);
             var c = new ReentrantConsumer(this, true);
 
 
-            HalfSerializer.OnNext(c, 1, ref wip, ref error);
+            HalfSerializer.ForwardOnNext(c, 1, ref wip, ref error);
 
 
             Assert.Equal(1, wip);
             Assert.Equal(1, wip);
             Assert.Equal(error, ExceptionHelper.Terminated);
             Assert.Equal(error, ExceptionHelper.Terminated);
@@ -125,7 +122,7 @@ namespace ReactiveTests.Tests
         {
         {
             var c = new ReentrantConsumer(this, false);
             var c = new ReentrantConsumer(this, false);
 
 
-            HalfSerializer.OnNext(c, 1, ref wip, ref error);
+            HalfSerializer.ForwardOnNext(c, 1, ref wip, ref error);
 
 
             Assert.Equal(1, wip);
             Assert.Equal(1, wip);
             Assert.Equal(error, ExceptionHelper.Terminated);
             Assert.Equal(error, ExceptionHelper.Terminated);
@@ -136,30 +133,30 @@ namespace ReactiveTests.Tests
             Assert.Null(consumer.exc);
             Assert.Null(consumer.exc);
         }
         }
 
 
-        sealed class Consumer : IObserver<int>
+        sealed class Consumer : ISink<int>
         {
         {
             internal List<int> items = new List<int>();
             internal List<int> items = new List<int>();
 
 
             internal int done;
             internal int done;
             internal Exception exc;
             internal Exception exc;
 
 
-            public void OnCompleted()
+            public void ForwardOnCompleted()
             {
             {
                 done++;
                 done++;
             }
             }
 
 
-            public void OnError(Exception error)
+            public void ForwardOnError(Exception error)
             {
             {
                 exc = error;
                 exc = error;
             }
             }
 
 
-            public void OnNext(int value)
+            public void ForwardOnNext(int value)
             {
             {
                 items.Add(value);
                 items.Add(value);
             }
             }
         }
         }
 
 
-        sealed class ReentrantConsumer : IObserver<int>
+        sealed class ReentrantConsumer : ISink<int>
         {
         {
             readonly HalfSerializerTest parent;
             readonly HalfSerializerTest parent;
 
 
@@ -173,25 +170,25 @@ namespace ReactiveTests.Tests
                 this.errorReenter = errorReenter;
                 this.errorReenter = errorReenter;
             }
             }
 
 
-            public void OnCompleted()
+            public void ForwardOnCompleted()
             {
             {
-                parent.consumer.OnCompleted();
+                parent.consumer.ForwardOnCompleted();
             }
             }
 
 
-            public void OnError(Exception error)
+            public void ForwardOnError(Exception error)
             {
             {
-                parent.consumer.OnError(error);
+                parent.consumer.ForwardOnError(error);
             }
             }
 
 
-            public void OnNext(int value)
+            public void ForwardOnNext(int value)
             {
             {
-                parent.consumer.OnNext(value);
+                parent.consumer.ForwardOnNext(value);
                 if (errorReenter)
                 if (errorReenter)
                 {
                 {
-                    HalfSerializer.OnError(this, x, ref parent.wip, ref parent.error);
+                    HalfSerializer.ForwardOnError(this, x, ref parent.wip, ref parent.error);
                 } else
                 } else
                 {
                 {
-                    HalfSerializer.OnCompleted(this, ref parent.wip, ref parent.error);
+                    HalfSerializer.ForwardOnCompleted(this, ref parent.wip, ref parent.error);
                 }
                 }
             }
             }
         }
         }