Переглянути джерело

Further optimizations to ObserveOn,

Bart De Smet 8 роки тому
батько
коміт
fc8e754af8

+ 73 - 69
Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.ObserveOn.cs

@@ -7,34 +7,21 @@ using System.Threading;
 
 namespace System.Reactive.Concurrency
 {
-    internal sealed class ObserveOn<TSource> : Producer<TSource>
+    internal static class ObserveOn<TSource>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly IScheduler _scheduler;
-        private readonly SynchronizationContext _context;
-
-        public ObserveOn(IObservable<TSource> source, IScheduler scheduler)
-        {
-            _source = source;
-            _scheduler = scheduler;
-        }
-
-        public ObserveOn(IObservable<TSource> source, SynchronizationContext context)
+        internal sealed class Scheduler : Producer<TSource>
         {
-            _source = source;
-            _context = context;
-        }
+            private readonly IObservable<TSource> _source;
+            private readonly IScheduler _scheduler;
 
-        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "2", Justification = "Visibility restricted to friend assemblies. Those should be correct by inspection.")]
-        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_context != null)
+            public Scheduler(IObservable<TSource> source, IScheduler scheduler)
             {
-                var sink = new ObserveOnSink(this, observer, cancel);
-                setSink(sink);
-                return sink.Run();
+                _source = source;
+                _scheduler = scheduler;
             }
-            else
+
+            [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "2", Justification = "Visibility restricted to friend assemblies. Those should be correct by inspection.")]
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new ObserveOnObserver<TSource>(_scheduler, observer, cancel);
                 setSink(sink);
@@ -42,66 +29,83 @@ namespace System.Reactive.Concurrency
             }
         }
 
-        private sealed class ObserveOnSink : Sink<TSource>, IObserver<TSource>
+        internal sealed class Context : Producer<TSource>
         {
-            private readonly ObserveOn<TSource> _parent;
+            private readonly IObservable<TSource> _source;
+            private readonly SynchronizationContext _context;
 
-            public ObserveOnSink(ObserveOn<TSource> parent, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Context(IObservable<TSource> source, SynchronizationContext context)
             {
-                _parent = parent;
+                _source = source;
+                _context = context;
             }
 
-            public IDisposable Run()
+            [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1062:Validate arguments of public methods", MessageId = "2", Justification = "Visibility restricted to friend assemblies. Those should be correct by inspection.")]
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                //
-                // The interactions with OperationStarted/OperationCompleted below allow
-                // for test frameworks to wait until a whole sequence is observed, running
-                // asserts on a per-message level. Also, for ASP.NET pages, the use of the
-                // built-in synchronization context would allow processing to finished in
-                // its entirety before moving on with the page lifecycle.
-                //
-                _parent._context.OperationStarted();
-
-                var d = _parent._source.SubscribeSafe(this);
-                var c = Disposable.Create(() =>
-                {
-                    _parent._context.OperationCompleted();
-                });
-
-                return StableCompositeDisposable.Create(d, c);
+                var sink = new _(_context, observer, cancel);
+                setSink(sink);
+                return sink.Run(_source);
             }
 
-            public void OnNext(TSource value)
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                _parent._context.Post(OnNextPosted, value);
-            }
+                private readonly SynchronizationContext _context;
 
-            public void OnError(Exception error)
-            {
-                _parent._context.Post(OnErrorPosted, error);
-            }
+                public _(SynchronizationContext context, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
+                {
+                    _context = context;
+                }
 
-            public void OnCompleted()
-            {
-                _parent._context.Post(OnCompletedPosted, state: null);
-            }
+                public IDisposable Run(IObservable<TSource> source)
+                {
+                    //
+                    // The interactions with OperationStarted/OperationCompleted below allow
+                    // for test frameworks to wait until a whole sequence is observed, running
+                    // asserts on a per-message level. Also, for ASP.NET pages, the use of the
+                    // built-in synchronization context would allow processing to finished in
+                    // its entirety before moving on with the page lifecycle.
+                    //
+                    _context.OperationStarted();
+
+                    var d = source.SubscribeSafe(this);
+                    var c = Disposable.Create(_context.OperationCompleted);
+
+                    return StableCompositeDisposable.Create(d, c);
+                }
+
+                public void OnNext(TSource value)
+                {
+                    _context.Post(OnNextPosted, value);
+                }
 
-            private void OnNextPosted(object value)
-            {
-                _observer.OnNext((TSource)value);
-            }
+                public void OnError(Exception error)
+                {
+                    _context.Post(OnErrorPosted, error);
+                }
 
-            private void OnErrorPosted(object error)
-            {
-                _observer.OnError((Exception)error);
-                Dispose();
-            }
+                public void OnCompleted()
+                {
+                    _context.Post(OnCompletedPosted, state: null);
+                }
 
-            private void OnCompletedPosted(object ignored)
-            {
-                _observer.OnCompleted();
-                Dispose();
+                private void OnNextPosted(object value)
+                {
+                    _observer.OnNext((TSource)value);
+                }
+
+                private void OnErrorPosted(object error)
+                {
+                    _observer.OnError((Exception)error);
+                    Dispose();
+                }
+
+                private void OnCompletedPosted(object ignored)
+                {
+                    _observer.OnCompleted();
+                    Dispose();
+                }
             }
         }
     }

+ 2 - 2
Rx.NET/Source/src/System.Reactive/Concurrency/Synchronization.cs

@@ -102,7 +102,7 @@ namespace System.Reactive.Concurrency
             if (scheduler == null)
                 throw new ArgumentNullException(nameof(scheduler));
 
-            return new ObserveOn<TSource>(source, scheduler);
+            return new ObserveOn<TSource>.Scheduler(source, scheduler);
         }
 
         /// <summary>
@@ -120,7 +120,7 @@ namespace System.Reactive.Concurrency
             if (context == null)
                 throw new ArgumentNullException(nameof(context));
 
-            return new ObserveOn<TSource>(source, context);
+            return new ObserveOn<TSource>.Context(source, context);
         }
 
         #endregion

+ 48 - 42
Rx.NET/Source/src/System.Reactive/Linq/Observable/ObserveOn.cs

@@ -7,33 +7,20 @@ using System.Threading;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class ObserveOn<TSource> : Producer<TSource>
+    internal static class ObserveOn<TSource>
     {
-        private readonly IObservable<TSource> _source;
-        private readonly IScheduler _scheduler;
-        private readonly SynchronizationContext _context;
-
-        public ObserveOn(IObservable<TSource> source, IScheduler scheduler)
+        internal sealed class Scheduler : Producer<TSource>
         {
-            _source = source;
-            _scheduler = scheduler;
-        }
-
-        public ObserveOn(IObservable<TSource> source, SynchronizationContext context)
-        {
-            _source = source;
-            _context = context;
-        }
+            private readonly IObservable<TSource> _source;
+            private readonly IScheduler _scheduler;
 
-        protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
-        {
-            if (_context != null)
+            public Scheduler(IObservable<TSource> source, IScheduler scheduler)
             {
-                var sink = new Context(_context, observer, cancel);
-                setSink(sink);
-                return _source.Subscribe(sink);
+                _source = source;
+                _scheduler = scheduler;
             }
-            else
+
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
                 var sink = new ObserveOnObserver<TSource>(_scheduler, observer, cancel);
                 setSink(sink);
@@ -41,40 +28,59 @@ namespace System.Reactive.Linq.ObservableImpl
             }
         }
 
-        private sealed class Context : Sink<TSource>, IObserver<TSource>
+        internal sealed class Context : Producer<TSource>
         {
+            private readonly IObservable<TSource> _source;
             private readonly SynchronizationContext _context;
 
-            public Context(SynchronizationContext context, IObserver<TSource> observer, IDisposable cancel)
-                : base(observer, cancel)
+            public Context(IObservable<TSource> source, SynchronizationContext context)
             {
+                _source = source;
                 _context = context;
             }
 
-            public void OnNext(TSource value)
+            protected override IDisposable Run(IObserver<TSource> observer, IDisposable cancel, Action<IDisposable> setSink)
             {
-                _context.PostWithStartComplete(() =>
-                {
-                    base._observer.OnNext(value);
-                });
+                var sink = new _(_context, observer, cancel);
+                setSink(sink);
+                return _source.Subscribe(sink);
             }
 
-            public void OnError(Exception error)
+            private sealed class _ : Sink<TSource>, IObserver<TSource>
             {
-                _context.PostWithStartComplete(() =>
+                private readonly SynchronizationContext _context;
+
+                public _(SynchronizationContext context, IObserver<TSource> observer, IDisposable cancel)
+                    : base(observer, cancel)
                 {
-                    base._observer.OnError(error);
-                    base.Dispose();
-                });
-            }
+                    _context = context;
+                }
 
-            public void OnCompleted()
-            {
-                _context.PostWithStartComplete(() =>
+                public void OnNext(TSource value)
+                {
+                    _context.PostWithStartComplete(() =>
+                    {
+                        base._observer.OnNext(value);
+                    });
+                }
+
+                public void OnError(Exception error)
+                {
+                    _context.PostWithStartComplete(() =>
+                    {
+                        base._observer.OnError(error);
+                        base.Dispose();
+                    });
+                }
+
+                public void OnCompleted()
                 {
-                    base._observer.OnCompleted();
-                    base.Dispose();
-                });
+                    _context.PostWithStartComplete(() =>
+                    {
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                    });
+                }
             }
         }
     }