Browse Source

4.x: Deanonymize operators (#549)

David Karnok 7 years ago
parent
commit
088fc52a0f

+ 34 - 4
Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs

@@ -35,7 +35,22 @@ namespace System.Reactive.Concurrency
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return new AnonymousObservable<TSource>(observer =>
+            return new SubscribeOnObservable<TSource>(source, scheduler);
+        }
+
+        sealed class SubscribeOnObservable<TSource> : ObservableBase<TSource>
+        {
+            readonly IObservable<TSource> source;
+
+            readonly IScheduler scheduler;
+
+            public SubscribeOnObservable(IObservable<TSource> source, IScheduler scheduler)
+            {
+                this.source = source;
+                this.scheduler = scheduler;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
             {
                 var m = new SingleAssignmentDisposable();
                 var d = new SerialDisposable();
@@ -47,7 +62,7 @@ namespace System.Reactive.Concurrency
                 });
 
                 return d;
-            });
+            }
         }
 
         /// <summary>
@@ -69,7 +84,22 @@ namespace System.Reactive.Concurrency
             if (context == null)
                 throw new ArgumentNullException(nameof(context));
 
-            return new AnonymousObservable<TSource>(observer =>
+            return new SubscribeOnCtxObservable<TSource>(source, context);
+        }
+
+        sealed class SubscribeOnCtxObservable<TSource> : ObservableBase<TSource>
+        {
+            readonly IObservable<TSource> source;
+
+            readonly SynchronizationContext context;
+
+            public SubscribeOnCtxObservable(IObservable<TSource> source, SynchronizationContext context)
+            {
+                this.source = source;
+                this.context = context;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
             {
                 var subscription = new SingleAssignmentDisposable();
                 context.PostWithStartComplete(() =>
@@ -80,7 +110,7 @@ namespace System.Reactive.Concurrency
                     }
                 });
                 return subscription;
-            });
+            }
         }
 
         #endregion

+ 18 - 10
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Async.cs

@@ -725,14 +725,29 @@ namespace System.Reactive.Linq
                 result = task.ToObservable();
             }
 
-            return new AnonymousObservable<TSource>(observer =>
+            return new StartAsyncObservable<TSource>(cancellable, result);
+        }
+
+        sealed class StartAsyncObservable<TSource> : ObservableBase<TSource>
+        {
+            readonly CancellationDisposable cancellable;
+
+            readonly IObservable<TSource> result;
+
+            public StartAsyncObservable(CancellationDisposable cancellable, IObservable<TSource> result)
+            {
+                this.cancellable = cancellable;
+                this.result = result;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
             {
                 //
                 // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
                 //
                 var subscription = result.Subscribe/*Unsafe*/(observer);
                 return StableCompositeDisposable.Create(cancellable, subscription);
-            });
+            }
         }
 
         #endregion
@@ -816,14 +831,7 @@ namespace System.Reactive.Linq
                 result = task.ToObservable();
             }
 
-            return new AnonymousObservable<Unit>(observer =>
-            {
-                //
-                // [OK] Use of unsafe Subscribe: result is an AsyncSubject<TSource>.
-                //
-                var subscription = result.Subscribe/*Unsafe*/(observer);
-                return StableCompositeDisposable.Create(cancellable, subscription);
-            });
+            return new StartAsyncObservable<Unit>(cancellable, result);
         }
 
         #endregion

+ 178 - 18
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

@@ -20,16 +20,43 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
         {
-            return new AnonymousObservable<TSource>(subscribe);
+            return new CreateWithDisposableObservable<TSource>(subscribe);
+        }
+
+        sealed class CreateWithDisposableObservable<TSource> : ObservableBase<TSource>
+        {
+            readonly Func<IObserver<TSource>, IDisposable> subscribe;
+
+            public CreateWithDisposableObservable(Func<IObserver<TSource>, IDisposable> subscribe)
+            {
+                this.subscribe = subscribe;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
+            {
+                return subscribe(observer) ?? Disposable.Empty;
+            }
         }
 
         public virtual IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, Action> subscribe)
         {
-            return new AnonymousObservable<TSource>(o =>
+            return new CreateWithActionDisposable<TSource>(subscribe);
+        }
+
+        sealed class CreateWithActionDisposable<TSource> : ObservableBase<TSource>
+        {
+            readonly Func<IObserver<TSource>, Action> subscribe;
+
+            public CreateWithActionDisposable(Func<IObserver<TSource>, Action> subscribe)
             {
-                var a = subscribe(o);
+                this.subscribe = subscribe;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
+            {
+                var a = subscribe(observer);
                 return a != null ? Disposable.Create(a) : Disposable.Empty;
-            });
+            }
         }
 
         #endregion
@@ -38,16 +65,53 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
         {
-            return new AnonymousObservable<TResult>(observer =>
+            return new CreateWithTaskTokenObservable<TResult>(subscribeAsync);
+        }
+
+        sealed class CreateWithTaskTokenObservable<TResult> : ObservableBase<TResult>
+        {
+            readonly Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync;
+
+            public CreateWithTaskTokenObservable(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync)
+            {
+                this.subscribeAsync = subscribeAsync;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
             {
                 var cancellable = new CancellationDisposable();
 
                 var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
-                var taskCompletionObserver = new AnonymousObserver<Unit>(Stubs<Unit>.Ignore, observer.OnError, observer.OnCompleted);
+                var taskCompletionObserver = new TaskCompletionObserver(observer);
                 var subscription = taskObservable.Subscribe(taskCompletionObserver);
 
                 return StableCompositeDisposable.Create(cancellable, subscription);
-            });
+            }
+
+            sealed class TaskCompletionObserver : IObserver<Unit>
+            {
+                readonly IObserver<TResult> observer;
+
+                public TaskCompletionObserver(IObserver<TResult> observer)
+                {
+                    this.observer = observer;
+                }
+
+                public void OnCompleted()
+                {
+                    observer.OnCompleted();
+                }
+
+                public void OnError(Exception error)
+                {
+                    observer.OnError(error);
+                }
+
+                public void OnNext(Unit value)
+                {
+                    // deliberately ignored
+                }
+            }
         }
 
         public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task> subscribeAsync)
@@ -57,13 +121,25 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
         {
-            return new AnonymousObservable<TResult>(observer =>
+            return new CreateWithTaskDisposable<TResult>(subscribeAsync);
+        }
+
+        sealed class CreateWithTaskDisposable<TResult> : ObservableBase<TResult>
+        {
+            readonly Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync;
+
+            public CreateWithTaskDisposable(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)
+            {
+                this.subscribeAsync = subscribeAsync;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
             {
-                var subscription = new SingleAssignmentDisposable();
                 var cancellable = new CancellationDisposable();
 
                 var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
-                var taskCompletionObserver = new AnonymousObserver<IDisposable>(d => subscription.Disposable = d ?? Disposable.Empty, observer.OnError, Stubs.Nop);
+
+                var taskCompletionObserver = new TaskDisposeCompletionObserver(observer);
 
                 //
                 // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
@@ -71,8 +147,43 @@ namespace System.Reactive.Linq
                 //
                 taskObservable.Subscribe(taskCompletionObserver);
 
-                return StableCompositeDisposable.Create(cancellable, subscription);
-            });
+                return StableCompositeDisposable.Create(cancellable, taskCompletionObserver);
+            }
+
+            sealed class TaskDisposeCompletionObserver : IObserver<IDisposable>, IDisposable
+            {
+                readonly IObserver<TResult> observer;
+
+                IDisposable disposable;
+
+                public TaskDisposeCompletionObserver(IObserver<TResult> observer)
+                {
+                    this.observer = observer;
+                }
+
+                public void Dispose()
+                {
+                    Interlocked.Exchange(ref disposable, BooleanDisposable.True)?.Dispose();
+                }
+
+                public void OnCompleted()
+                {
+                    observer.OnCompleted();
+                }
+
+                public void OnError(Exception error)
+                {
+                    observer.OnError(error);
+                }
+
+                public void OnNext(IDisposable value)
+                {
+                    if (Interlocked.CompareExchange(ref disposable, value, null) != null)
+                    {
+                        value?.Dispose();
+                    }
+                }
+            }
         }
 
         public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<IDisposable>> subscribeAsync)
@@ -82,22 +193,71 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
         {
-            return new AnonymousObservable<TResult>(observer =>
+            return new CreateWithTaskActionObservable<TResult>(subscribeAsync);
+        }
+
+        sealed class CreateWithTaskActionObservable<TResult> : ObservableBase<TResult>
+        {
+            readonly Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync;
+
+            public CreateWithTaskActionObservable(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)
+            {
+                this.subscribeAsync = subscribeAsync;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
             {
-                var subscription = new SingleAssignmentDisposable();
                 var cancellable = new CancellationDisposable();
 
                 var taskObservable = subscribeAsync(observer, cancellable.Token).ToObservable();
-                var taskCompletionObserver = new AnonymousObserver<Action>(a => subscription.Disposable = a != null ? Disposable.Create(a) : Disposable.Empty, observer.OnError, Stubs.Nop);
+
+                var taskCompletionObserver = new TaskDisposeCompletionObserver(observer);
 
                 //
-                // We don't cancel the subscription below *ever* and want to make sure the returned resource eventually gets disposed.
+                // We don't cancel the subscription below *ever* and want to make sure the returned resource gets disposed eventually.
                 // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
                 //
                 taskObservable.Subscribe(taskCompletionObserver);
 
-                return StableCompositeDisposable.Create(cancellable, subscription);
-            });
+                return StableCompositeDisposable.Create(cancellable, taskCompletionObserver);
+            }
+
+            sealed class TaskDisposeCompletionObserver : IObserver<Action>, IDisposable
+            {
+                readonly IObserver<TResult> observer;
+
+                Action disposable;
+
+                static readonly Action DisposedAction = () => { };
+
+                public TaskDisposeCompletionObserver(IObserver<TResult> observer)
+                {
+                    this.observer = observer;
+                }
+
+                public void Dispose()
+                {
+                    Interlocked.Exchange(ref disposable, DisposedAction)?.Invoke();
+                }
+
+                public void OnCompleted()
+                {
+                    observer.OnCompleted();
+                }
+
+                public void OnError(Exception error)
+                {
+                    observer.OnError(error);
+                }
+
+                public void OnNext(Action value)
+                {
+                    if (Interlocked.CompareExchange(ref disposable, value, null) != null)
+                    {
+                        value?.Invoke();
+                    }
+                }
+            }
         }
 
         public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, Task<Action>> subscribeAsync)

+ 57 - 19
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Joins.cs

@@ -37,38 +37,44 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TResult> When<TResult>(IEnumerable<Plan<TResult>> plans)
         {
-            return new AnonymousObservable<TResult>(observer =>
+            return new WhenObservable<TResult>(plans);
+        }
+
+        sealed class WhenObservable<TResult> : ObservableBase<TResult>
+        {
+            readonly IEnumerable<Plan<TResult>> plans;
+
+            public WhenObservable(IEnumerable<Plan<TResult>> plans)
+            {
+                this.plans = plans;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
             {
                 var externalSubscriptions = new Dictionary<object, IJoinObserver>();
                 var gate = new object();
                 var activePlans = new List<ActivePlan>();
-                var outObserver = Observer.Create<TResult>(observer.OnNext,
-                    exception =>
-                    {
-                        foreach (var po in externalSubscriptions.Values)
-                        {
-                            po.Dispose();
-                        }
-                        observer.OnError(exception);
-                    },
-                    observer.OnCompleted);
+                var outObserver = new OutObserver(observer, externalSubscriptions);
                 try
                 {
+                    Action<ActivePlan> onDeactivate = activePlan =>
+                    {
+                        activePlans.Remove(activePlan);
+                        if (activePlans.Count == 0)
+                            outObserver.OnCompleted();
+                    };
+
                     foreach (var plan in plans)
                         activePlans.Add(plan.Activate(externalSubscriptions, outObserver,
-                                                      activePlan =>
-                                                      {
-                                                          activePlans.Remove(activePlan);
-                                                          if (activePlans.Count == 0)
-                                                              outObserver.OnCompleted();
-                                                      }));
+                                                      onDeactivate));
                 }
                 catch (Exception e)
                 {
                     //
                     // [OK] Use of unsafe Subscribe: we're calling into a known producer implementation.
                     //
-                    return Throw<TResult>(e).Subscribe/*Unsafe*/(observer);
+                    observer.OnError(e);
+                    return Disposable.Empty;
                 }
 
                 var group = new CompositeDisposable(externalSubscriptions.Values.Count);
@@ -78,7 +84,39 @@ namespace System.Reactive.Linq
                     group.Add(joinObserver);
                 }
                 return group;
-            });
+            }
+
+            sealed class OutObserver : IObserver<TResult>
+            {
+                readonly IObserver<TResult> observer;
+
+                readonly Dictionary<object, IJoinObserver> externalSubscriptions;
+
+                public OutObserver(IObserver<TResult> observer, Dictionary<object, IJoinObserver> externalSubscriptions)
+                {
+                    this.observer = observer;
+                    this.externalSubscriptions = externalSubscriptions;
+                }
+
+                public void OnCompleted()
+                {
+                    observer.OnCompleted();
+                }
+
+                public void OnError(Exception exception)
+                {
+                    foreach (var po in externalSubscriptions.Values)
+                    {
+                        po.Dispose();
+                    }
+                    observer.OnError(exception);
+                }
+
+                public void OnNext(TResult value)
+                {
+                    observer.OnNext(value);
+                }
+            }
         }
 
         #endregion

+ 118 - 15
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguageEx.cs

@@ -16,14 +16,71 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
         {
-            return new AnonymousObservable<TResult>(observer =>
-                iteratorMethod(observer).Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted));
+            return new CreateWithEnumerableObservable<TResult>(iteratorMethod);
+        }
+
+        sealed class CreateWithEnumerableObservable<TResult> : ObservableBase<TResult>
+        {
+            readonly Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod;
+
+            public CreateWithEnumerableObservable(Func<IObserver<TResult>, IEnumerable<IObservable<object>>> iteratorMethod)
+            {
+                this.iteratorMethod = iteratorMethod;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
+            {
+                return iteratorMethod(observer)
+                    .Concat()
+                    .Subscribe(new TerminalOnlyObserver<TResult>(observer));
+            }
+        }
+
+        sealed class TerminalOnlyObserver<TResult> : IObserver<object>
+        {
+            readonly IObserver<TResult> observer;
+
+            public TerminalOnlyObserver(IObserver<TResult> observer)
+            {
+                this.observer = observer;
+            }
+
+            public void OnCompleted()
+            {
+                observer.OnCompleted();
+            }
+
+            public void OnError(Exception error)
+            {
+                observer.OnError(error);
+            }
+
+            public void OnNext(object value)
+            {
+                // deliberately ignored
+            }
         }
 
         public virtual IObservable<Unit> Create(Func<IEnumerable<IObservable<object>>> iteratorMethod)
         {
-            return new AnonymousObservable<Unit>(observer =>
-                iteratorMethod().Concat().Subscribe(_ => { }, observer.OnError, observer.OnCompleted));
+            return new CreateWithOnlyEnumerableObservable<Unit>(iteratorMethod);
+        }
+
+        sealed class CreateWithOnlyEnumerableObservable<TResult> : ObservableBase<TResult>
+        {
+            readonly Func<IEnumerable<IObservable<object>>> iteratorMethod;
+
+            public CreateWithOnlyEnumerableObservable(Func<IEnumerable<IObservable<object>>> iteratorMethod)
+            {
+                this.iteratorMethod = iteratorMethod;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
+            {
+                return iteratorMethod()
+                    .Concat()
+                    .Subscribe(new TerminalOnlyObserver<TResult>(observer));
+            }
         }
 
         #endregion
@@ -32,7 +89,23 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource> Expand<TSource>(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector, IScheduler scheduler)
         {
-            return new AnonymousObservable<TSource>(observer =>
+            return new ExpandObservable<TSource>(source, selector, scheduler);
+        }
+
+        sealed class ExpandObservable<TSource> : ObservableBase<TSource>
+        {
+            readonly IObservable<TSource> source;
+            readonly Func<TSource, IObservable<TSource>> selector;
+            readonly IScheduler scheduler;
+
+            public ExpandObservable(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector, IScheduler scheduler)
+            {
+                this.source = source;
+                this.selector = selector;
+                this.scheduler = scheduler;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TSource> observer)
             {
                 var outGate = new object();
                 var q = new Queue<IObservable<TSource>>();
@@ -133,7 +206,7 @@ namespace System.Reactive.Linq
                 ensureActive();
 
                 return d;
-            });
+            }
         }
 
         public virtual IObservable<TSource> Expand<TSource>(IObservable<TSource> source, Func<TSource, IObservable<TSource>> selector)
@@ -245,14 +318,26 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<TSource[]> ForkJoin<TSource>(IEnumerable<IObservable<TSource>> sources)
         {
-            return new AnonymousObservable<TSource[]>(subscriber =>
+            return new ForkJoinObservable<TSource>(sources);
+        }
+
+        sealed class ForkJoinObservable<TSource> : ObservableBase<TSource[]>
+        {
+            readonly IEnumerable<IObservable<TSource>> sources;
+
+            public ForkJoinObservable(IEnumerable<IObservable<TSource>> sources)
+            {
+                this.sources = sources;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TSource[]> observer)
             {
                 var allSources = sources.ToArray();
                 var count = allSources.Length;
 
                 if (count == 0)
                 {
-                    subscriber.OnCompleted();
+                    observer.OnCompleted();
                     return Disposable.Empty;
                 }
 
@@ -288,7 +373,7 @@ namespace System.Reactive.Linq
                                 lock (gate)
                                 {
                                     finished = true;
-                                    subscriber.OnError(error);
+                                    observer.OnError(error);
                                     group.Dispose();
                                 }
                             },
@@ -300,7 +385,7 @@ namespace System.Reactive.Linq
                                     {
                                         if (!hasResults[currentIndex])
                                         {
-                                            subscriber.OnCompleted();
+                                            observer.OnCompleted();
                                             return;
                                         }
                                         hasCompleted[currentIndex] = true;
@@ -310,15 +395,15 @@ namespace System.Reactive.Linq
                                                 return;
                                         }
                                         finished = true;
-                                        subscriber.OnNext(results.ToArray());
-                                        subscriber.OnCompleted();
+                                        observer.OnNext(results.ToArray());
+                                        observer.OnCompleted();
                                     }
                                 }
                             }));
                     }
                 }
                 return group;
-            });
+            }
         }
 
         #endregion
@@ -427,7 +512,24 @@ namespace System.Reactive.Linq
 
         private static IObservable<TResult> Combine<TLeft, TRight, TResult>(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector)
         {
-            return new AnonymousObservable<TResult>(observer =>
+            return new CombineObservable<TLeft, TRight, TResult>(leftSource, rightSource, combinerSelector);
+        }
+
+        sealed class CombineObservable<TLeft, TRight, TResult> : ObservableBase<TResult>
+        {
+            readonly IObservable<TLeft> leftSource;
+            readonly IObservable<TRight> rightSource; 
+            
+            readonly Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector;
+
+            public CombineObservable(IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<IObserver<TResult>, IDisposable, IDisposable, IObserver<Either<Notification<TLeft>, Notification<TRight>>>> combinerSelector)
+            {
+                this.leftSource = leftSource;
+                this.rightSource = rightSource;
+                this.combinerSelector = combinerSelector;
+            }
+
+            protected override IDisposable SubscribeCore(IObserver<TResult> observer)
             {
                 var leftSubscription = new SingleAssignmentDisposable();
                 var rightSubscription = new SingleAssignmentDisposable();
@@ -439,7 +541,8 @@ namespace System.Reactive.Linq
                 rightSubscription.Disposable = rightSource.Materialize().Select(x => Either<Notification<TLeft>, Notification<TRight>>.CreateRight(x)).Synchronize(gate).Subscribe(combiner);
 
                 return StableCompositeDisposable.Create(leftSubscription, rightSubscription);
-            });
+
+            }
         }
 
         #endregion

+ 29 - 6
Rx.NET/Source/src/System.Reactive/Notification.cs

@@ -6,6 +6,7 @@ using System.Diagnostics;
 using System.Globalization;
 using System.Collections.Generic;
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 
 #pragma warning disable 0659
 #pragma warning disable 0661
@@ -555,15 +556,37 @@ namespace System.Reactive
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return new AnonymousObservable<T>(observer => scheduler.Schedule(() =>
+            return new NotificationToObservable(scheduler, this);
+        }
+
+        sealed class NotificationToObservable : ObservableBase<T>
+        {
+            readonly IScheduler scheduler;
+
+            readonly Notification<T> parent;
+
+            public NotificationToObservable(IScheduler scheduler, Notification<T> parent)
             {
-                Accept(observer);
+                this.scheduler = scheduler;
+                this.parent = parent;
+            }
 
-                if (Kind == NotificationKind.OnNext)
+            protected override IDisposable SubscribeCore(IObserver<T> observer)
+            {
+                return scheduler.Schedule((parent, observer), (scheduler, state) =>
                 {
-                    observer.OnCompleted();
-                }
-            }));
+                    var parent = state.parent;
+                    var o = state.observer;
+
+                    parent.Accept(o);
+
+                    if (parent.Kind == NotificationKind.OnNext)
+                    {
+                        o.OnCompleted();
+                    }
+                    return Disposable.Empty;
+                });
+            }
         }
     }