소스 검색

Merge branch 'master' into RepeatImprovements

David Karnok 7 년 전
부모
커밋
952c81a22d

+ 42 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/BufferCountBenchmark.cs

@@ -0,0 +1,42 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Reactive.Linq;
+using System.Threading;
+using BenchmarkDotNet.Attributes;
+
+namespace Benchmarks.System.Reactive
+{
+    [MemoryDiagnoser]
+    public class BufferCountBenchmark
+    {
+        IList<int> _store;
+
+        [Benchmark]
+        public void Exact()
+        {
+            Observable.Range(1, 1000)
+                .Buffer(1)
+                .Subscribe(v => Volatile.Write(ref _store, v));
+        }
+
+        [Benchmark]
+        public void Skip()
+        {
+            Observable.Range(1, 1000)
+                .Buffer(1, 2)
+                .Subscribe(v => Volatile.Write(ref _store, v));
+        }
+
+        [Benchmark]
+        public void Overlap()
+        {
+            Observable.Range(1, 1000)
+                .Buffer(2, 1)
+                .Subscribe(v => Volatile.Write(ref _store, v));
+        }
+    }
+}

+ 1 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs

@@ -15,6 +15,7 @@ namespace Benchmarks.System.Reactive
                 typeof(ZipBenchmark),
                 typeof(CombineLatestBenchmark),
                 typeof(SwitchBenchmark),
+                typeof(BufferCountBenchmark),
                 typeof(RepeatBenchmark)
             });
 

+ 46 - 0
Rx.NET/Source/src/System.Reactive/Concurrency/Scheduler.Simple.cs

@@ -76,6 +76,26 @@ namespace System.Reactive.Concurrency
             return scheduler.Schedule(action, dueTime, (s, a) => Invoke(s, a));
         }
 
+        /// <summary>
+        /// Schedules an action to be executed after the specified relative due time.
+        /// </summary>
+        /// <param name="scheduler">Scheduler to execute the action on.</param>
+        /// <param name="action">Action to execute.</param>
+        /// <param name="state">A state object to be passed to <paramref name="action"/>.</param>
+        /// <param name="dueTime">Relative time after which to execute the action.</param>
+        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
+        internal static IDisposable ScheduleAction<TState>(this IScheduler scheduler, TState state, TimeSpan dueTime, Action<TState> action)
+        {
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            // See note above.
+            return scheduler.Schedule((state, action), dueTime, (s, tuple) => Invoke(s, tuple));
+        }
+
         /// <summary>
         /// Schedules an action to be executed at the specified absolute due time.
         /// </summary>
@@ -95,6 +115,26 @@ namespace System.Reactive.Concurrency
             return scheduler.Schedule(action, dueTime, (s, a) => Invoke(s, a));
         }
 
+        /// <summary>
+        /// Schedules an action to be executed after the specified relative due time.
+        /// </summary>
+        /// <param name="scheduler">Scheduler to execute the action on.</param>
+        /// <param name="action">Action to execute.</param>
+        /// <param name="state">A state object to be passed to <paramref name="action"/>.</param>
+        /// <param name="dueTime">Relative time after which to execute the action.</param>
+        /// <returns>The disposable object used to cancel the scheduled action (best effort).</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> or <paramref name="action"/> is <c>null</c>.</exception>
+        internal static IDisposable ScheduleAction<TState>(this IScheduler scheduler, TState state, DateTimeOffset dueTime, Action<TState> action)
+        {
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            // See note above.
+            return scheduler.Schedule((state, action), dueTime, (s, tuple) => Invoke(s, tuple));
+        }
+
         /// <summary>
         /// Schedules an action to be executed.
         /// </summary>
@@ -117,5 +157,11 @@ namespace System.Reactive.Concurrency
             action();
             return Disposable.Empty;
         }
+
+        private static IDisposable Invoke<TState>(IScheduler scheduler, (TState state, Action<TState> action) tuple)
+        {
+            tuple.action(tuple.state);
+            return Disposable.Empty;
+        }
     }
 }

+ 171 - 20
Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs

@@ -10,45 +10,196 @@ namespace System.Reactive.Linq.ObservableImpl
 {
     internal static class Buffer<TSource>
     {
-        internal sealed class Count : Producer<IList<TSource>, Count._>
+        internal sealed class CountExact : Producer<IList<TSource>, CountExact.ExactSink>
+        {
+            readonly IObservable<TSource> _source;
+
+            readonly int _count;
+
+            public CountExact(IObservable<TSource> source, int count)
+            {
+                _source = source;
+                _count = count;
+            }
+
+            protected override ExactSink CreateSink(IObserver<IList<TSource>> observer) => new ExactSink(observer, _count);
+
+            protected override void Run(ExactSink sink) => sink.Run(_source);
+
+            internal sealed class ExactSink : Sink<TSource, IList<TSource>>
+            {
+                readonly int _count;
+
+                int _index;
+
+                IList<TSource> _buffer;
+
+                internal ExactSink(IObserver<IList<TSource>> observer, int count) : base(observer)
+                {
+                    _count = count;
+                }
+
+                public override void OnNext(TSource value)
+                {
+                    var buffer = _buffer;
+                    if (buffer == null)
+                    {
+                        buffer = new List<TSource>();
+                        _buffer = buffer;
+                    }
+
+                    buffer.Add(value);
+
+                    var idx = _index + 1;
+                    if (idx == _count)
+                    {
+                        _buffer = null;
+                        _index = 0;
+                        ForwardOnNext(buffer);
+                    }
+                    else
+                    {
+                        _index = idx;
+                    }
+                }
+
+                public override void OnError(Exception error)
+                {
+                    _buffer = null;
+                    ForwardOnError(error);
+                }
+
+                public override void OnCompleted()
+                {
+                    var buffer = _buffer;
+                    _buffer = null;
+
+                    if (buffer != null)
+                    {
+                        ForwardOnNext(buffer);
+                    }
+                    ForwardOnCompleted();
+                }   
+            }
+        }
+
+        internal sealed class CountSkip : Producer<IList<TSource>, CountSkip.SkipSink>
+        {
+            readonly IObservable<TSource> _source;
+
+            readonly int _count;
+
+            readonly int _skip;
+
+            public CountSkip(IObservable<TSource> source, int count, int skip)
+            {
+                _source = source;
+                _count = count;
+                _skip = skip;
+            }
+
+            protected override SkipSink CreateSink(IObserver<IList<TSource>> observer) => new SkipSink(observer, _count, _skip);
+
+            protected override void Run(SkipSink sink) => sink.Run(_source);
+
+            internal sealed class SkipSink : Sink<TSource, IList<TSource>>
+            {
+                readonly int _count;
+
+                readonly int _skip;
+
+                int _index;
+
+                IList<TSource> _buffer;
+
+                internal SkipSink(IObserver<IList<TSource>> observer, int count, int skip) : base(observer)
+                {
+                    _count = count;
+                    _skip = skip;
+                }
+
+                public override void OnNext(TSource value)
+                {
+                    var idx = _index;
+                    var buffer = _buffer;
+                    if (idx == 0)
+                    {
+                        buffer = new List<TSource>();
+                        _buffer = buffer;
+                    }
+
+                    buffer?.Add(value);
+
+                    if (++idx == _count)
+                    {
+                        _buffer = null;
+                        ForwardOnNext(buffer);
+                    }
+
+                    if (idx == _skip)
+                    {
+                        _index = 0;
+                    }
+                    else
+                    {
+                        _index = idx;
+                    }
+                }
+
+                public override void OnError(Exception error)
+                {
+                    _buffer = null;
+                    ForwardOnError(error);
+                }
+
+                public override void OnCompleted()
+                {
+                    var buffer = _buffer;
+                    _buffer = null;
+
+                    if (buffer != null)
+                    {
+                        ForwardOnNext(buffer);
+                    }
+                    ForwardOnCompleted();
+                }
+            }
+        }
+
+        internal sealed class CountOverlap : Producer<IList<TSource>, CountOverlap.OverlapSink>
         {
             private readonly IObservable<TSource> _source;
             private readonly int _count;
             private readonly int _skip;
 
-            public Count(IObservable<TSource> source, int count, int skip)
+            public CountOverlap(IObservable<TSource> source, int count, int skip)
             {
                 _source = source;
                 _count = count;
                 _skip = skip;
             }
 
-            protected override _ CreateSink(IObserver<IList<TSource>> observer) => new _(this, observer);
+            protected override OverlapSink CreateSink(IObserver<IList<TSource>> observer) => new OverlapSink(observer, _count, _skip);
 
-            protected override void Run(_ sink) => sink.Run(_source);
+            protected override void Run(OverlapSink sink) => sink.Run(_source);
 
-            internal sealed class _ : Sink<TSource, IList<TSource>> 
+            internal sealed class OverlapSink : Sink<TSource, IList<TSource>> 
             {
-                private readonly Queue<IList<TSource>> _queue = new Queue<IList<TSource>>();
+                private readonly Queue<IList<TSource>> _queue;
 
                 private readonly int _count;
                 private readonly int _skip;
 
-                public _(Count parent, IObserver<IList<TSource>> observer)
-                    : base(observer)
-                {
-                    _count = parent._count;
-                    _skip = parent._skip;
-                }
-
-                private int _n;
+                int _index;
+                int _n;
 
-                public override void Run(IObservable<TSource> source)
+                public OverlapSink(IObserver<IList<TSource>> observer, int count, int skip)
+                    : base(observer)
                 {
-                    _n = 0;
-
+                    _queue = new Queue<IList<TSource>>();
+                    _count = count;
+                    _skip = skip;
                     CreateWindow();
-                    base.Run(source);
                 }
 
                 private void CreateWindow()
@@ -77,8 +228,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnError(Exception error)
                 {
-                    while (_queue.Count > 0)
-                        _queue.Dequeue().Clear();
+                    // just drop the ILists on the GC floor, no reason to clear them
+                    _queue.Clear();
 
                     ForwardOnError(error);
                 }

+ 0 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/LongCount.cs

@@ -26,7 +26,6 @@ namespace System.Reactive.Linq.ObservableImpl
                 public _(IObserver<long> observer)
                     : base(observer)
                 {
-                    _count = 0L;
                 }
 
                 public override void OnNext(TSource value)
@@ -76,7 +75,6 @@ namespace System.Reactive.Linq.ObservableImpl
                     : base(observer)
                 {
                     _predicate = predicate;
-                    _count = 0L;
                 }
 
                 public override void OnNext(TSource value)

+ 14 - 7
Rx.NET/Source/src/System.Reactive/Linq/Observable/MinBy.cs

@@ -25,7 +25,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : Sink<TSource, IList<TSource>> 
         {
-            private readonly MinBy<TSource, TKey> _parent;
+            readonly Func<TSource, TKey> _keySelector;
+            readonly IComparer<TKey> _comparer;
             private bool _hasValue;
             private TKey _lastKey;
             private List<TSource> _list;
@@ -33,10 +34,9 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(MinBy<TSource, TKey> parent, IObserver<IList<TSource>> observer)
                 : base(observer)
             {
-                _parent = parent;
+                _keySelector = parent._keySelector;
+                _comparer = parent._comparer;
 
-                _hasValue = false;
-                _lastKey = default(TKey);
                 _list = new List<TSource>();
             }
 
@@ -45,10 +45,12 @@ namespace System.Reactive.Linq.ObservableImpl
                 var key = default(TKey);
                 try
                 {
-                    key = _parent._keySelector(value);
+                    key = _keySelector(value);
                 }
                 catch (Exception ex)
                 {
+                    _list = null;
+                    _lastKey = default;
                     ForwardOnError(ex);
                     return;
                 }
@@ -64,10 +66,12 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     try
                     {
-                        comparison = _parent._comparer.Compare(key, _lastKey);
+                        comparison = _comparer.Compare(key, _lastKey);
                     }
                     catch (Exception ex)
                     {
+                        _list = null;
+                        _lastKey = default;
                         ForwardOnError(ex);
                         return;
                     }
@@ -87,7 +91,10 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public override void OnCompleted()
             {
-                ForwardOnNext(_list);
+                var list = _list;
+                _list = null;
+                _lastKey = default;
+                ForwardOnNext(list);
                 ForwardOnCompleted();
             }
         }

+ 2 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/OfType.cs

@@ -26,9 +26,9 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public override void OnNext(TSource value)
             {
-                if (value is TResult)
+                if (value is TResult v)
                 {
-                    ForwardOnNext((TResult)(object)value);
+                    ForwardOnNext(v);
                 }
             }
         }

+ 0 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/RefCount.cs

@@ -23,8 +23,6 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 _source = source;
                 _gate = new object();
-                _count = 0;
-                _connectableSubscription = default(IDisposable);
             }
 
             protected override _ CreateSink(IObserver<TSource> observer) => new _(observer, this);
@@ -100,8 +98,6 @@ namespace System.Reactive.Linq.ObservableImpl
                 _gate = new object();
                 _disconnectTime = disconnectTime;
                 _scheduler = scheduler;
-                _count = 0;
-                _connectableSubscription = default(IDisposable);
             }
 
             protected override _ CreateSink(IObserver<TSource> observer) => new _(observer);

+ 11 - 7
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs

@@ -50,17 +50,21 @@ namespace System.Reactive.Linq
 
         public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count)
         {
-            return Buffer_<TSource>(source, count, count);
+            return new Buffer<TSource>.CountExact(source, count);
         }
 
         public virtual IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count, int skip)
         {
-            return Buffer_<TSource>(source, count, skip);
-        }
-
-        private static IObservable<IList<TSource>> Buffer_<TSource>(IObservable<TSource> source, int count, int skip)
-        {
-            return new Buffer<TSource>.Count(source, count, skip);
+            if (count > skip)
+            {
+                return new Buffer<TSource>.CountOverlap(source, count, skip);
+            }
+            else if (count < skip)
+            {
+                return new Buffer<TSource>.CountSkip(source, count, skip);
+            }
+            // count == skip
+            return new Buffer<TSource>.CountExact(source, count);
         }
 
         #endregion