Browse Source

In preparation for https://github.com/dotnet/reactive/pull/598, we evaluate where to replace IScheduler.Schedule by the ScheduleAction extension which does not allow recursive scheduling.

Daniel Weber 7 years ago
parent
commit
33c58b1424

+ 3 - 8
Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Recursive.cs

@@ -205,11 +205,10 @@ namespace System.Reactive.Concurrency
             {
                 var sad = new SingleAssignmentDisposable();
                 group.Add(sad);
-                sad.Disposable = scheduler.Schedule((state, sad, @this: this), (_, nextState) =>
+                sad.Disposable = scheduler.ScheduleAction((state, sad, @this: this), nextState =>
                 {
                     [email protected](nextState.sad);
                     [email protected](nextState.state);
-                    return Disposable.Empty;
                 });
             }
 
@@ -234,11 +233,9 @@ namespace System.Reactive.Concurrency
             {
                 var sad = new SingleAssignmentDisposable();
                 group.Add(sad);
-                sad.Disposable = scheduler.Schedule((state, sad, @this: this), time, (_, nextState) =>
-                {
+                sad.Disposable = scheduler.ScheduleAction((state, sad, @this: this), time, nextState => {
                     [email protected](nextState.sad);
                     [email protected](nextState.state);
-                    return Disposable.Empty;
                 });
             }
 
@@ -263,11 +260,9 @@ namespace System.Reactive.Concurrency
             {
                 var sad = new SingleAssignmentDisposable();
                 group.Add(sad);
-                sad.Disposable = scheduler.Schedule((state, sad, @this: this), dtOffset, (_, nextState) =>
-                {
+                sad.Disposable = scheduler.ScheduleAction((state, sad, @this: this), dtOffset, nextState => {
                     [email protected](nextState.sad);
                     [email protected](nextState.state);
-                    return Disposable.Empty;
                 });
             }
 

+ 4 - 5
Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs

@@ -89,12 +89,12 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     var disp = _append
                         ? _source.SubscribeSafe(this)
-                        : _scheduler.Schedule(this, PrependValue);
+                        : _scheduler.ScheduleAction(this, PrependValue);
 
                     SetUpstream(disp);
                 }
 
-                private static IDisposable PrependValue(IScheduler scheduler, _ sink)
+                private static IDisposable PrependValue(_ sink)
                 {
                     sink.ForwardOnNext(sink._value);
                     return sink._source.SubscribeSafe(sink);
@@ -104,7 +104,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     if (_append)
                     {
-                        var disposable = _scheduler.Schedule(this, AppendValue);
+                        var disposable = _scheduler.ScheduleAction(this, AppendValue);
                         Disposable.TrySetSingle(ref _schedulerDisposable, disposable);
                     }
                     else
@@ -113,11 +113,10 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                 }
 
-                private static IDisposable AppendValue(IScheduler scheduler, _ sink)
+                private static void AppendValue(_ sink)
                 {
                     sink.ForwardOnNext(sink._value);
                     sink.ForwardOnCompleted();
-                    return Disposable.Empty;
                 }
 
                 protected override void Dispose(bool disposing)

+ 5 - 11
Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs

@@ -346,10 +346,10 @@ namespace System.Reactive.Linq.ObservableImpl
                         _nextShift += _timeShift;
                     }
 
-                    m.Disposable = _scheduler.Schedule((@this: this, isSpan, isShift), ts, (_, tuple) => [email protected](tuple.isSpan, tuple.isShift));
+                    m.Disposable = _scheduler.ScheduleAction((@this: this, isSpan, isShift), ts, tuple => [email protected](tuple.isSpan, tuple.isShift));
                 }
 
-                private IDisposable Tick(bool isSpan, bool isShift)
+                private void Tick(bool isSpan, bool isShift)
                 {
                     lock (_gate)
                     {
@@ -372,8 +372,6 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
 
                     CreateTimer();
-
-                    return Disposable.Empty;
                 }
 
                 public override void OnNext(TSource value)
@@ -559,19 +557,17 @@ namespace System.Reactive.Linq.ObservableImpl
                     var m = new SingleAssignmentDisposable();
                     Disposable.TrySetSerial(ref _timerSerial, m);
 
-                    m.Disposable = _parent._scheduler.Schedule((@this: this, id), _parent._timeSpan, (_, tuple) => [email protected](tuple.id));
+                    m.Disposable = _parent._scheduler.ScheduleAction((@this: this, id), _parent._timeSpan, tuple => [email protected](tuple.id));
                 }
 
-                private IDisposable Tick(int id)
+                private void Tick(int id)
                 {
-                    var d = Disposable.Empty;
-
                     var newId = 0;
                     lock (_gate)
                     {
                         if (id != _windowId)
                         {
-                            return d;
+                            return;
                         }
 
                         _n = 0;
@@ -583,8 +579,6 @@ namespace System.Reactive.Linq.ObservableImpl
 
                         CreateTimer(newId);
                     }
-
-                    return d;
                 }
 
                 public override void OnNext(TSource value)

+ 5 - 9
Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

@@ -396,7 +396,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         if (shouldWait)
                         {
                             var timer = new ManualResetEventSlim();
-                            _scheduler.Schedule(timer, waitTime, (_, slimTimer) => { slimTimer.Set(); return Disposable.Empty; });
+                            _scheduler.ScheduleAction(timer, waitTime, slimTimer => { slimTimer.Set(); });
 
                             try
                             {
@@ -455,10 +455,10 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     _ready = false;
 
-                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Start()));
+                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Start()));
                 }
 
-                private IDisposable Start()
+                private void Start()
                 {
                     var next = default(TimeSpan);
                     var shouldRun = false;
@@ -491,8 +491,6 @@ namespace System.Reactive.Linq.ObservableImpl
                     {
                         Disposable.TrySetSerial(ref _cancelable, _scheduler.Schedule((Base<Absolute>.S)this, next, (@this, a) => DrainQueue(a)));
                     }
-
-                    return Disposable.Empty;
                 }
             }
 
@@ -508,10 +506,10 @@ namespace System.Reactive.Linq.ObservableImpl
                     // ScheduleDrain might have already set a newer disposable
                     // using TrySetSerial would cancel it, stopping the emission
                     // and hang the consumer
-                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Start()));
+                    Disposable.TrySetSingle(ref _cancelable, parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Start()));
                 }
 
-                private IDisposable Start()
+                private void Start()
                 {
                     lock (_gate)
                     {
@@ -528,8 +526,6 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
 
                     ScheduleDrain();
-
-                    return Disposable.Empty;
                 }
             }
         }

+ 2 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/DelaySubscription.cs

@@ -56,12 +56,12 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public void Run(IObservable<TSource> source, IScheduler scheduler, DateTimeOffset dueTime)
             {
-                SetUpstream(scheduler.Schedule((@this: this, source), dueTime, (self, tuple) => tuple.source.SubscribeSafe(tuple.@this)));
+                SetUpstream(scheduler.ScheduleAction((@this: this, source), dueTime, tuple => tuple.source.SubscribeSafe(tuple.@this)));
             }
 
             public void Run(IObservable<TSource> source, IScheduler scheduler, TimeSpan dueTime)
             {
-                SetUpstream(scheduler.Schedule((@this: this, source), dueTime, (self, tuple) => tuple.source.SubscribeSafe(tuple.@this)));
+                SetUpstream(scheduler.ScheduleAction((@this: this, source), dueTime, tuple => tuple.source.SubscribeSafe(tuple.@this)));
             }
         }
     }

+ 1 - 5
Rx.NET/Source/src/System.Reactive/Linq/Observable/Empty.cs

@@ -29,11 +29,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public void Run(IScheduler scheduler)
             {
-                SetUpstream(scheduler.Schedule(this, (s, target) =>
-                {
-                    target.OnCompleted();
-                    return Disposable.Empty;
-                }));
+                SetUpstream(scheduler.ScheduleAction(this, target => target.OnCompleted()));
             }
         }
     }

+ 3 - 5
Rx.NET/Source/src/System.Reactive/Linq/Observable/FromEvent.cs

@@ -321,11 +321,11 @@ namespace System.Reactive.Linq.ObservableImpl
                     // the GetSchedulerForCurrentContext method).
                     //
                     var onNext = _parent.GetHandler(_subject.OnNext);
-                    _parent._scheduler.Schedule(onNext, AddHandler);
+                    _parent._scheduler.ScheduleAction(onNext, AddHandler);
                 }
             }
 
-            private IDisposable AddHandler(IScheduler self, TDelegate onNext)
+            private void AddHandler(TDelegate onNext)
             {
                 var removeHandler = default(IDisposable);
                 try
@@ -335,7 +335,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 catch (Exception exception)
                 {
                     _subject.OnError(exception);
-                    return Disposable.Empty;
+                    return;
                 }
 
                 //
@@ -347,8 +347,6 @@ namespace System.Reactive.Linq.ObservableImpl
                 // remove handler to run on the scheduler.
                 //
                 _removeHandler.Disposable = removeHandler;
-
-                return Disposable.Empty;
             }
         }
     }

+ 1 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs

@@ -140,7 +140,7 @@ namespace System.Reactive.Linq.ObservableImpl
                                 {
                                     var cancelable = (SingleAssignmentDisposable)Volatile.Read(ref closureParent._serial);
 
-                                    cancelable.Disposable = closureParent._scheduler.Schedule((cancelable, closureParent), closureParent._disconnectTime, (self, tuple2) =>
+                                    cancelable.Disposable = closureParent._scheduler.ScheduleAction((cancelable, closureParent), closureParent._disconnectTime, tuple2 =>
                                     {
                                         lock (tuple2.closureParent._gate)
                                         {
@@ -150,8 +150,6 @@ namespace System.Reactive.Linq.ObservableImpl
                                                 tuple2.closureParent._connectableSubscription = null;
                                             }
                                         }
-
-                                        return Disposable.Empty;
                                     });
                                 }
                             }

+ 2 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs

@@ -111,7 +111,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public void Run(Time parent)
                 {
-                    SetUpstream(parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick()));
+                    SetUpstream(parent._scheduler.ScheduleAction(this, parent._duration, state => state.Tick()));
                     Disposable.SetSingle(ref _sourceDisposable, parent._source.SubscribeSafe(this));
                 }
 
@@ -124,10 +124,9 @@ namespace System.Reactive.Linq.ObservableImpl
                     base.Dispose(disposing);
                 }
 
-                private IDisposable Tick()
+                private void Tick()
                 {
                     _open = true;
-                    return Disposable.Empty;
                 }
 
                 public override void OnNext(TSource value)

+ 2 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs

@@ -170,7 +170,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public void Run(SkipUntil<TSource> parent)
             {
-                Disposable.SetSingle(ref _task, parent._scheduler.Schedule(this, parent._startTime, (_, state) => state.Tick()));
+                Disposable.SetSingle(ref _task, parent._scheduler.ScheduleAction(this, parent._startTime, state => state.Tick()));
                 base.Run(parent._source);
             }
 
@@ -183,10 +183,9 @@ namespace System.Reactive.Linq.ObservableImpl
                 base.Dispose(disposing);
             }
 
-            private IDisposable Tick()
+            private void Tick()
             {
                 _open = true;
-                return Disposable.Empty;
             }
 
             public override void OnNext(TSource value)

+ 2 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs

@@ -122,7 +122,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     _gate = new object();
 
-                    Disposable.SetSingle(ref _task, parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick()));
+                    Disposable.SetSingle(ref _task, parent._scheduler.ScheduleAction(this, parent._duration, state => state.Tick()));
                     base.Run(parent._source);
                 }
 
@@ -135,13 +135,12 @@ namespace System.Reactive.Linq.ObservableImpl
                     base.Dispose(disposing);
                 }
 
-                private IDisposable Tick()
+                private void Tick()
                 {
                     lock (_gate)
                     {
                         ForwardOnCompleted();
                     }
-                    return Disposable.Empty;
                 }
 
                 public override void OnNext(TSource value)

+ 2 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs

@@ -147,7 +147,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public void Run(TakeUntil<TSource> parent)
             {
-                Disposable.SetSingle(ref _timerDisposable, parent._scheduler.Schedule(this, parent._endTime, (_, state) => state.Tick()));
+                Disposable.SetSingle(ref _timerDisposable, parent._scheduler.ScheduleAction(this, parent._endTime, state => state.Tick()));
                 base.Run(parent._source);
             }
 
@@ -160,10 +160,9 @@ namespace System.Reactive.Linq.ObservableImpl
                 base.Dispose(disposing);
             }
 
-            private IDisposable Tick()
+            private void Tick()
             {
                 OnCompleted();
-                return Disposable.Empty;
             }
 
             public override void OnNext(TSource value)

+ 2 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Throttle.cs

@@ -64,10 +64,10 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
 
                 Disposable.TrySetSerial(ref _serialCancelable, null);
-                Disposable.TrySetSerial(ref _serialCancelable, _scheduler.Schedule((@this: this, currentid), _dueTime, (_, tuple) => [email protected](tuple.currentid)));
+                Disposable.TrySetSerial(ref _serialCancelable, _scheduler.ScheduleAction((@this: this, currentid), _dueTime, tuple => [email protected](tuple.currentid)));
             }
 
-            private IDisposable Propagate(ulong currentid)
+            private void Propagate(ulong currentid)
             {
                 lock (_gate)
                 {
@@ -78,8 +78,6 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     _hasValue = false;
                 }
-
-                return Disposable.Empty;
             }
 
             public override void OnError(Exception error)

+ 3 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs

@@ -70,7 +70,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     if (Disposable.TrySetMultiple(ref _timerDisposable, null))
                     {
 
-                        var d = _scheduler.Schedule((idx, instance: this), _dueTime, (_, state) => { state.instance.Timeout(state.idx); return Disposable.Empty; });
+                        var d = _scheduler.ScheduleAction((idx, instance: this), _dueTime, state => { state.instance.Timeout(state.idx); });
 
                         Disposable.TrySetMultiple(ref _timerDisposable, d);
                     }
@@ -159,7 +159,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public void Run(Absolute parent)
                 {
-                    SetUpstream(parent._scheduler.Schedule(this, parent._dueTime, (_, @this) => @this.Timeout()));
+                    SetUpstream(parent._scheduler.ScheduleAction(this, parent._dueTime, @this => @this.Timeout()));
 
                     Disposable.TrySetSingle(ref _serialDisposable, parent._source.SubscribeSafe(this));
                 }
@@ -173,13 +173,12 @@ namespace System.Reactive.Linq.ObservableImpl
                     base.Dispose(disposing);
                 }
 
-                private IDisposable Timeout()
+                private void Timeout()
                 {
                     if (Interlocked.Increment(ref _wip) == 1)
                     {
                         Disposable.TrySetSerial(ref _serialDisposable, _other.SubscribeSafe(GetForwarder()));
                     }
-                    return Disposable.Empty;
                 }
 
                 public override void OnNext(TSource value)

+ 3 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs

@@ -58,19 +58,18 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public void Run(Single parent, DateTimeOffset dueTime)
                 {
-                    SetUpstream(parent._scheduler.Schedule(this, dueTime, (_, state) => state.Invoke()));
+                    SetUpstream(parent._scheduler.ScheduleAction(this, dueTime, state => state.Invoke()));
                 }
 
                 public void Run(Single parent, TimeSpan dueTime)
                 {
-                    SetUpstream(parent._scheduler.Schedule(this, dueTime, (_, state) => state.Invoke()));
+                    SetUpstream(parent._scheduler.ScheduleAction(this, dueTime, state => state.Invoke()));
                 }
 
-                private IDisposable Invoke()
+                private void Invoke()
                 {
                     ForwardOnNext(0);
                     ForwardOnCompleted();
-                    return Disposable.Empty;
                 }
             }
         }

+ 5 - 11
Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs

@@ -208,10 +208,10 @@ namespace System.Reactive.Linq.ObservableImpl
                         _nextShift += _timeShift;
                     }
 
-                    m.Disposable = _scheduler.Schedule((@this: this, isSpan, isShift), ts, (_, tuple) => [email protected](tuple.isSpan, tuple.isShift));
+                    m.Disposable = _scheduler.ScheduleAction((@this: this, isSpan, isShift), ts, tuple => [email protected](tuple.isSpan, tuple.isShift));
                 }
 
-                private IDisposable Tick(bool isSpan, bool isShift)
+                private void Tick(bool isSpan, bool isShift)
                 {
                     lock (_gate)
                     {
@@ -234,8 +234,6 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
 
                     CreateTimer();
-
-                    return Disposable.Empty;
                 }
 
                 public override void OnNext(TSource value)
@@ -424,19 +422,17 @@ namespace System.Reactive.Linq.ObservableImpl
                     var m = new SingleAssignmentDisposable();
                     _timerD.Disposable = m;
 
-                    m.Disposable = _scheduler.Schedule((@this: this, window), _timeSpan, (_, tuple) => [email protected](tuple.window));
+                    m.Disposable = _scheduler.ScheduleAction((@this: this, window), _timeSpan, tuple => [email protected](tuple.window));
                 }
 
-                private IDisposable Tick(Subject<TSource> window)
+                private void Tick(Subject<TSource> window)
                 {
-                    var d = Disposable.Empty;
-
                     var newWindow = default(Subject<TSource>);
                     lock (_gate)
                     {
                         if (window != _s)
                         {
-                            return d;
+                            return;
                         }
 
                         _n = 0;
@@ -448,8 +444,6 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
 
                     CreateTimer(newWindow);
-
-                    return d;
                 }
 
                 public override void OnNext(TSource value)

+ 1 - 2
Rx.NET/Source/src/System.Reactive/Notification.cs

@@ -665,7 +665,7 @@ namespace System.Reactive
 
             protected override IDisposable SubscribeCore(IObserver<T> observer)
             {
-                return _scheduler.Schedule((_parent, observer), (scheduler, state) =>
+                return _scheduler.ScheduleAction((_parent, observer), state =>
                 {
                     var parent = state._parent;
                     var o = state.observer;
@@ -676,7 +676,6 @@ namespace System.Reactive
                     {
                         o.OnCompleted();
                     }
-                    return Disposable.Empty;
                 });
             }
         }

+ 2 - 4
Rx.NET/Source/src/System.Reactive/ObservableBase.cs

@@ -50,7 +50,7 @@ namespace System.Reactive
                 // exception thrown in OnNext to circle back to OnError, which looks like the
                 // sequence can't make up its mind.
                 //
-                CurrentThreadScheduler.Instance.Schedule(autoDetachObserver, ScheduledSubscribe);
+                CurrentThreadScheduler.Instance.ScheduleAction(autoDetachObserver, ScheduledSubscribe);
             }
             else
             {
@@ -79,7 +79,7 @@ namespace System.Reactive
             return autoDetachObserver;
         }
 
-        private IDisposable ScheduledSubscribe(IScheduler _, AutoDetachObserver<T> autoDetachObserver)
+        private void ScheduledSubscribe(AutoDetachObserver<T> autoDetachObserver)
         {
             try
             {
@@ -101,8 +101,6 @@ namespace System.Reactive
                     throw;
                 }
             }
-
-            return Disposable.Empty;
         }
 
         /// <summary>