浏览代码

Make Scheduler.Recursive more uniform and allocate less (#617)

David Karnok 7 年之前
父节点
当前提交
e2092e1a2d
共有 1 个文件被更改,包括 111 次插入110 次删除
  1. 111 110
      Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Recursive.cs

+ 111 - 110
Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Recursive.cs

@@ -2,6 +2,7 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
+using System.Collections.Generic;
 using System.Reactive.Disposables;
 
 namespace System.Reactive.Concurrency
@@ -46,45 +47,9 @@ namespace System.Reactive.Concurrency
 
         private static IDisposable InvokeRec1<TState>(IScheduler scheduler, (TState state, Action<TState, Action<TState>> action) tuple)
         {
-            var group = new CompositeDisposable(1);
-            var gate = new object();
-
-            Action<TState> recursiveAction = null;
-            recursiveAction = state1 => tuple.action(state1, state2 =>
-            {
-                var isAdded = false;
-                var isDone = false;
-                var d = default(IDisposable);
-                d = scheduler.Schedule(state2, (scheduler1, state3) =>
-                {
-                    lock (gate)
-                    {
-                        if (isAdded)
-                        {
-                            group.Remove(d);
-                        }
-                        else
-                        {
-                            isDone = true;
-                        }
-                    }
-                    recursiveAction(state3);
-                    return Disposable.Empty;
-                });
-
-                lock (gate)
-                {
-                    if (!isDone)
-                    {
-                        group.Add(d);
-                        isAdded = true;
-                    }
-                }
-            });
-
-            recursiveAction(tuple.state);
-
-            return group;
+            var recursiveInvoker = new InvokeRec1State<TState>(scheduler, tuple.action);
+            recursiveInvoker.InvokeFirst(tuple.state);
+            return recursiveInvoker;
         }
 
         /// <summary>
@@ -127,45 +92,9 @@ namespace System.Reactive.Concurrency
 
         private static IDisposable InvokeRec2<TState>(IScheduler scheduler, (TState state, Action<TState, Action<TState, TimeSpan>> action) tuple)
         {
-            var group = new CompositeDisposable(1);
-            var gate = new object();
-
-            Action<TState> recursiveAction = null;
-            recursiveAction = state1 => tuple.action(state1, (state2, dueTime1) =>
-            {
-                var isAdded = false;
-                var isDone = false;
-                var d = default(IDisposable);
-                d = scheduler.Schedule(state2, dueTime1, (scheduler1, state3) =>
-                {
-                    lock (gate)
-                    {
-                        if (isAdded)
-                        {
-                            group.Remove(d);
-                        }
-                        else
-                        {
-                            isDone = true;
-                        }
-                    }
-                    recursiveAction(state3);
-                    return Disposable.Empty;
-                });
-                
-                lock (gate)
-                {
-                    if (!isDone)
-                    {
-                        group.Add(d);
-                        isAdded = true;
-                    }
-                }
-            });
-
-            recursiveAction(tuple.state);
-
-            return group;
+            var recursiveInvoker = new InvokeRec2State<TState>(scheduler, tuple.action);
+            recursiveInvoker.InvokeFirst(tuple.state);
+            return recursiveInvoker;
         }
 
         /// <summary>
@@ -208,45 +137,117 @@ namespace System.Reactive.Concurrency
 
         private static IDisposable InvokeRec3<TState>(IScheduler scheduler, (TState state, Action<TState, Action<TState, DateTimeOffset>> action) tuple)
         {
-            var group = new CompositeDisposable(1);
-            var gate = new object();
+            var recursiveInvoker = new InvokeRec3State<TState>(scheduler, tuple.action);
+            recursiveInvoker.InvokeFirst(tuple.state);
+            return recursiveInvoker;
+        }
+
+        abstract class InvokeRecBaseState<TState> : IDisposable
+        {
+            protected readonly IScheduler scheduler;
+
+            protected readonly CompositeDisposable group;
+
+            protected long index;
+
+            public InvokeRecBaseState(IScheduler scheduler)
+            {
+                this.scheduler = scheduler;
+                group = new CompositeDisposable();
+            }
+
+            public void Dispose()
+            {
+                group.Dispose();
+            }
+
+        }
+
+        sealed class InvokeRec1State<TState> : InvokeRecBaseState<TState>
+        {
+            readonly Action<TState, Action<TState>> action;
+
+            readonly Action<TState> recurseCallback;
+
+            public InvokeRec1State(IScheduler scheduler, Action<TState, Action<TState>> action) : base(scheduler)
+            {
+                this.action = action;
+                recurseCallback = state => InvokeNext(state);
+            }
+
+            internal void InvokeNext(TState state)
+            {
+                var sad = new SingleAssignmentDisposable();
+                group.Add(sad);
+                sad.Disposable = scheduler.Schedule((state, sad, @this: this), (_, nextState) => {
+                    [email protected](nextState.sad);
+                    [email protected](nextState.state);
+                    return Disposable.Empty;
+                });
+            }
+
+            internal void InvokeFirst(TState state)
+            {
+                action(state, recurseCallback);
+            }
+        }
+
+        sealed class InvokeRec2State<TState> : InvokeRecBaseState<TState>
+        {
+            readonly Action<TState, Action<TState, TimeSpan>> action;
+
+            readonly Action<TState, TimeSpan> recurseCallback;
 
-            Action<TState> recursiveAction = null;
-            recursiveAction = state1 => tuple.action(state1, (state2, dueTime1) =>
+            public InvokeRec2State(IScheduler scheduler, Action<TState, Action<TState, TimeSpan>> action) : base(scheduler)
             {
-                var isAdded = false;
-                var isDone = false;
-                var d = default(IDisposable);
-                d = scheduler.Schedule(state2, dueTime1, (scheduler1, state3) =>
-                {
-                    lock (gate)
-                    {
-                        if (isAdded)
-                        {
-                            group.Remove(d);
-                        }
-                        else
-                        {
-                            isDone = true;
-                        }
-                    }
-                    recursiveAction(state3);
+                this.action = action;
+                recurseCallback = (state, time) => InvokeNext(state, time);
+            }
+
+            internal void InvokeNext(TState state, TimeSpan time)
+            {
+                var sad = new SingleAssignmentDisposable();
+                group.Add(sad);
+                sad.Disposable = scheduler.Schedule((state, sad, @this: this), time, (_, nextState) => {
+                    [email protected](nextState.sad);
+                    [email protected](nextState.state);
                     return Disposable.Empty;
                 });
+            }
+
+            internal void InvokeFirst(TState state)
+            {
+                action(state, recurseCallback);
+            }
+        }
+
+        sealed class InvokeRec3State<TState> : InvokeRecBaseState<TState>
+        {
+            readonly Action<TState, Action<TState, DateTimeOffset>> action;
 
-                lock (gate)
-                {
-                    if (!isDone)
-                    {
-                        group.Add(d);
-                        isAdded = true;
-                    }
-                }
-            });
+            readonly Action<TState, DateTimeOffset> recurseCallback;
 
-            recursiveAction(tuple.state);
+            public InvokeRec3State(IScheduler scheduler, Action<TState, Action<TState, DateTimeOffset>> action) : base(scheduler)
+            {
+                this.action = action;
+                recurseCallback = (state, dtOffset) => InvokeNext(state, dtOffset);
+            }
+
+            internal void InvokeNext(TState state, DateTimeOffset dtOffset)
+            {
+                var sad = new SingleAssignmentDisposable();
+                group.Add(sad);
+                sad.Disposable = scheduler.Schedule((state, sad, @this: this), dtOffset, (_, nextState) => {
+                    [email protected](nextState.sad);
+                    [email protected](nextState.state);
+                    return Disposable.Empty;
+                });
+            }
 
-            return group;
+            internal void InvokeFirst(TState state)
+            {
+                action(state, recurseCallback);
+            }
         }
     }
 }