浏览代码

Use Schedule calls with state (#558)

David Karnok 7 年之前
父节点
当前提交
196b1726a8

+ 4 - 2
Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs

@@ -56,9 +56,11 @@ namespace System.Reactive.Concurrency
                 var d = new SerialDisposable();
                 d.Disposable = m;
 
-                m.Disposable = scheduler.Schedule(() =>
+                m.Disposable = scheduler.Schedule((source, observer, d),
+                (scheduler, state) =>
                 {
-                    d.Disposable = new ScheduledDisposable(scheduler, source.SubscribeSafe(observer));
+                    state.d.Disposable = new ScheduledDisposable(scheduler, state.source.SubscribeSafe(state.observer));
+                    return Disposable.Empty;
                 });
 
                 return d;

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

@@ -434,7 +434,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         if (shouldWait)
                         {
                             var timer = new ManualResetEventSlim();
-                            _scheduler.Schedule(waitTime, () => { timer.Set(); });
+                            _scheduler.Schedule(timer, waitTime, (_, slimTimer) => { slimTimer.Set(); return Disposable.Empty; });
 
                             try
                             {

+ 2 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/FromEvent.cs

@@ -284,12 +284,13 @@ namespace System.Reactive.Linq.ObservableImpl
                         {
                             if (--_count == 0)
                             {
-                                _parent._scheduler.Schedule(_removeHandler.Dispose);
+                                _parent._scheduler.ScheduleAction(_removeHandler, handler => handler.Dispose());
                                 _parent._session = null;
                             }
                         }
                     });
                 }
+
             }
 
             private void Initialize()

+ 2 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/Return.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
@@ -33,7 +34,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public IDisposable Run(IScheduler scheduler)
             {
-                return scheduler.Schedule(Invoke);
+                return scheduler.ScheduleAction(this, @this => @this.Invoke());
             }
 
             private void Invoke()

+ 3 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/Skip.cs

@@ -101,14 +101,15 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public IDisposable Run(Time parent)
                 {
-                    var t = parent._scheduler.Schedule(parent._duration, Tick);
+                    var t = parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick());
                     var d = parent._source.SubscribeSafe(this);
                     return StableCompositeDisposable.Create(t, d);
                 }
 
-                private void Tick()
+                private IDisposable Tick()
                 {
                     _open = true;
+                    return Disposable.Empty;
                 }
 
                 public override void OnNext(TSource value)

+ 3 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs

@@ -205,14 +205,15 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public IDisposable Run(SkipUntil<TSource> parent)
             {
-                var t = parent._scheduler.Schedule(parent._startTime, Tick);
+                var t = parent._scheduler.Schedule(this, parent._startTime, (_, state) => state.Tick());
                 var d = parent._source.SubscribeSafe(this);
                 return StableCompositeDisposable.Create(t, d);
             }
 
-            private void Tick()
+            private IDisposable Tick()
             {
                 _open = true;
+                return Disposable.Empty;
             }
 
             public override void OnNext(TSource value)

+ 3 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/Take.cs

@@ -112,17 +112,18 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     _gate = new object();
 
-                    var t = parent._scheduler.Schedule(parent._duration, Tick);
+                    var t = parent._scheduler.Schedule(this, parent._duration, (_, state) => state.Tick());
                     var d = parent._source.SubscribeSafe(this);
                     return StableCompositeDisposable.Create(t, d);
                 }
 
-                private void Tick()
+                private IDisposable Tick()
                 {
                     lock (_gate)
                     {
                         ForwardOnCompleted();
                     }
+                    return Disposable.Empty;
                 }
 
                 public override void OnNext(TSource value)

+ 3 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs

@@ -180,17 +180,18 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public IDisposable Run(TakeUntil<TSource> parent)
             {
-                var t = parent._scheduler.Schedule(parent._endTime, Tick);
+                var t = parent._scheduler.Schedule(this, parent._endTime, (_, state) => state.Tick());
                 var d = parent._source.SubscribeSafe(this);
                 return StableCompositeDisposable.Create(t, d);
             }
 
-            private void Tick()
+            private IDisposable Tick()
             {
                 lock (_gate)
                 {
                     ForwardOnCompleted();
                 }
+                return Disposable.Empty;
             }
 
             public override void OnNext(TSource value)

+ 2 - 5
Rx.NET/Source/src/System.Reactive/Linq/Observable/Throw.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
@@ -33,13 +34,9 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public IDisposable Run(IScheduler scheduler)
             {
-                return scheduler.Schedule(Invoke);
+                return scheduler.ScheduleAction(this, @this => @this.ForwardOnError(@this._exception));
             }
 
-            private void Invoke()
-            {
-                ForwardOnError(_exception);
-            }
         }
     }
 }

+ 4 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs

@@ -173,14 +173,14 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     _switched = false;
 
-                    var timer = parent._scheduler.Schedule(parent._dueTime, Timeout);
+                    var timer = parent._scheduler.Schedule(this, parent._dueTime, (_, state) => state.Timeout());
 
                     original.Disposable = parent._source.SubscribeSafe(this);
 
                     return StableCompositeDisposable.Create(_subscription, timer);
                 }
 
-                private void Timeout()
+                private IDisposable Timeout()
                 {
                     var timerWins = false;
 
@@ -192,6 +192,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     if (timerWins)
                         _subscription.Disposable = _other.SubscribeSafe(GetForwarder());
+
+                    return Disposable.Empty;
                 }
 
                 public override void OnNext(TSource value)

+ 4 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs

@@ -58,18 +58,19 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public IDisposable Run(Single parent, DateTimeOffset dueTime)
                 {
-                    return parent._scheduler.Schedule(dueTime, Invoke);
+                    return parent._scheduler.Schedule(this, dueTime, (_, state) => state.Invoke());
                 }
 
                 public IDisposable Run(Single parent, TimeSpan dueTime)
                 {
-                    return parent._scheduler.Schedule(dueTime, Invoke);
+                    return parent._scheduler.Schedule(this, dueTime, (_, state) => state.Invoke());
                 }
 
-                private void Invoke()
+                private IDisposable Invoke()
                 {
                     ForwardOnNext(0);
                     ForwardOnCompleted();
+                    return Disposable.Empty;
                 }
             }
         }

+ 171 - 171
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Async.cs

@@ -904,20 +904,20 @@ namespace System.Reactive.Linq
             return () =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((function, subject), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function();
+                        result = state.function();
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -933,20 +933,20 @@ namespace System.Reactive.Linq
             return (first) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((function, subject, first), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first);
+                        result = state.function(state.first);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -962,20 +962,20 @@ namespace System.Reactive.Linq
             return (first, second) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second);
+                        result = state.function(state.first, state.second);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -991,20 +991,20 @@ namespace System.Reactive.Linq
             return (first, second, third) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third);
+                        result = state.function(state.first, state.second, state.third);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1020,20 +1020,20 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third, fourth), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third, fourth);
+                        result = state.function(state.first, state.second, state.third, state.fourth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1049,20 +1049,20 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third, fourth, fifth);
+                        result = state.function(state.first, state.second, state.third, state.fourth, state.fifth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1078,20 +1078,20 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third, fourth, fifth, sixth);
+                        result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1107,20 +1107,20 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third, fourth, fifth, sixth, seventh);
+                        result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1136,20 +1136,20 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eight) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third, fourth, fifth, sixth, seventh, eight);
+                        result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1165,20 +1165,20 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth);
+                        result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1194,20 +1194,20 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth);
+                        result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1223,20 +1223,20 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh);
+                        result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth, state.eleventh);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1252,20 +1252,20 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth);
+                        result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth, state.eleventh, state.twelfth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1281,20 +1281,20 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth);
+                        result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1310,20 +1310,20 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth);
+                        result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth, state.fourteenth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1339,20 +1339,20 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth);
+                        result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth, state.fourteenth, state.fifteenth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1368,20 +1368,20 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth) =>
             {
                 var subject = new AsyncSubject<TResult>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, function, first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth), state =>
                 {
                     var result = default(TResult);
                     try
                     {
-                        result = function(first, second, third, fourth, fifth, sixth, seventh, eight, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth);
+                        result = state.function(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eight, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth, state.fourteenth, state.fifteenth, state.sixteenth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(result);
-                    subject.OnCompleted();
+                    state.subject.OnNext(result);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1401,19 +1401,19 @@ namespace System.Reactive.Linq
             return () =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action), state =>
                 {
                     try
                     {
-                        action();
+                        state.action();
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
 
                 return subject.AsObservable();
@@ -1430,19 +1430,19 @@ namespace System.Reactive.Linq
             return (first) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first), state =>
                 {
                     try
                     {
-                        action(first);
+                        state.action(state.first);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1458,19 +1458,19 @@ namespace System.Reactive.Linq
             return (first, second) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second), state =>
                 {
                     try
                     {
-                        action(first, second);
+                        state.action(state.first, state.second);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1486,19 +1486,19 @@ namespace System.Reactive.Linq
             return (first, second, third) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third), state =>
                 {
                     try
                     {
-                        action(first, second, third);
+                        state.action(state.first, state.second, state.third);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1514,19 +1514,19 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third, fourth), state =>
                 {
                     try
                     {
-                        action(first, second, third, fourth);
+                        state.action(state.first, state.second, state.third, state.fourth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1542,19 +1542,19 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth), state =>
                 {
                     try
                     {
-                        action(first, second, third, fourth, fifth);
+                        state.action(state.first, state.second, state.third, state.fourth, state.fifth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1570,19 +1570,19 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth), state =>
                 {
                     try
                     {
-                        action(first, second, third, fourth, fifth, sixth);
+                        state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1598,19 +1598,19 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh), state =>
                 {
                     try
                     {
-                        action(first, second, third, fourth, fifth, sixth, seventh);
+                        state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1623,22 +1623,22 @@ namespace System.Reactive.Linq
 
         public virtual Func<T1, T2, T3, T4, T5, T6, T7, T8, IObservable<Unit>> ToAsync<T1, T2, T3, T4, T5, T6, T7, T8>(Action<T1, T2, T3, T4, T5, T6, T7, T8> action, IScheduler scheduler)
         {
-            return (first, second, third, fourth, fifth, sixth, seventh, eight) =>
+            return (first, second, third, fourth, fifth, sixth, seventh, eighth) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth), state =>
                 {
                     try
                     {
-                        action(first, second, third, fourth, fifth, sixth, seventh, eight);
+                        state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1654,19 +1654,19 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth), state =>
                 {
                     try
                     {
-                        action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth);
+                        state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1682,19 +1682,19 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth), state =>
                 {
                     try
                     {
-                        action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth);
+                        state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1710,19 +1710,19 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh), state =>
                 {
                     try
                     {
-                        action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh);
+                        state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth, state.eleventh);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1738,19 +1738,19 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth), state =>
                 {
                     try
                     {
-                        action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth);
+                        state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth, state.eleventh, state.twelfth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1766,19 +1766,19 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth), state =>
                 {
                     try
                     {
-                        action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth);
+                        state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1794,19 +1794,19 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth), state =>
                 {
                     try
                     {
-                        action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth);
+                        state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth, state.fourteenth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1822,19 +1822,19 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth), state =>
                 {
                     try
                     {
-                        action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth);
+                        state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth, state.fourteenth, state.fifteenth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };
@@ -1850,19 +1850,19 @@ namespace System.Reactive.Linq
             return (first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth) =>
             {
                 var subject = new AsyncSubject<Unit>();
-                scheduler.Schedule(() =>
+                scheduler.ScheduleAction((subject, action, first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth), state =>
                 {
                     try
                     {
-                        action(first, second, third, fourth, fifth, sixth, seventh, eighth, ninth, tenth, eleventh, twelfth, thirteenth, fourteenth, fifteenth, sixteenth);
+                        state.action(state.first, state.second, state.third, state.fourth, state.fifth, state.sixth, state.seventh, state.eighth, state.ninth, state.tenth, state.eleventh, state.twelfth, state.thirteenth, state.fourteenth, state.fifteenth, state.sixteenth);
                     }
                     catch (Exception exception)
                     {
-                        subject.OnError(exception);
+                        state.subject.OnError(exception);
                         return;
                     }
-                    subject.OnNext(Unit.Default);
-                    subject.OnCompleted();
+                    state.subject.OnNext(Unit.Default);
+                    state.subject.OnCompleted();
                 });
                 return subject.AsObservable();
             };

+ 4 - 3
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguageEx.cs

@@ -472,10 +472,11 @@ namespace System.Reactive.Linq
             public IDisposable Subscribe(IObserver<T> observer)
             {
                 var g = new CompositeDisposable();
-                g.Add(CurrentThreadScheduler.Instance.Schedule(() =>
+                g.Add(CurrentThreadScheduler.Instance.ScheduleAction((observer, g, @this: this),
+                state =>
                 {
-                    observer.OnNext(head);
-                    g.Add(tail.Merge().Subscribe(observer));
+                    state.observer.OnNext(state.@this.head);
+                    state.g.Add(state.@this.tail.Merge().Subscribe(state.observer));
                 }));
                 return g;
             }