Kaynağa Gözat

Save even more allocations (#591)

Daniel C. Weber 7 yıl önce
ebeveyn
işleme
746a7b15ee

+ 37 - 18
Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs

@@ -182,6 +182,42 @@ namespace System.Reactive.Concurrency
 
         private sealed class LongRunning : ISchedulerLongRunning
         {
+            private sealed class LongScheduledWorkItem<TState> : ICancelable
+            {
+                private readonly TState _state;
+                private readonly Action<TState, ICancelable> _action;
+
+                private IDisposable _cancel;
+
+                public LongScheduledWorkItem(TState state, Action<TState, ICancelable> action)
+                {
+                    _state = state;
+                    _action = action;
+
+                    DefaultScheduler.s_cal.StartThread(
+                        @thisObject =>
+                        {
+                            var @this = (LongScheduledWorkItem<TState>)@thisObject;
+
+                            //
+                            // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning
+                            // requires us to ensure the scheduled work gets an opportunity to observe
+                            // the cancellation request.
+                            //
+                            @this._action(@this._state, @this);
+                        },
+                        this
+                    );
+                }
+
+                public void Dispose()
+                {
+                    Disposable.TryDispose(ref _cancel);
+                }
+
+                public bool IsDisposed => Disposable.GetIsDisposed(ref _cancel);
+            }
+
             public static ISchedulerLongRunning Instance = new LongRunning();
 
             public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
@@ -189,24 +225,7 @@ namespace System.Reactive.Concurrency
                 if (action == null)
                     throw new ArgumentNullException(nameof(action));
 
-                var cancel = new BooleanDisposable();
-
-                DefaultScheduler.s_cal.StartThread(
-                    arg =>
-                    {
-                        var d = (ICancelable)arg;
-
-                        //
-                        // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning
-                        // requires us to ensure the scheduled work gets an opportunity to observe
-                        // the cancellation request.
-                        //
-                        action(state, d);
-                    },
-                    cancel
-                );
-
-                return cancel;
+                return new LongScheduledWorkItem<TState>(state, action);
             }
         }
     }

+ 24 - 13
Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs

@@ -40,8 +40,30 @@ namespace System.Reactive.Concurrency
 
         sealed class SubscribeOnObservable<TSource> : ObservableBase<TSource>
         {
-            readonly IObservable<TSource> source;
+            private sealed class Subscription : IDisposable
+            {
+                private IDisposable _cancel;
+
+                public Subscription(IObservable<TSource> source, IScheduler scheduler, IObserver<TSource> observer)
+                {
+                    Disposable.TrySetSingle(
+                        ref _cancel, 
+                        scheduler.Schedule(
+                            (@this: this, source, observer),
+                            (closureScheduler, state) =>
+                            {
+                                Disposable.TrySetSerial(ref state.@this._cancel, new ScheduledDisposable(closureScheduler, state.source.SubscribeSafe(state.observer)));
+                                return Disposable.Empty;
+                            }));
+                }
+
+                public void Dispose()
+                {
+                    Disposable.TryDispose(ref _cancel);
+                }
+            }
 
+            readonly IObservable<TSource> source;
             readonly IScheduler scheduler;
 
             public SubscribeOnObservable(IObservable<TSource> source, IScheduler scheduler)
@@ -52,18 +74,7 @@ namespace System.Reactive.Concurrency
 
             protected override IDisposable SubscribeCore(IObserver<TSource> observer)
             {
-                var m = new SingleAssignmentDisposable();
-                var d = new SerialDisposable();
-                d.Disposable = m;
-
-                m.Disposable = scheduler.Schedule((source, observer, d),
-                (scheduler, state) =>
-                {
-                    state.d.Disposable = new ScheduledDisposable(scheduler, state.source.SubscribeSafe(state.observer));
-                    return Disposable.Empty;
-                });
-
-                return d;
+                return new Subscription(source, scheduler, observer);
             }
         }
 

+ 136 - 59
Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs

@@ -14,6 +14,139 @@ namespace System.Reactive.Concurrency
     /// <seealso cref="TaskPoolScheduler.Default">Instance of this type using the default TaskScheduler to schedule work on the TPL task pool.</seealso>
     public sealed class TaskPoolScheduler : LocalScheduler, ISchedulerLongRunning, ISchedulerPeriodic
     {
+        private sealed class ScheduledWorkItem<TState> : IDisposable
+        {
+            private readonly TState _state;
+            private readonly TaskPoolScheduler _scheduler;
+            private readonly Func<IScheduler, TState, IDisposable> _action;
+
+            private IDisposable _cancel;
+
+            public ScheduledWorkItem(TaskPoolScheduler scheduler, TState state, Func<IScheduler, TState, IDisposable> action)
+            {
+                _state = state;
+                _action = action;
+                _scheduler = scheduler;
+
+                var cancelable = new CancellationDisposable();
+
+                Disposable.SetSingle(ref _cancel, cancelable);
+
+                scheduler.taskFactory.StartNew(
+                    @thisObject =>
+                    {
+                        var @this = (ScheduledWorkItem<TState>)@thisObject;
+                        //
+                        // BREAKING CHANGE v2.0 > v1.x - No longer escalating exceptions using a throwing
+                        //                               helper thread.
+                        //
+                        // Our manual escalation based on the creation of a throwing thread was merely to
+                        // expedite the process of throwing the exception that would otherwise occur on the
+                        // finalizer thread at a later point during the app's lifetime.
+                        //
+                        // However, it also prevented applications from observing the exception through
+                        // the TaskScheduler.UnobservedTaskException static event. Also, starting form .NET
+                        // 4.5, the default behavior of the task pool is not to take down the application
+                        // when an exception goes unobserved (done as part of the async/await work). It'd
+                        // be weird for Rx not to follow the platform defaults.
+                        //
+                        // General implementation guidelines for schedulers (in order of importance):
+                        //
+                        //    1. Always thunk through to the underlying infrastructure with a wrapper that's as tiny as possible.
+                        //    2. Global exception notification/handling mechanisms shouldn't be bypassed.
+                        //    3. Escalation behavior for exceptions is left to the underlying infrastructure.
+                        //
+                        // The Catch extension method for IScheduler (added earlier) allows to re-route
+                        // exceptions at stage 2. If the exception isn't handled at the Rx level, it
+                        // propagates by means of a rethrow, falling back to behavior in 3.
+                        //
+                        Disposable.TrySetSerial(ref @this._cancel, @this._action(@this._scheduler, @this._state));
+                    },
+                    this,
+                    cancelable.Token);
+            }
+
+            public void Dispose()
+            {
+                Disposable.TryDispose(ref _cancel);
+            }
+        }
+
+        private sealed class SlowlyScheduledWorkItem<TState> : IDisposable
+        {
+            private readonly TState _state;
+            private readonly TaskPoolScheduler _scheduler;
+            private readonly Func<IScheduler, TState, IDisposable> _action;
+
+            private IDisposable _cancel;
+
+            public SlowlyScheduledWorkItem(TaskPoolScheduler scheduler, TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
+            {
+                _state = state;
+                _action = action;
+                _scheduler = scheduler;
+
+                var ct = new CancellationDisposable();
+                Disposable.SetSingle(ref _cancel, ct);
+
+                TaskHelpers.Delay(dueTime, ct.Token).ContinueWith(
+                    (_, @thisObject) =>
+                    {
+                        var @this = (SlowlyScheduledWorkItem<TState>)thisObject;
+
+                        if (!Disposable.GetIsDisposed(ref @this._cancel))
+                        {
+                            Disposable.TrySetMultiple(ref @this._cancel, @this._action(@this._scheduler, @this._state));
+                        }
+                    },
+                    this,
+                    CancellationToken.None,
+                    TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion,
+                    scheduler.taskFactory.Scheduler);
+            }
+
+            public void Dispose()
+            {
+                Disposable.TryDispose(ref _cancel);
+            }
+        }
+
+        private sealed class LongScheduledWorkItem<TState> : ICancelable
+        {
+            private readonly TState _state;
+            private readonly Action<TState, ICancelable> _action;
+
+            private IDisposable _cancel;
+
+            public LongScheduledWorkItem(TaskPoolScheduler scheduler, TState state, Action<TState, ICancelable> action)
+            {
+                _state = state;
+                _action = action;
+
+                scheduler.taskFactory.StartNew(
+                    @thisObject =>
+                    {
+                        var @this = (LongScheduledWorkItem<TState>) thisObject;
+
+                        //
+                        // Notice we don't check _cancel.IsDisposed. The contract for ISchedulerLongRunning
+                        // requires us to ensure the scheduled work gets an opportunity to observe
+                        // the cancellation request.
+                        //
+                        @this._action(@this._state, @this);
+                    },
+                    this,
+                    TaskCreationOptions.LongRunning);
+            }
+
+            public void Dispose()
+            {
+                Disposable.TryDispose(ref _cancel);
+            }
+
+            public bool IsDisposed => Disposable.GetIsDisposed(ref _cancel);
+        }
+
         private static readonly Lazy<TaskPoolScheduler> s_instance = new Lazy<TaskPoolScheduler>(() => new TaskPoolScheduler(new TaskFactory(TaskScheduler.Default)));
         private readonly TaskFactory taskFactory;
 
@@ -48,38 +181,7 @@ namespace System.Reactive.Concurrency
             if (action == null)
                 throw new ArgumentNullException(nameof(action));
 
-            var d = new SerialDisposable();
-            var cancelable = new CancellationDisposable();
-            d.Disposable = cancelable;
-            taskFactory.StartNew(() =>
-            {
-                //
-                // BREAKING CHANGE v2.0 > v1.x - No longer escalating exceptions using a throwing
-                //                               helper thread.
-                //
-                // Our manual escalation based on the creation of a throwing thread was merely to
-                // expedite the process of throwing the exception that would otherwise occur on the
-                // finalizer thread at a later point during the app's lifetime.
-                //
-                // However, it also prevented applications from observing the exception through
-                // the TaskScheduler.UnobservedTaskException static event. Also, starting form .NET
-                // 4.5, the default behavior of the task pool is not to take down the application
-                // when an exception goes unobserved (done as part of the async/await work). It'd
-                // be weird for Rx not to follow the platform defaults.
-                //
-                // General implementation guidelines for schedulers (in order of importance):
-                //
-                //    1. Always thunk through to the underlying infrastructure with a wrapper that's as tiny as possible.
-                //    2. Global exception notification/handling mechanisms shouldn't be bypassed.
-                //    3. Escalation behavior for exceptions is left to the underlying infrastructure.
-                //
-                // The Catch extension method for IScheduler (added earlier) allows to re-route
-                // exceptions at stage 2. If the exception isn't handled at the Rx level, it
-                // propagates by means of a rethrow, falling back to behavior in 3.
-                //
-                d.Disposable = action(this, state);
-            }, cancelable.Token);
-            return d;
+            return new ScheduledWorkItem<TState>(this, state, action);
         }
 
         /// <summary>
@@ -107,20 +209,7 @@ namespace System.Reactive.Concurrency
 
         private IDisposable ScheduleSlow<TState>(TState state, TimeSpan dueTime, Func<IScheduler, TState, IDisposable> action)
         {
-            var d = new MultipleAssignmentDisposable();
-
-            var ct = new CancellationDisposable();
-            d.Disposable = ct;
-
-            TaskHelpers.Delay(dueTime, ct.Token).ContinueWith(_ =>
-            {
-                if (!d.IsDisposed)
-                {
-                    d.Disposable = action(this, state);
-                }
-            }, CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler);
-
-            return d;
+            return new SlowlyScheduledWorkItem<TState>(this, state, dueTime, action);
         }
 
         /// <summary>
@@ -133,19 +222,7 @@ namespace System.Reactive.Concurrency
         /// <exception cref="ArgumentNullException"><paramref name="action"/> is <c>null</c>.</exception>
         public IDisposable ScheduleLongRunning<TState>(TState state, Action<TState, ICancelable> action)
         {
-            var d = new BooleanDisposable();
-
-            taskFactory.StartNew(() =>
-            {
-                //
-                // Notice we don't check d.IsDisposed. The contract for ISchedulerLongRunning
-                // requires us to ensure the scheduled work gets an opportunity to observe
-                // the cancellation request.
-                //
-                action(state, d);
-            }, TaskCreationOptions.LongRunning);
-
-            return d;
+            return new LongScheduledWorkItem<TState>(this, state, action);
         }
 
         /// <summary>