Pārlūkot izejas kodu

Use the stateful overload of AsyncLock in some places to save allocation of closures and allow delegate caching. (#583)

Daniel C. Weber 7 gadi atpakaļ
vecāks
revīzija
7551d1a016

+ 28 - 13
Rx.NET/Source/src/System.Reactive/Concurrency/DefaultScheduler.cs

@@ -128,25 +128,40 @@ namespace System.Reactive.Concurrency
             if (action == null)
                 throw new ArgumentNullException(nameof(action));
 
-            var state1 = state;
-            var gate = new AsyncLock();
+            return new PeriodicallyScheduledWorkItem<TState>(state, period, action);
+        }
+
+        private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
+        {
+            private TState _state;
+            private Func<TState, TState> _action;
+            private readonly IDisposable _cancel;
+            private readonly AsyncLock _gate = new AsyncLock();
 
-            var cancel = s_cal.StartPeriodicTimer(() =>
+            public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func<TState, TState> action)
             {
-                gate.Wait(() =>
-                {
-                    state1 = action(state1);
-                });
-            }, period);
+                _state = state;
+                _action = action;
+                
+                _cancel = s_cal.StartPeriodicTimer(Tick, period);
+            }
 
-            return Disposable.Create(() =>
+            private void Tick()
             {
-                cancel.Dispose();
-                gate.Dispose();
-                action = Stubs<TState>.I;
-            });
+                _gate.Wait(
+                    this,
+                    closureWorkItem => closureWorkItem._state = closureWorkItem._action(closureWorkItem._state));
+            }
+
+            public void Dispose()
+            {
+                _cancel.Dispose();
+                _gate.Dispose();
+                _action = Stubs<TState>.I;
+            }
         }
 
+
         /// <summary>
         /// Discovers scheduler services by interface type.
         /// </summary>

+ 29 - 33
Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs

@@ -191,52 +191,48 @@ namespace System.Reactive.Concurrency
             if (action == null)
                 throw new ArgumentNullException(nameof(action));
 
-            var start = _stopwatch.Elapsed;
-            var next = start + period;
-
-            var state1 = state;
+            return new PeriodicallyScheduledWorkItem<TState>(this, state, period, action);
+        }
 
-            var td = new TernaryDisposable();
+        private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
+        {
+            private readonly TimeSpan _period;
+            private readonly Func<TState, TState> _action;
+            private readonly EventLoopScheduler _scheduler;
+            private readonly AsyncLock _gate = new AsyncLock();
 
-            var gate = new AsyncLock();
-            td.Extra = gate;
+            private TState _state;
+            private TimeSpan _next;
+            private IDisposable _task;
 
-            var tick = default(Func<IScheduler, object, IDisposable>);
-            tick = (self_, _) =>
+            public PeriodicallyScheduledWorkItem(EventLoopScheduler scheduler, TState state, TimeSpan period, Func<TState, TState> action)
             {
-                next += period;
+                _state = state;
+                _period = period;
+                _action = action;
+                _scheduler = scheduler;
+                _next = scheduler._stopwatch.Elapsed + period;
 
-                td.Next = self_.Schedule(null, next - _stopwatch.Elapsed, tick);
-
-                gate.Wait(() =>
-                {
-                    state1 = action(state1);
-                });
-
-                return Disposable.Empty;
-            };
+                Disposable.TrySetSingle(ref _task, scheduler.Schedule(this, _next - scheduler._stopwatch.Elapsed, (_, s) => s.Tick(_)));
+            }
 
-            td.First = Schedule(null, next - _stopwatch.Elapsed, tick);
+            private IDisposable Tick(IScheduler self)
+            {
+                _next += _period;
 
-            return td;
-        }
+                Disposable.TrySetMultiple(ref _task, self.Schedule(this, _next - _scheduler._stopwatch.Elapsed, (_, s) => s.Tick(_)));
 
-        private sealed class TernaryDisposable : IDisposable
-        {
-            private IDisposable _task;
-            private IDisposable _extra;
+                _gate.Wait(
+                    this,
+                    closureWorkItem => closureWorkItem._state = closureWorkItem._action(closureWorkItem._state));
 
-            // If Next was called before this assignment is executed, it won't overwrite
-            // a more fresh IDisposable task
-            public IDisposable First { set { Disposable.TrySetSingle(ref _task, value); } }
-            // It is fine to overwrite the first or previous IDisposable task
-            public IDisposable Next { set { Disposable.TrySetMultiple(ref _task, value); } }
-            public IDisposable Extra { set { Disposable.SetSingle(ref _extra, value); } }
+                return Disposable.Empty;
+            }
 
             public void Dispose()
             {
                 Disposable.TryDispose(ref _task);
-                Disposable.TryDispose(ref _extra);
+                _gate.Dispose();
             }
         }
 

+ 23 - 19
Rx.NET/Source/src/System.Reactive/Concurrency/ImmediateScheduler.cs

@@ -78,13 +78,15 @@ namespace System.Reactive.Concurrency
                     asyncLock = new AsyncLock();
                 }
 
-                asyncLock.Wait(() =>
-                {
-                    if (!m.IsDisposed)
+                asyncLock.Wait(
+                    (@this: this, m, action, state),
+                    tuple =>
                     {
-                        m.Disposable = action(this, state);
-                    }
-                });
+                        if (!m.IsDisposed)
+                        {
+                            tuple.m.Disposable = tuple.action(tuple.@this, tuple.state);
+                        }
+                    });
 
                 return m;
             }
@@ -113,22 +115,24 @@ namespace System.Reactive.Concurrency
                     asyncLock = new AsyncLock();
                 }
 
-                asyncLock.Wait(() =>
-                {
-                    if (!m.IsDisposed)
+                asyncLock.Wait(
+                    (@this: this, m, state, action, timer, dueTime),
+                    tuple =>
                     {
-                        var sleep = dueTime - timer.Elapsed;
-                        if (sleep.Ticks > 0)
-                        {
-                            ConcurrencyAbstractionLayer.Current.Sleep(sleep);
-                        }
-
-                        if (!m.IsDisposed)
+                        if (!tuple.m.IsDisposed)
                         {
-                            m.Disposable = action(this, state);
+                            var sleep = tuple.dueTime - tuple.timer.Elapsed;
+                            if (sleep.Ticks > 0)
+                            {
+                                ConcurrencyAbstractionLayer.Current.Sleep(sleep);
+                            }
+
+                            if (!tuple.m.IsDisposed)
+                            {
+                                tuple.m.Disposable = tuple.action(tuple.@this, tuple.state);
+                            }
                         }
-                    }
-                });
+                    });
 
                 return m;
             }

+ 41 - 18
Rx.NET/Source/src/System.Reactive/Concurrency/TaskPoolScheduler.cs

@@ -179,31 +179,54 @@ namespace System.Reactive.Concurrency
             if (action == null)
                 throw new ArgumentNullException(nameof(action));
 
-            var cancel = new CancellationDisposable();
+            return new PeriodicallyScheduledWorkItem<TState>(state, period, action, taskFactory);
+        }
+
+        private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
+        {
+            private TState _state;
+
+            private readonly TimeSpan _period;
+            private readonly TaskFactory _taskFactory;
+            private readonly Func<TState, TState> _action;
+            private readonly AsyncLock _gate = new AsyncLock();
+            private readonly CancellationTokenSource _cts = new CancellationTokenSource();
+
+            public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func<TState, TState> action, TaskFactory taskFactory)
+            {
+                _state = state;
+                _period = period;
+                _action = action;
+                _taskFactory = taskFactory;
+
+                MoveNext();
+            }
 
-            var state1 = state;
-            var gate = new AsyncLock();
+            public void Dispose()
+            {
+                _cts.Cancel();
+                _gate.Dispose();
+            }
 
-            var moveNext = default(Action);
-            moveNext = () =>
+            private void MoveNext()
             {
-                TaskHelpers.Delay(period, cancel.Token).ContinueWith(
-                    _ =>
+                TaskHelpers.Delay(_period, _cts.Token).ContinueWith(
+                    (_, thisObject) =>
                     {
-                        moveNext();
+                        var @this = (PeriodicallyScheduledWorkItem<TState>)thisObject;
+
+                        @this.MoveNext();
 
-                        gate.Wait(() =>
-                        {
-                            state1 = action(state1);
-                        });
+                        @this._gate.Wait(
+                            @this,
+                            closureThis => closureThis._state = closureThis._action(closureThis._state));
                     },
-                    CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, taskFactory.Scheduler
+                    this,
+                    CancellationToken.None,
+                    TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, 
+                    _taskFactory.Scheduler
                 );
-            };
-
-            moveNext();
-
-            return StableCompositeDisposable.Create(cancel, gate);
+            }
         }
     }
 }

+ 31 - 17
Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.Windows.cs

@@ -157,26 +157,40 @@ namespace System.Reactive.Concurrency
             if (action == null)
                 throw new ArgumentNullException(nameof(action));
 
-            var state1 = state;
-            var gate = new AsyncLock();
+            return new PeriodicallyScheduledWorkItem<TState>(state, period, action);
+        }
 
-            var res = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer(
-                tpt =>
-                {
-                    gate.Wait(() =>
-                    {
-                        state1 = action(state1);
-                    });
-                },
-                period
-            );
+        private sealed class PeriodicallyScheduledWorkItem<TState> : IDisposable
+        {
+            private TState _state;
+            private Func<TState, TState> _action;
+
+            private readonly ThreadPoolTimer _timer;
+            private readonly AsyncLock _gate = new AsyncLock();
 
-            return Disposable.Create(() =>
+            public PeriodicallyScheduledWorkItem(TState state, TimeSpan period, Func<TState, TState> action)
             {
-                res.Cancel();
-                gate.Dispose();
-                action = Stubs<TState>.I;
-            });
+                _state = state;
+                _action = action;
+
+                _timer = global::Windows.System.Threading.ThreadPoolTimer.CreatePeriodicTimer(
+                    Tick,
+                    period);
+            }
+
+            private void Tick(ThreadPoolTimer timer)
+            {
+                _gate.Wait(
+                    this,
+                    @this => @this._state = @this._action(@this._state));
+            }
+
+            public void Dispose()
+            {
+                _timer.Cancel();
+                _gate.Dispose();
+                _action = Stubs<TState>.I;
+            }
         }
     }
 }

+ 6 - 4
Rx.NET/Source/src/System.Reactive/Concurrency/ThreadPoolScheduler.cs

@@ -284,10 +284,12 @@ namespace System.Reactive.Concurrency
 
             private void Tick(object state)
             {
-                _gate.Wait(() =>
-                {
-                    _state = _action(_state);
-                });
+                _gate.Wait(
+                    this,
+                    @this =>
+                    {
+                        @this._state = @this._action(@this._state);
+                    });
             }
 
             public void Dispose()

+ 9 - 12
Rx.NET/Source/src/System.Reactive/Internal/AsyncLockObserver.cs

@@ -19,26 +19,23 @@ namespace System.Reactive
 
         protected override void OnNextCore(T value)
         {
-            _gate.Wait(() =>
-            {
-                _observer.OnNext(value);
-            });
+            _gate.Wait(
+                (_observer, value),
+                tuple => tuple._observer.OnNext(tuple.value));
         }
 
         protected override void OnErrorCore(Exception exception)
         {
-            _gate.Wait(() =>
-            {
-                _observer.OnError(exception);
-            });
+            _gate.Wait(
+                (_observer, exception),
+                tuple => tuple._observer.OnError(tuple.exception));
         }
 
         protected override void OnCompletedCore()
         {
-            _gate.Wait(() =>
-            {
-                _observer.OnCompleted();
-            });
+            _gate.Wait(
+                _observer,
+                closureObserver => closureObserver.OnCompleted());
         }
     }
 }

+ 2 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs

@@ -520,7 +520,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     base.Run(source);
 
-                    _bufferGate.Wait(CreateBufferClose);
+                    _bufferGate.Wait(this, @this => @this.CreateBufferClose());
                 }
 
                 protected override void Dispose(bool disposing)
@@ -564,7 +564,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         ForwardOnNext(res);
                     }
 
-                    _bufferGate.Wait(CreateBufferClose);
+                    _bufferGate.Wait(this, @this => @this.CreateBufferClose());
                 }
 
                 private sealed class BufferClosingObserver : IObserver<TBufferClosing>

+ 2 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs

@@ -530,7 +530,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     groupDisposable.Add(source.SubscribeSafe(this));
 
-                    _windowGate.Wait(CreateWindowClose);
+                    _windowGate.Wait(this, @this => @this.CreateWindowClose());
 
                     SetUpstream(_refCountDisposable);
                 }
@@ -569,7 +569,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         ForwardOnNext(window);
                     }
 
-                    _windowGate.Wait(CreateWindowClose);
+                    _windowGate.Wait(this, @this => @this.CreateWindowClose());
                 }
 
                 private sealed class WindowClosingObserver : IObserver<TWindowClosing>