Browse Source

Merge pull request #776 from danielcweber/SaveAllocationsInContinueWith

Save allocations in ContinueWith
Daniel C. Weber 7 years ago
parent
commit
3e41be8503

+ 115 - 0
Rx.NET/Source/src/System.Reactive/Internal/TaskExtensions.cs

@@ -0,0 +1,115 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information.
+
+namespace System.Threading.Tasks
+{
+    internal static class TaskExtensions
+    {
+        public static Task ContinueWithState<TState>(this Task task, Action<Task, TState> continuationAction, TState state)
+        {
+            return task.ContinueWith(
+                (t, tupleObject) =>
+                {
+                    var (closureAction, closureState) = ((Action<Task, TState>, TState))tupleObject;
+
+                    closureAction(t, closureState);
+                },
+                (continuationAction, state));
+        }
+
+        public static Task ContinueWithState<TState>(this Task task, Action<Task, TState> continuationAction, TState state, CancellationToken cancellationToken)
+        {
+            return task.ContinueWith(
+                (t, tupleObject) =>
+                {
+                    var (closureAction, closureState) = ((Action<Task, TState>, TState))tupleObject;
+
+                    closureAction(t, closureState);
+                },
+                (continuationAction, state),
+                cancellationToken);
+        }
+
+        public static Task ContinueWithState<TState>(this Task task, Action<Task, TState> continuationAction, TState state, TaskContinuationOptions continuationOptions)
+        {
+            return task.ContinueWith(
+                (t, tupleObject) =>
+                {
+                    var (closureAction, closureState) = ((Action<Task, TState>, TState))tupleObject;
+
+                    closureAction(t, closureState);
+                },
+                (continuationAction, state),
+                continuationOptions);
+        }
+
+        public static Task ContinueWithState<TState>(this Task task, Action<Task, TState> continuationAction, TState state, CancellationToken cancellationToken, TaskContinuationOptions continuationOptions)
+        {
+            return task.ContinueWith(
+                (t, tupleObject) =>
+                {
+                    var (closureAction, closureState) = ((Action<Task, TState>, TState))tupleObject;
+
+                    closureAction(t, closureState);
+                },
+                (continuationAction, state),
+                cancellationToken,
+                continuationOptions,
+                TaskScheduler.Default);
+        }
+
+        public static Task ContinueWithState<TResult, TState>(this Task<TResult> task, Action<Task<TResult>, TState> continuationAction, TState state)
+        {
+            return task.ContinueWith(
+                (t, tupleObject) =>
+                {
+                    var (closureAction, closureState) = ((Action<Task<TResult>, TState>, TState))tupleObject;
+
+                    closureAction(t, closureState);
+                },
+                (continuationAction, state));
+        }
+
+        public static Task ContinueWithState<TResult, TState>(this Task<TResult> task, Action<Task<TResult>, TState> continuationAction, TState state, CancellationToken cancellationToken)
+        {
+            return task.ContinueWith(
+                (t, tupleObject) =>
+                {
+                    var (closureAction, closureState) = ((Action<Task<TResult>, TState>, TState))tupleObject;
+
+                    closureAction(t, closureState);
+                },
+                (continuationAction, state),
+                cancellationToken);
+        }
+
+        public static Task ContinueWithState<TResult, TState>(this Task<TResult> task, Action<Task<TResult>, TState> continuationAction, TState state, TaskContinuationOptions continuationOptions)
+        {
+            return task.ContinueWith(
+                (t, tupleObject) =>
+                {
+                    var (closureAction, closureState) = ((Action<Task<TResult>, TState>, TState))tupleObject;
+
+                    closureAction(t, closureState);
+                },
+                (continuationAction, state),
+                continuationOptions);
+        }
+
+        public static Task ContinueWithState<TResult, TState>(this Task<TResult> task, Action<Task<TResult>, TState> continuationAction, TState state, CancellationToken cancellationToken, TaskContinuationOptions continuationOptions)
+        {
+            return task.ContinueWith(
+                (t, tupleObject) =>
+                {
+                    var (closureAction, closureState) = ((Action<Task<TResult>, TState>, TState))tupleObject;
+
+                    closureAction(t, closureState);
+                },
+                (continuationAction, state),
+                cancellationToken,
+                continuationOptions,
+                TaskScheduler.Default);
+        }
+    }
+}

+ 2 - 18
Rx.NET/Source/src/System.Reactive/Linq/Observable/SelectMany.cs

@@ -588,18 +588,10 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        AttachContinuation(value, task);
+                        task.ContinueWithState((t, tuple) => [email protected](tuple.value, t), (@this: this, value), _cancel.Token);
                     }
                 }
 
-                private void AttachContinuation(TSource value, Task<TCollection> task)
-                {
-                    //
-                    // Separate method to avoid closure in synchronous completion case.
-                    //
-                    task.ContinueWith(t => OnCompletedTask(value, t), _cancel.Token);
-                }
-
                 private void OnCompletedTask(TSource value, Task<TCollection> task)
                 {
                     switch (task.Status)
@@ -753,18 +745,10 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     else
                     {
-                        AttachContinuation(value, index, task);
+                        task.ContinueWithState((t, tuple) => [email protected](tuple.value, tuple.index, t), (@this: this, value, index), _cancel.Token);
                     }
                 }
 
-                private void AttachContinuation(TSource value, int index, Task<TCollection> task)
-                {
-                    //
-                    // Separate method to avoid closure in synchronous completion case.
-                    //
-                    task.ContinueWith(t => OnCompletedTask(value, index, t), _cancel.Token);
-                }
-
                 private void OnCompletedTask(TSource value, int index, Task<TCollection> task)
                 {
                     switch (task.Status)

+ 18 - 2
Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs

@@ -41,7 +41,15 @@ namespace System.Reactive.Threading.Tasks
                 if (_scheduler == null)
                     _task.ContinueWith((t, subjectObject) => t.EmitTaskResult((IObserver<Unit>)subjectObject), observer, cts.Token, options, TaskScheduler.Current);
                 else
-                    _task.ContinueWith((t, subjectObject) => _scheduler.ScheduleAction((t, subjectObject), tuple => tuple.t.EmitTaskResult((IObserver<Unit>)tuple.subjectObject)), observer, cts.Token, options, TaskScheduler.Current);
+                {
+                    _task.ContinueWithState(
+                        (task, tuple) => tuple.@this._scheduler.ScheduleAction(
+                            (task, tuple.observer),
+                            tuple2 => tuple2.task.EmitTaskResult(tuple2.observer)),
+                        (@this: this, observer),
+                        cts.Token,
+                        options);
+                }
 
                 return cts;
             }
@@ -71,7 +79,15 @@ namespace System.Reactive.Threading.Tasks
                 if (_scheduler == null)
                     _task.ContinueWith((t, subjectObject) => t.EmitTaskResult((IObserver<TResult>)subjectObject), observer, cts.Token, options, TaskScheduler.Current);
                 else
-                    _task.ContinueWith((t, subjectObject) => _scheduler.ScheduleAction((t, subjectObject), tuple => tuple.t.EmitTaskResult((IObserver<TResult>)tuple.subjectObject)), observer, cts.Token, options, TaskScheduler.Current);
+                {
+                    _task.ContinueWithState(
+                        (task, tuple) => tuple.@this._scheduler.ScheduleAction(
+                            (task, tuple.observer),
+                            tuple2 => tuple2.task.EmitTaskResult(tuple2.observer)),
+                        (@this: this, observer),
+                        cts.Token,
+                        options);
+                }
 
                 return cts;
             }