|
|
@@ -102,27 +102,49 @@ namespace System.Reactive.Concurrency
|
|
|
|
|
|
sealed class SubscribeOnCtxObservable<TSource> : ObservableBase<TSource>
|
|
|
{
|
|
|
- readonly IObservable<TSource> source;
|
|
|
+ private sealed class Subscription : IDisposable
|
|
|
+ {
|
|
|
+ private readonly IObservable<TSource> _source;
|
|
|
+ private readonly IObserver<TSource> _observer;
|
|
|
+ private readonly SynchronizationContext _context;
|
|
|
|
|
|
- readonly SynchronizationContext context;
|
|
|
+ private IDisposable _cancel;
|
|
|
+
|
|
|
+ public Subscription(IObservable<TSource> source, SynchronizationContext context, IObserver<TSource> observer)
|
|
|
+ {
|
|
|
+ _source = source;
|
|
|
+ _context = context;
|
|
|
+ _observer = observer;
|
|
|
+
|
|
|
+ context.PostWithStartComplete(
|
|
|
+ @this =>
|
|
|
+ {
|
|
|
+ if (!Disposable.GetIsDisposed(ref @this._cancel))
|
|
|
+ {
|
|
|
+ Disposable.SetSingle(ref @this._cancel, new ContextDisposable(@this._context, @this._source.SubscribeSafe(@this._observer)));
|
|
|
+ }
|
|
|
+ },
|
|
|
+ this);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void Dispose()
|
|
|
+ {
|
|
|
+ Disposable.TryDispose(ref _cancel);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ readonly IObservable<TSource> _source;
|
|
|
+ readonly SynchronizationContext _context;
|
|
|
|
|
|
public SubscribeOnCtxObservable(IObservable<TSource> source, SynchronizationContext context)
|
|
|
{
|
|
|
- this.source = source;
|
|
|
- this.context = context;
|
|
|
+ this._source = source;
|
|
|
+ this._context = context;
|
|
|
}
|
|
|
|
|
|
protected override IDisposable SubscribeCore(IObserver<TSource> observer)
|
|
|
{
|
|
|
- var subscription = new SingleAssignmentDisposable();
|
|
|
- context.PostWithStartComplete(() =>
|
|
|
- {
|
|
|
- if (!subscription.IsDisposed)
|
|
|
- {
|
|
|
- subscription.Disposable = new ContextDisposable(context, source.SubscribeSafe(observer));
|
|
|
- }
|
|
|
- });
|
|
|
- return subscription;
|
|
|
+ return new Subscription(_source, _context, observer);
|
|
|
}
|
|
|
}
|
|
|
|