Browse Source

Improve TailRecursiveSink with less allocations (#620)

David Karnok 7 years ago
parent
commit
8b5661188e
1 changed files with 62 additions and 31 deletions
  1. 62 31
      Rx.NET/Source/src/System.Reactive/Internal/TailRecursiveSink.cs

+ 62 - 31
Rx.NET/Source/src/System.Reactive/Internal/TailRecursiveSink.cs

@@ -18,43 +18,34 @@ namespace System.Reactive
 
         bool _isDisposed;
 
-        int trampoline;
+        int _trampoline;
 
-        IDisposable currentSubscription;
+        IDisposable _currentSubscription;
 
-        Stack<IEnumerator<IObservable<TSource>>> stack;
+        Stack<IEnumerator<IObservable<TSource>>> _stack = new Stack<IEnumerator<IObservable<TSource>>>();
 
         public void Run(IEnumerable<IObservable<TSource>> sources)
         {
             if (!TryGetEnumerator(sources, out var current))
                 return;
 
-            stack = new Stack<IEnumerator<IObservable<TSource>>>();
-            stack.Push(current);
+            _stack.Push(current);
 
             Drain();
-
-            SetUpstream(new RecursiveSinkDisposable(this));
         }
 
-        sealed class RecursiveSinkDisposable : IDisposable
+        protected override void Dispose(bool disposing)
         {
-            readonly TailRecursiveSink<TSource> parent;
-
-            public RecursiveSinkDisposable(TailRecursiveSink<TSource> parent)
-            {
-                this.parent = parent;
-            }
-
-            public void Dispose()
+            if (disposing)
             {
-                parent.DisposeAll();
+                DisposeAll();
             }
+            base.Dispose(disposing);
         }
 
         void Drain()
         {
-            if (Interlocked.Increment(ref trampoline) != 1)
+            if (Interlocked.Increment(ref _trampoline) != 1)
             {
                 return;
             }
@@ -63,19 +54,19 @@ namespace System.Reactive
             {
                 if (Volatile.Read(ref _isDisposed))
                 {
-                    while (stack.Count != 0)
+                    while (_stack.Count != 0)
                     {
-                        var enumerator = stack.Pop();
+                        var enumerator = _stack.Pop();
                         enumerator.Dispose();
                     }
 
-                    Disposable.TryDispose(ref currentSubscription);
+                    Disposable.TryDispose(ref _currentSubscription);
                 }
                 else
                 {
-                    if (stack.Count != 0)
+                    if (_stack.Count != 0)
                     {
-                        var currentEnumerator = stack.Peek();
+                        var currentEnumerator = _stack.Peek();
 
                         var currentObservable = default(IObservable<TSource>);
                         var next = default(IObservable<TSource>);
@@ -117,7 +108,7 @@ namespace System.Reactive
                             {
                                 if (TryGetEnumerator(nextSeq, out var nextEnumerator))
                                 {
-                                    stack.Push(nextEnumerator);
+                                    _stack.Push(nextEnumerator);
                                     continue;
                                 }
                                 else
@@ -128,12 +119,32 @@ namespace System.Reactive
                             }
                             else
                             {
-                                var sad = new SingleAssignmentDisposable();
+                                // we need an unique indicator for this as
+                                // Subscribe could return a Disposable.Empty or
+                                // a BooleanDisposable
+                                var sad = ReadyToken.Ready;
 
-                                if (Disposable.TrySetSingle(ref currentSubscription, sad) == TrySetSingleResult.Success)
+                                // Swap in the Ready indicator so we know the sequence hasn't been disposed
+                                if (Disposable.TrySetSingle(ref _currentSubscription, sad) == TrySetSingleResult.Success)
                                 {
-                                    sad.Disposable = next.SubscribeSafe(this);
-                                }
+                                    // subscribe to the source
+                                    var d = next.SubscribeSafe(this);
+
+                                    // Try to swap in the returned disposable in place of the Ready indicator
+                                    // Since this drain loop is the only one to use Ready, this should
+                                    // be unambiguous
+                                    var u = Interlocked.CompareExchange(ref _currentSubscription, d, sad);
+                                    
+                                    // sequence disposed or completed synchronously
+                                    if (u != sad)
+                                    {
+                                        d.Dispose();
+                                        if (u == BooleanDisposable.True)
+                                        {
+                                            continue;
+                                        }
+                                    }
+                                }   
                                 else
                                 {
                                     continue;
@@ -142,7 +153,7 @@ namespace System.Reactive
                         }
                         else
                         {
-                            stack.Pop();
+                            _stack.Pop();
                             currentEnumerator.Dispose();
                             continue;
                         }
@@ -154,7 +165,7 @@ namespace System.Reactive
                     }
                 }
 
-                if (Interlocked.Decrement(ref trampoline) == 0)
+                if (Interlocked.Decrement(ref _trampoline) == 0)
                 {
                     break;
                 }
@@ -171,7 +182,7 @@ namespace System.Reactive
 
         protected void Recurse()
         {
-            if (Disposable.TrySetSerial(ref currentSubscription, null))
+            if (Disposable.TrySetSerial(ref _currentSubscription, null))
                 Drain();
         }
 
@@ -205,4 +216,24 @@ namespace System.Reactive
             return false;
         }
     }
+
+    /// <summary>
+    /// Holds onto a singleton IDisposable indicating a ready state.
+    /// </summary>
+    static class ReadyToken
+    {
+        /// <summary>
+        /// This indicates the operation has been prepared and ready for
+        /// the next step.
+        /// </summary>
+        internal static readonly IDisposable Ready = new ReadyDisposable();
+
+        sealed class ReadyDisposable : IDisposable
+        {
+            public void Dispose()
+            {
+                // deliberately no-op
+            }
+        }
+    }
 }