浏览代码

4.x: Improve Return/Throw/Append/Prepend with ImmediateScheduler

akarnokd 7 年之前
父节点
当前提交
790f0748e2

+ 2 - 1
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs

@@ -23,7 +23,8 @@ namespace Benchmarks.System.Reactive
                 typeof(ToObservableBenchmark),
                 typeof(ToObservableBenchmark),
                 typeof(RepeatBenchmark),
                 typeof(RepeatBenchmark),
                 typeof(ComparisonBenchmark),
                 typeof(ComparisonBenchmark),
-                typeof(ComparisonAsyncBenchmark)
+                typeof(ComparisonAsyncBenchmark),
+                typeof(ScalarScheduleBenchmark)
 #if (CURRENT)
 #if (CURRENT)
                 ,typeof(AppendPrependBenchmark)
                 ,typeof(AppendPrependBenchmark)
 #endif
 #endif

+ 179 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/ScalarScheduleBenchmark.cs

@@ -0,0 +1,179 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Reactive.Concurrency;
+using System.Reactive.Linq;
+using System.Threading;
+using BenchmarkDotNet.Attributes;
+
+namespace Benchmarks.System.Reactive
+{
+    [MemoryDiagnoser]
+    public class ScalarScheduleBenchmark
+    {
+        private int _store;
+        private Exception _exceptionStore;
+
+        private IScheduler _eventLoop;
+
+        private Exception _exception;
+
+        [GlobalSetup]
+        public void Setup()
+        {
+            _eventLoop = new EventLoopScheduler();
+            _exception = new Exception();
+        }
+
+        private void BlockingConsume(IObservable<int> source)
+        {
+            var cde = new CountdownEvent(1);
+
+            source.Subscribe(v => Volatile.Write(ref _store, v),
+                e => 
+                {
+                    Volatile.Write(ref _exceptionStore, e);
+                    cde.Signal();
+                }, 
+                () => cde.Signal()
+            );
+
+            // spin-wait will result in faster completion detection
+            // because it takes 5 microseconds to resume a blocked thread
+            // for me on Windows
+            while (cde.CurrentCount != 0) ;
+        }
+
+        private void ConsumeSync(IObservable<int> source)
+        {
+            source.Subscribe(v => Volatile.Write(ref _store, v), e => Volatile.Write(ref _exceptionStore, e));
+        }
+
+        [Benchmark]
+        public void Return_Immediate()
+        {
+            ConsumeSync(Observable.Return(1, ImmediateScheduler.Instance));
+        }
+
+        [Benchmark]
+        public void Return_CurrentThread()
+        {
+            ConsumeSync(Observable.Return(1, CurrentThreadScheduler.Instance));
+        }
+
+        [Benchmark]
+        public void Return_EventLoop()
+        {
+            BlockingConsume(Observable.Return(1, _eventLoop));
+        }
+
+        [Benchmark]
+        public void Return_TaskPool()
+        {
+            BlockingConsume(Observable.Return(1, TaskPoolScheduler.Default));
+        }
+
+        [Benchmark]
+        public void Return_ThreadPool()
+        {
+            BlockingConsume(Observable.Return(1, ThreadPoolScheduler.Instance));
+        }
+
+        [Benchmark]
+        public void Throw_Immediate()
+        {
+            ConsumeSync(Observable.Throw<int>(_exception, ImmediateScheduler.Instance));
+        }
+
+        [Benchmark]
+        public void Throw_CurrentThread()
+        {
+            ConsumeSync(Observable.Throw<int>(_exception, CurrentThreadScheduler.Instance));
+        }
+
+        [Benchmark]
+        public void Throw_EventLoop()
+        {
+            BlockingConsume(Observable.Throw<int>(_exception, _eventLoop));
+        }
+
+        [Benchmark]
+        public void Throw_TaskPool()
+        {
+            BlockingConsume(Observable.Throw<int>(_exception, TaskPoolScheduler.Default));
+        }
+
+        [Benchmark]
+        public void Throw_ThreadPool()
+        {
+            BlockingConsume(Observable.Throw<int>(_exception, ThreadPoolScheduler.Instance));
+        }
+
+#if CURRENT
+
+        [Benchmark]
+        public void Prepend_Immediate()
+        {
+            ConsumeSync(Observable.Return(1, ImmediateScheduler.Instance).Prepend(0, ImmediateScheduler.Instance));
+        }
+
+
+        [Benchmark]
+        public void Prepend_CurrentThread()
+        {
+            ConsumeSync(Observable.Return(1, CurrentThreadScheduler.Instance).Prepend(0, CurrentThreadScheduler.Instance));
+        }
+
+        [Benchmark]
+        public void Prepend_EventLoop()
+        {
+            BlockingConsume(Observable.Return(1, _eventLoop).Prepend(0, _eventLoop));
+        }
+
+        [Benchmark]
+        public void Prepend_TaskPool()
+        {
+            BlockingConsume(Observable.Return(1, TaskPoolScheduler.Default).Prepend(0, TaskPoolScheduler.Default));
+        }
+
+        [Benchmark]
+        public void Prepend_ThreadPool()
+        {
+            BlockingConsume(Observable.Return(1, ThreadPoolScheduler.Instance).Prepend(0, ThreadPoolScheduler.Instance));
+        }
+
+        [Benchmark]
+        public void Append_Immediate()
+        {
+            ConsumeSync(Observable.Return(1, ImmediateScheduler.Instance).Append(0, ImmediateScheduler.Instance));
+        }
+
+
+        [Benchmark]
+        public void Append_CurrentThread()
+        {
+            ConsumeSync(Observable.Return(1, CurrentThreadScheduler.Instance).Append(0, CurrentThreadScheduler.Instance));
+        }
+
+        [Benchmark]
+        public void Append_EventLoop()
+        {
+            BlockingConsume(Observable.Return(1, _eventLoop).Append(0, _eventLoop));
+        }
+
+        [Benchmark]
+        public void Append_TaskPool()
+        {
+            BlockingConsume(Observable.Return(1, TaskPoolScheduler.Default).Append(0, TaskPoolScheduler.Default));
+        }
+
+        [Benchmark]
+        public void Append_ThreadPool()
+        {
+            BlockingConsume(Observable.Return(1, ThreadPoolScheduler.Instance).Append(0, ThreadPoolScheduler.Instance));
+        }
+#endif
+    }
+}

+ 81 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/AppendPrepend.cs

@@ -360,5 +360,86 @@ namespace System.Reactive.Linq.ObservableImpl
                 return array;
                 return array;
             }
             }
         }
         }
+
+        internal sealed class AppendPrependSingleImmediate<TSource> : Producer<TSource, AppendPrependSingleImmediate<TSource>._>, IAppendPrepend<TSource>
+        {
+            private readonly IObservable<TSource> _source;
+            private readonly TSource _value;
+            private readonly bool _append;
+
+            public IScheduler Scheduler { get { return ImmediateScheduler.Instance; } }
+
+            public AppendPrependSingleImmediate(IObservable<TSource> source, TSource value, bool append)
+            {
+                _source = source;
+                _value = value;
+                _append = append;
+            }
+
+            public IAppendPrepend<TSource> Append(TSource value)
+            {
+                var prev = new Node<TSource>(_value);
+
+                if (_append)
+                {
+                    return new AppendPrependMultiple<TSource>(_source,
+                        null, new Node<TSource>(prev, value), Scheduler);
+                }
+
+                return new AppendPrependMultiple<TSource>(_source,
+                    prev, new Node<TSource>(value), Scheduler);
+            }
+
+            public IAppendPrepend<TSource> Prepend(TSource value)
+            {
+                var prev = new Node<TSource>(_value);
+
+                if (_append)
+                {
+                    return new AppendPrependMultiple<TSource>(_source,
+                        new Node<TSource>(value), prev, Scheduler);
+                }
+
+                return new AppendPrependMultiple<TSource>(_source,
+                    new Node<TSource>(prev, value), null, Scheduler);
+            }
+
+            protected override _ CreateSink(IObserver<TSource> observer) => new _(this, observer);
+
+            protected override void Run(_ sink) => sink.Run();
+
+            internal sealed class _ : IdentitySink<TSource>
+            {
+                private readonly IObservable<TSource> _source;
+                private readonly TSource _value;
+                private readonly bool _append;
+
+                public _(AppendPrependSingleImmediate<TSource> parent, IObserver<TSource> observer)
+                    : base(observer)
+                {
+                    _source = parent._source;
+                    _value = parent._value;
+                    _append = parent._append;
+                }
+
+                public void Run()
+                {
+                    if (!_append)
+                    {
+                        ForwardOnNext(_value);
+                    }
+                    Run(_source);
+                }
+
+                public override void OnCompleted()
+                {
+                    if (_append)
+                    {
+                        ForwardOnNext(_value);
+                    }
+                    ForwardOnCompleted();
+                }
+            }
+        }
     }
     }
 }
 }

+ 21 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Return.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 // See the LICENSE file in the project root for more information. 
 
 
 using System.Reactive.Concurrency;
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 
 
 namespace System.Reactive.Linq.ObservableImpl
 namespace System.Reactive.Linq.ObservableImpl
 {
 {
@@ -43,4 +44,24 @@ namespace System.Reactive.Linq.ObservableImpl
             }
             }
         }
         }
     }
     }
+
+    // There is no need for a full Producer/IdentitySink as there is no
+    // way to stop a first task running on the immediate scheduler
+    // as it is always synchronous.
+    internal sealed class ReturnImmediate<TSource> : BasicProducer<TSource>
+    {
+        private readonly TSource _value;
+
+        public ReturnImmediate(TSource value)
+        {
+            _value = value;
+        }
+
+        protected override IDisposable Run(IObserver<TSource> observer)
+        {
+            observer.OnNext(_value);
+            observer.OnCompleted();
+            return Disposable.Empty;
+        }
+    }
 }
 }

+ 20 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/Throw.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 // See the LICENSE file in the project root for more information. 
 
 
 using System.Reactive.Concurrency;
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 
 
 namespace System.Reactive.Linq.ObservableImpl
 namespace System.Reactive.Linq.ObservableImpl
 {
 {
@@ -37,4 +38,23 @@ namespace System.Reactive.Linq.ObservableImpl
             }
             }
         }
         }
     }
     }
+
+    // There is no need for a full Producer/IdentitySink as there is no
+    // way to stop a first task running on the immediate scheduler
+    // as it is always synchronous.
+    internal sealed class ThrowImmediate<TSource> : BasicProducer<TSource>
+    {
+        private readonly Exception _exception;
+
+        public ThrowImmediate(Exception exception)
+        {
+            _exception = exception;
+        }
+
+        protected override IDisposable Run(IObserver<TSource> observer)
+        {
+            observer.OnError(_exception);
+            return Disposable.Empty;
+        }
+    }
 }
 }

+ 16 - 2
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

@@ -425,11 +425,18 @@ namespace System.Reactive.Linq
 
 
         public virtual IObservable<TResult> Return<TResult>(TResult value)
         public virtual IObservable<TResult> Return<TResult>(TResult value)
         {
         {
-            return new Return<TResult>(value, SchedulerDefaults.ConstantTimeOperations);
+            // ConstantTimeOperations is a mutable field so we'd have to
+            // check if it points to the immediate scheduler instance
+            // which is done in the other Return overload anyway
+            return Return(value, SchedulerDefaults.ConstantTimeOperations);
         }
         }
 
 
         public virtual IObservable<TResult> Return<TResult>(TResult value, IScheduler scheduler)
         public virtual IObservable<TResult> Return<TResult>(TResult value, IScheduler scheduler)
         {
         {
+            if (scheduler == ImmediateScheduler.Instance)
+            {
+                return new ReturnImmediate<TResult>(value);
+            }
             return new Return<TResult>(value, scheduler);
             return new Return<TResult>(value, scheduler);
         }
         }
 
 
@@ -439,11 +446,18 @@ namespace System.Reactive.Linq
 
 
         public virtual IObservable<TResult> Throw<TResult>(Exception exception)
         public virtual IObservable<TResult> Throw<TResult>(Exception exception)
         {
         {
-            return new Throw<TResult>(exception, SchedulerDefaults.ConstantTimeOperations);
+            // ConstantTimeOperations is a mutable field so we'd have to
+            // check if it points to the immediate scheduler instance
+            // which is done in the other Return overload anyway
+            return Throw<TResult>(exception, SchedulerDefaults.ConstantTimeOperations);
         }
         }
 
 
         public virtual IObservable<TResult> Throw<TResult>(Exception exception, IScheduler scheduler)
         public virtual IObservable<TResult> Throw<TResult>(Exception exception, IScheduler scheduler)
         {
         {
+            if (scheduler == ImmediateScheduler.Instance)
+            {
+                return new ThrowImmediate<TResult>(exception);
+            }
             return new Throw<TResult>(exception, scheduler);
             return new Throw<TResult>(exception, scheduler);
         }
         }
 
 

+ 9 - 1
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs

@@ -30,7 +30,10 @@ namespace System.Reactive.Linq
             {
             {
                 return ap.Append(value);
                 return ap.Append(value);
             }
             }
-
+            if (scheduler == ImmediateScheduler.Instance)
+            {
+                return new AppendPrepend.AppendPrependSingleImmediate<TSource>(source, value, true);
+            }
             return new AppendPrepend.AppendPrependSingle<TSource>(source, value, scheduler, append: true);
             return new AppendPrepend.AppendPrependSingle<TSource>(source, value, scheduler, append: true);
         }
         }
 
 
@@ -208,6 +211,11 @@ namespace System.Reactive.Linq
                 return ap.Prepend(value);
                 return ap.Prepend(value);
             }
             }
 
 
+            if (scheduler == ImmediateScheduler.Instance)
+            {
+                return new AppendPrepend.AppendPrependSingleImmediate<TSource>(source, value, false);
+            }
+
             return new AppendPrepend.AppendPrependSingle<TSource>(source, value, scheduler, append: false);
             return new AppendPrepend.AppendPrependSingle<TSource>(source, value, scheduler, append: false);
         }
         }