浏览代码

Add ExceptionHelper & HalfSerializer lock-free tools (#563)

David Karnok 7 年之前
父节点
当前提交
668ab84ccc

+ 110 - 0
Rx.NET/Source/src/System.Reactive/Internal/ExceptionHelper.cs

@@ -0,0 +1,110 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Linq;
+
+namespace System.Reactive
+{
+    /// <summary>
+    /// Utility methods to handle lock-free combining of Exceptions
+    /// as well as hosting a terminal-exception indicator for
+    /// lock-free termination support.
+    /// </summary>
+    internal static class ExceptionHelper
+    {
+        /// <summary>
+        /// The singleton instance of the exception indicating a terminal state,
+        /// DO NOT LEAK or signal this via OnError!
+        /// </summary>
+        public static Exception Terminated { get; } = new TerminatedException();
+
+        /// <summary>
+        /// Tries to atomically set the Exception on the given field if it is
+        /// still null.
+        /// </summary>
+        /// <param name="field">The target field to try to set atomically.</param>
+        /// <param name="ex">The exception to set, not null (not verified).</param>
+        /// <returns>True if the operation succeeded, false if the target was not null.</returns>
+        public static bool TrySetException(ref Exception field, Exception ex)
+        {
+            return Interlocked.CompareExchange(ref field, ex, null) == null;
+        }
+
+        /// <summary>
+        /// Atomically swaps in the Terminated exception into the field and
+        /// returns the previous exception in that field (which could be the
+        /// Terminated instance too).
+        /// </summary>
+        /// <param name="field">The target field to terminate.</param>
+        /// <returns>The previous exception in that field before the termination.</returns>
+        public static Exception Terminate(ref Exception field)
+        {
+            var current = Volatile.Read(ref field);
+            if (current != Terminated)
+            {
+                current = Interlocked.Exchange(ref field, Terminated);
+            }
+            return current;
+        }
+
+        /// <summary>
+        /// Atomically sets the field to the given new exception or combines
+        /// it with any pre-existing exception as a new AggregateException
+        /// unless the field contains the Terminated instance.
+        /// </summary>
+        /// <param name="field">The field to set or combine with.</param>
+        /// <param name="ex">The exception to combine with.</param>
+        /// <returns>True if successful, false if the field contains the Terminated instance.</returns>
+        /// <remarks>This type of atomic aggregation helps with operators that
+        /// want to delay all errors until all of their sources terminate in some way.</remarks>
+        public static bool TryAddException(ref Exception field, Exception ex)
+        {
+            for (; ; )
+            {
+                var current = Volatile.Read(ref field);
+                if (current == Terminated)
+                {
+                    return false;
+                }
+
+                var b = default(Exception);
+
+                if (current == null)
+                {
+                    b = ex;
+                }
+                else
+                if (current is AggregateException a)
+                {
+                    var list = new List<Exception>(a.InnerExceptions);
+                    list.Add(ex);
+                    b = new AggregateException(list);
+                }
+                else
+                {
+                    b = new AggregateException(current, ex);
+                }
+                if (Interlocked.CompareExchange(ref field, b, current) == current)
+                {
+                    return true;
+                }
+            }
+        }
+
+        /// <summary>
+        /// The class indicating a terminal state as an Exception type.
+        /// </summary>
+        sealed class TerminatedException : Exception
+        {
+            internal TerminatedException() : base("No further exceptions")
+            {
+
+            }
+        }
+    }
+}

+ 102 - 0
Rx.NET/Source/src/System.Reactive/Internal/HalfSerializer.cs

@@ -0,0 +1,102 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+
+namespace System.Reactive
+{
+    /// <summary>
+    /// Utility methods for dealing with serializing OnXXX signals
+    /// for an IObserver where concurrent OnNext is still not allowed
+    /// but concurrent OnError/OnCompleted may happen.
+    /// This serialization case is generally lower overhead than
+    /// a full SerializedObserver wrapper and doesn't need
+    /// allocation.
+    /// </summary>
+    internal static class HalfSerializer
+    {
+        /// <summary>
+        /// Signals the given item to the observer in a serialized fashion
+        /// allowing a concurrent OnError or OnCompleted emission to be delayed until
+        /// the observer.OnNext returns.
+        /// Do not call OnNext from multiple threads as it may lead to ignored items.
+        /// Use a full SerializedObserver wrapper for merging multiple sequences.
+        /// </summary>
+        /// <typeparam name="T">The element type of the observer.</typeparam>
+        /// <param name="observer">The observer to signal events in a serialized fashion.</param>
+        /// <param name="item">The item to signal.</param>
+        /// <param name="wip">Indicates there is an emission going on currently.</param>
+        /// <param name="error">The field containing an error or terminal indicator.</param>
+        public static void OnNext<T>(IObserver<T> observer, T item, ref int wip, ref Exception error)
+        {
+            if (Interlocked.CompareExchange(ref wip, 1, 0) == 0)
+            {
+                observer.OnNext(item);
+                if (Interlocked.Decrement(ref wip) != 0)
+                {
+                    var ex = error;
+                    if (ex != ExceptionHelper.Terminated)
+                    {
+                        error = ExceptionHelper.Terminated;
+                        observer.OnError(ex);
+                    }
+                    else
+                    {
+                        observer.OnCompleted();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// Signals the given exception to the observer. If there is a concurrent
+        /// OnNext emission is happening, saves the exception into the given field
+        /// otherwise to be picked up by <see cref="OnNext{T}(IObserver{T}, T, ref int, ref Exception)"/>.
+        /// This method can be called concurrently with itself and the other methods of this
+        /// helper class but only one terminal signal may actually win.
+        /// </summary>
+        /// <typeparam name="T">The element type of the observer.</typeparam>
+        /// <param name="observer">The observer to signal events in a serialized fashion.</param>
+        /// <param name="ex">The exception to signal sooner or later.</param>
+        /// <param name="wip">Indicates there is an emission going on currently.</param>
+        /// <param name="error">The field containing an error or terminal indicator.</param>
+        public static void OnError<T>(IObserver<T> observer, Exception ex, ref int wip, ref Exception error)
+        {
+            if (ExceptionHelper.TrySetException(ref error, ex))
+            {
+                if (Interlocked.Increment(ref wip) == 1)
+                {
+                    error = ExceptionHelper.Terminated;
+                    observer.OnError(ex);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Signals OnCompleted on the observer. If there is a concurrent
+        /// OnNext emission happening, the error field will host a special
+        /// terminal exception signal to be picked up by <see cref="OnNext{T}(IObserver{T}, T, ref int, ref Exception)"/> once it finishes with OnNext and signal the
+        /// OnCompleted as well.
+        /// This method can be called concurrently with itself and the other methods of this
+        /// helper class but only one terminal signal may actually win.
+        /// </summary>
+        /// <typeparam name="T">The element type of the observer.</typeparam>
+        /// <param name="observer">The observer to signal events in a serialized fashion.</param>
+        /// <param name="wip">Indicates there is an emission going on currently.</param>
+        /// <param name="error">The field containing an error or terminal indicator.</param>
+        public static void OnCompleted<T>(IObserver<T> observer, ref int wip, ref Exception error)
+        {
+            if (ExceptionHelper.TrySetException(ref error, ExceptionHelper.Terminated))
+            {
+                if (Interlocked.Increment(ref wip) == 1)
+                {
+                    observer.OnCompleted();
+                }
+            }
+        }
+    }
+}

+ 139 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Internal/ExceptionHelperTest.cs

@@ -0,0 +1,139 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Reactive;
+using Xunit;
+using System;
+
+namespace ReactiveTests.Tests
+{
+    
+    public class ExceptionHelperTest
+    {
+        Exception errors;
+
+        [Fact]
+        public void ExceptionHelper_TrySetException_Empty()
+        {
+            var ex = new InvalidOperationException();
+
+            Assert.True(ExceptionHelper.TrySetException(ref errors, ex));
+
+            Assert.Equal(ex, errors);
+        }
+
+        [Fact]
+        public void ExceptionHelper_TrySetException_Not_Empty()
+        {
+            var ex1 = new InvalidOperationException();
+            errors = ex1;
+
+            var ex2 = new NotSupportedException();
+
+            Assert.False(ExceptionHelper.TrySetException(ref errors, ex2));
+
+            Assert.Equal(ex1, errors);
+        }
+
+        [Fact]
+        public void ExceptionHelper_TrySetException_Terminate_Empty()
+        {
+            var ex = ExceptionHelper.Terminate(ref errors);
+
+            Assert.Null(ex);
+            Assert.Equal(errors, ExceptionHelper.Terminated);
+        }
+
+        [Fact]
+        public void ExceptionHelper_TrySetException_Terminate_Not_Empty()
+        {
+            var ex1 = new InvalidOperationException();
+            errors = ex1;
+
+            var ex = ExceptionHelper.Terminate(ref errors);
+
+            Assert.Equal(ex, ex1);
+            Assert.Equal(errors, ExceptionHelper.Terminated);
+        }
+
+
+        [Fact]
+        public void ExceptionHelper_TrySetException_Terminate_Twice()
+        {
+            var ex1 = new InvalidOperationException();
+            errors = ex1;
+
+            var ex = ExceptionHelper.Terminate(ref errors);
+
+            Assert.Equal(ex, ex1);
+            Assert.Equal(errors, ExceptionHelper.Terminated);
+
+            ex = ExceptionHelper.Terminate(ref errors);
+
+            Assert.Equal(ex, ExceptionHelper.Terminated);
+            Assert.Equal(errors, ExceptionHelper.Terminated);
+        }
+
+        [Fact]
+        public void ExceptionHelper_TryAddException_Empty()
+        {
+            var ex1 = new InvalidOperationException();
+
+            Assert.True(ExceptionHelper.TryAddException(ref errors, ex1));
+
+            Assert.Equal(ex1, errors);
+        }
+
+        [Fact]
+        public void ExceptionHelper_TryAddException_Not_Empty()
+        {
+            var ex1 = new InvalidOperationException();
+            errors = ex1;
+
+            var ex2 = new NotImplementedException();
+
+            Assert.True(ExceptionHelper.TryAddException(ref errors, ex2));
+
+            Assert.True(errors is AggregateException);
+            var x = errors as AggregateException;
+
+            Assert.Equal(2, x.InnerExceptions.Count);
+            Assert.True(x.InnerExceptions[0] is InvalidOperationException);
+            Assert.True(x.InnerExceptions[1] is NotImplementedException);
+        }
+
+        [Fact]
+        public void ExceptionHelper_TryAddException_Aggregated()
+        {
+            var ex1 = new InvalidOperationException();
+            var ex2 = new NotImplementedException();
+
+            errors = new AggregateException(ex1, ex2);
+
+            var ex3 = new InvalidCastException();
+
+            Assert.True(ExceptionHelper.TryAddException(ref errors, ex3));
+
+            Assert.True(errors is AggregateException);
+            var x = errors as AggregateException;
+
+            Assert.Equal(3, x.InnerExceptions.Count);
+            Assert.True(x.InnerExceptions[0] is InvalidOperationException);
+            Assert.True(x.InnerExceptions[1] is NotImplementedException);
+            Assert.True(x.InnerExceptions[2] is InvalidCastException);
+        }
+
+        [Fact]
+        public void ExceptionHelper_TryAddException_Terminated()
+        {
+            errors = ExceptionHelper.Terminated;
+
+            var ex = new InvalidCastException();
+
+            Assert.False(ExceptionHelper.TryAddException(ref errors, ex));
+
+            Assert.Equal(errors, ExceptionHelper.Terminated);
+        }
+    }
+}

+ 199 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Internal/HalfSerializerTest.cs

@@ -0,0 +1,199 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System.Reactive;
+using Xunit;
+using System;
+using System.Collections.Generic;
+
+namespace ReactiveTests.Tests
+{
+    
+    public class HalfSerializerTest
+    {
+        int wip;
+
+        Exception error;
+
+        Consumer consumer = new Consumer();
+
+        [Fact]
+        public void HalfSerializer_OnNext()
+        {
+            HalfSerializer.OnNext(consumer, 1, ref wip, ref error);
+
+            Assert.Equal(0, wip);
+            Assert.Null(error);
+
+            Assert.Equal(1, consumer.items.Count);
+            Assert.Equal(1, consumer.items[0]);
+            Assert.Equal(0, consumer.done);
+            Assert.Null(consumer.exc);
+        }
+
+        [Fact]
+        public void HalfSerializer_OnError()
+        {
+            var ex = new InvalidOperationException();
+
+            HalfSerializer.OnError(consumer, ex, ref wip, ref error);
+
+            Assert.Equal(1, wip);
+            Assert.Equal(error, ExceptionHelper.Terminated);
+
+            HalfSerializer.OnNext(consumer, 2, ref wip, ref error);
+
+            Assert.Equal(0, consumer.items.Count);
+            Assert.Equal(0, consumer.done);
+            Assert.Equal(ex, consumer.exc);
+        }
+
+        [Fact]
+        public void HalfSerializer_OnError_Ignore_Further_Events()
+        {
+            var ex = new InvalidOperationException();
+
+            HalfSerializer.OnError(consumer, ex, ref wip, ref error);
+
+            Assert.Equal(1, wip);
+            Assert.Equal(error, ExceptionHelper.Terminated);
+
+            HalfSerializer.OnNext(consumer, 2, ref wip, ref error);
+            var ex2 = new NotSupportedException();
+            HalfSerializer.OnError(consumer, ex2, ref wip, ref error);
+            HalfSerializer.OnCompleted(consumer, ref wip, ref error);
+
+            Assert.Equal(0, consumer.items.Count);
+            Assert.Equal(0, consumer.done);
+            Assert.Equal(ex, consumer.exc);
+        }
+
+        [Fact]
+        public void HalfSerializer_OnCompleted()
+        {
+            HalfSerializer.OnCompleted(consumer, ref wip, ref error);
+
+            Assert.Equal(1, wip);
+            Assert.Equal(error, ExceptionHelper.Terminated);
+
+            HalfSerializer.OnNext(consumer, 2, ref wip, ref error);
+
+            Assert.Equal(0, consumer.items.Count);
+            Assert.Equal(1, consumer.done);
+            Assert.Null(consumer.exc);
+        }
+
+        [Fact]
+        public void HalfSerializer_OnCompleted_Ignore_Further_Events()
+        {
+            HalfSerializer.OnCompleted(consumer, ref wip, ref error);
+
+            Assert.Equal(1, wip);
+            Assert.Equal(error, ExceptionHelper.Terminated);
+
+            HalfSerializer.OnNext(consumer, 2, ref wip, ref error);
+            var ex2 = new NotSupportedException();
+            HalfSerializer.OnError(consumer, ex2, ref wip, ref error);
+            HalfSerializer.OnCompleted(consumer, ref wip, ref error);
+
+            Assert.Equal(0, consumer.items.Count);
+            Assert.Equal(1, consumer.done);
+            Assert.Null(consumer.exc);
+        }
+
+        // Practically simulates concurrent invocation of the HalfSerializer methods
+        [Fact]
+        public void HalfSerializer_OnNext_Reentrant_Error()
+        {
+            var c = new ReentrantConsumer(this, true);
+
+            HalfSerializer.OnNext(c, 1, ref wip, ref error);
+
+            Assert.Equal(1, wip);
+            Assert.Equal(error, ExceptionHelper.Terminated);
+
+            Assert.Equal(1, consumer.items.Count);
+            Assert.Equal(1, consumer.items[0]);
+            Assert.Equal(0, consumer.done);
+            Assert.Equal(c.x, consumer.exc);
+        }
+
+        // Practically simulates concurrent invocation of the HalfSerializer methods
+        [Fact]
+        public void HalfSerializer_OnNext_Reentrant_OnCompleted()
+        {
+            var c = new ReentrantConsumer(this, false);
+
+            HalfSerializer.OnNext(c, 1, ref wip, ref error);
+
+            Assert.Equal(1, wip);
+            Assert.Equal(error, ExceptionHelper.Terminated);
+
+            Assert.Equal(1, consumer.items.Count);
+            Assert.Equal(1, consumer.items[0]);
+            Assert.Equal(1, consumer.done);
+            Assert.Null(consumer.exc);
+        }
+
+        sealed class Consumer : IObserver<int>
+        {
+            internal List<int> items = new List<int>();
+
+            internal int done;
+            internal Exception exc;
+
+            public void OnCompleted()
+            {
+                done++;
+            }
+
+            public void OnError(Exception error)
+            {
+                exc = error;
+            }
+
+            public void OnNext(int value)
+            {
+                items.Add(value);
+            }
+        }
+
+        sealed class ReentrantConsumer : IObserver<int>
+        {
+            readonly HalfSerializerTest parent;
+
+            readonly bool errorReenter;
+
+            internal readonly Exception x = new IndexOutOfRangeException();
+
+            public ReentrantConsumer(HalfSerializerTest parent, bool errorReenter)
+            {
+                this.parent = parent;
+                this.errorReenter = errorReenter;
+            }
+
+            public void OnCompleted()
+            {
+                parent.consumer.OnCompleted();
+            }
+
+            public void OnError(Exception error)
+            {
+                parent.consumer.OnError(error);
+            }
+
+            public void OnNext(int value)
+            {
+                parent.consumer.OnNext(value);
+                if (errorReenter)
+                {
+                    HalfSerializer.OnError(this, x, ref parent.wip, ref parent.error);
+                } else
+                {
+                    HalfSerializer.OnCompleted(this, ref parent.wip, ref parent.error);
+                }
+            }
+        }
+    }
+}