| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the MIT License.// See the LICENSE file in the project root for more information.using System;using System.Reactive.Concurrency;using System.Reactive.Linq;using System.Threading;using BenchmarkDotNet.Attributes;namespace Benchmarks.System.Reactive{    [MemoryDiagnoser]    public class ScalarScheduleBenchmark    {        private int _store;        private Exception _exceptionStore;        private IScheduler _eventLoop;        private Exception _exception;        [GlobalSetup]        public void Setup()        {            _eventLoop = new EventLoopScheduler();            _exception = new Exception();        }        private void BlockingConsume(IObservable<int> source)        {            var cde = new CountdownEvent(1);            source.Subscribe(v => Volatile.Write(ref _store, v),                e =>                 {                    Volatile.Write(ref _exceptionStore, e);                    cde.Signal();                },                 () => cde.Signal()            );            // spin-wait will result in faster completion detection            // because it takes 5 microseconds to resume a blocked thread            // for me on Windows            while (cde.CurrentCount != 0) ;        }        private void ConsumeSync(IObservable<int> source)        {            source.Subscribe(v => Volatile.Write(ref _store, v), e => Volatile.Write(ref _exceptionStore, e));        }        [Benchmark]        public void Return_Immediate()        {            ConsumeSync(Observable.Return(1, ImmediateScheduler.Instance));        }        [Benchmark]        public void Return_CurrentThread()        {            ConsumeSync(Observable.Return(1, CurrentThreadScheduler.Instance));        }        [Benchmark]        public void Return_EventLoop()        {            BlockingConsume(Observable.Return(1, _eventLoop));        }        [Benchmark]        public void Return_TaskPool()        {            BlockingConsume(Observable.Return(1, TaskPoolScheduler.Default));        }        [Benchmark]        public void Return_ThreadPool()        {            BlockingConsume(Observable.Return(1, ThreadPoolScheduler.Instance));        }        [Benchmark]        public void Throw_Immediate()        {            ConsumeSync(Observable.Throw<int>(_exception, ImmediateScheduler.Instance));        }        [Benchmark]        public void Throw_CurrentThread()        {            ConsumeSync(Observable.Throw<int>(_exception, CurrentThreadScheduler.Instance));        }        [Benchmark]        public void Throw_EventLoop()        {            BlockingConsume(Observable.Throw<int>(_exception, _eventLoop));        }        [Benchmark]        public void Throw_TaskPool()        {            BlockingConsume(Observable.Throw<int>(_exception, TaskPoolScheduler.Default));        }        [Benchmark]        public void Throw_ThreadPool()        {            BlockingConsume(Observable.Throw<int>(_exception, ThreadPoolScheduler.Instance));        }#if CURRENT        [Benchmark]        public void Prepend_Immediate()        {            ConsumeSync(Observable.Return(1, ImmediateScheduler.Instance).Prepend(0, ImmediateScheduler.Instance));        }        [Benchmark]        public void Prepend_CurrentThread()        {            ConsumeSync(Observable.Return(1, CurrentThreadScheduler.Instance).Prepend(0, CurrentThreadScheduler.Instance));        }        [Benchmark]        public void Prepend_EventLoop()        {            BlockingConsume(Observable.Return(1, _eventLoop).Prepend(0, _eventLoop));        }        [Benchmark]        public void Prepend_TaskPool()        {            BlockingConsume(Observable.Return(1, TaskPoolScheduler.Default).Prepend(0, TaskPoolScheduler.Default));        }        [Benchmark]        public void Prepend_ThreadPool()        {            BlockingConsume(Observable.Return(1, ThreadPoolScheduler.Instance).Prepend(0, ThreadPoolScheduler.Instance));        }        [Benchmark]        public void Append_Immediate()        {            ConsumeSync(Observable.Return(1, ImmediateScheduler.Instance).Append(0, ImmediateScheduler.Instance));        }        [Benchmark]        public void Append_CurrentThread()        {            ConsumeSync(Observable.Return(1, CurrentThreadScheduler.Instance).Append(0, CurrentThreadScheduler.Instance));        }        [Benchmark]        public void Append_EventLoop()        {            BlockingConsume(Observable.Return(1, _eventLoop).Append(0, _eventLoop));        }        [Benchmark]        public void Append_TaskPool()        {            BlockingConsume(Observable.Return(1, TaskPoolScheduler.Default).Append(0, TaskPoolScheduler.Default));        }        [Benchmark]        public void Append_ThreadPool()        {            BlockingConsume(Observable.Return(1, ThreadPoolScheduler.Instance).Append(0, ThreadPoolScheduler.Instance));        }#endif    }}
 |