Преглед изворни кода

Merge pull request #363 from Reactive-Extensions/WeGotConcurrentApisNow

Removing checks for concurrent APIs
Bart J.F. De Smet пре 8 година
родитељ
комит
0d7d3d7953

+ 0 - 16
Rx.NET/Source/src/System.Reactive/Concurrency/EventLoopScheduler.cs

@@ -45,11 +45,7 @@ namespace System.Reactive.Concurrency
         /// Semaphore to count requests to re-evaluate the queue, from either Schedule requests or when a timer
         /// expires and moves on to the next item in the queue.
         /// </summary>
-#if !NO_CDS
         private readonly SemaphoreSlim _evt;
-#else
-        private readonly Semaphore _evt;
-#endif
 
         /// <summary>
         /// Queue holding work items. Protected by the gate.
@@ -109,11 +105,7 @@ namespace System.Reactive.Concurrency
 
             _gate = new object();
 
-#if !NO_CDS
             _evt = new SemaphoreSlim(0);
-#else
-            _evt = new Semaphore(0, int.MaxValue);
-#endif
             _queue = new SchedulerQueue<TimeSpan>();
             _readyList = new Queue<ScheduledItem<TimeSpan>>();
 
@@ -281,11 +273,7 @@ namespace System.Reactive.Concurrency
         {
             while (true)
             {
-#if !NO_CDS
                 _evt.Wait();
-#else
-                _evt.WaitOne();
-#endif
 
                 var ready = default(ScheduledItem<TimeSpan>[]);
 
@@ -295,11 +283,7 @@ namespace System.Reactive.Concurrency
                     // Bug fix that ensures the number of calls to Release never greatly exceeds the number of calls to Wait.
                     // See work item #37: https://rx.codeplex.com/workitem/37
                     //
-#if !NO_CDS
                     while (_evt.CurrentCount > 0) _evt.Wait();
-#else
-                    while (_evt.WaitOne(TimeSpan.Zero)) { }
-#endif
 
                     //
                     // The event could have been set by a call to Dispose. This takes priority over anything else. We quit the

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Internal/PushPullAdapter.cs

@@ -2,7 +2,7 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
-#if NO_CDS || NO_PERF
+#if NO_PERF
 using System.Collections.Generic;
 
 namespace System.Reactive

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Internal/ScheduledObserver.cs

@@ -9,7 +9,7 @@ using System.Threading;
 
 namespace System.Reactive
 {
-#if !NO_PERF && !NO_CDS
+#if !NO_PERF
     using System.Collections.Concurrent;
     using System.Diagnostics;
 

+ 0 - 33
Rx.NET/Source/src/System.Reactive/Linq/Observable/Delay.cs

@@ -336,14 +336,8 @@ namespace System.Reactive.Linq.ObservableImpl
             private IStopwatch _watch;
 
             private object _gate;
-#if !NO_CDS
             private SemaphoreSlim _evt;
             private CancellationTokenSource _stop;
-#else
-            private Semaphore _evt;
-            private bool _stopped;
-            private ManualResetEvent _stop;
-#endif
             private Queue<System.Reactive.TimeInterval<TSource>> _queue;
             private bool _hasCompleted;
             private TimeSpan _completeAt;
@@ -355,11 +349,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _cancelable = new SerialDisposable();
 
                 _gate = new object();
-#if !NO_CDS
                 _evt = new SemaphoreSlim(0);
-#else
-                _evt = new Semaphore(0, int.MaxValue);
-#endif
                 _queue = new Queue<System.Reactive.TimeInterval<TSource>>();
                 _hasCompleted = false;
                 _completeAt = default(TimeSpan);
@@ -408,18 +398,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
             private void ScheduleDrain()
             {
-#if !NO_CDS
                 _stop = new CancellationTokenSource();
                 _cancelable.Disposable = Disposable.Create(() => _stop.Cancel());
-#else
-                _stop = new ManualResetEvent(false);
-                _cancelable.Disposable = Disposable.Create(() =>
-                {
-                    _stopped = true;
-                    _stop.Set();
-                    _evt.Release();
-                });
-#endif
 
                 _parent._scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
             }
@@ -470,7 +450,6 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 while (true)
                 {
-#if !NO_CDS
                     try
                     {
                         _evt.Wait(_stop.Token);
@@ -479,11 +458,6 @@ namespace System.Reactive.Linq.ObservableImpl
                     {
                         return;
                     }
-#else
-                    _evt.WaitOne();
-                    if (_stopped)
-                        return;
-#endif
 
                     var hasFailed = false;
                     var error = default(Exception);
@@ -535,7 +509,6 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     if (shouldWait)
                     {
-#if !NO_CDS
                         var timer = new ManualResetEventSlim();
                         _parent._scheduler.Schedule(waitTime, () => { timer.Set(); });
 
@@ -547,12 +520,6 @@ namespace System.Reactive.Linq.ObservableImpl
                         {
                             return;
                         }
-#else
-                        var timer = new ManualResetEvent(false);
-                        _parent._scheduler.Schedule(waitTime, () => { timer.Set(); });
-                        if (WaitHandle.WaitAny(new[] { timer, _stop }) == 1)
-                            return;
-#endif
                     }
 
                     if (hasValue)

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/GetEnumerator.cs

@@ -2,7 +2,7 @@
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
-#if !NO_PERF && !NO_CDS
+#if !NO_PERF
 using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;

+ 0 - 56
Rx.NET/Source/src/System.Reactive/Linq/Observable/GroupByUntil.cs

@@ -277,7 +277,6 @@ namespace System.Reactive.Linq.ObservableImpl
         }
     }
 
-#if !NO_CDS
     class Map<TKey, TValue>
     {
         // Taken from ConcurrentDictionary in the BCL.
@@ -351,60 +350,5 @@ namespace System.Reactive.Linq.ObservableImpl
             return _map.TryRemove(key, out value);
         }
     }
-#else
-    class Map<TKey, TValue>
-    {
-        private readonly Dictionary<TKey, TValue> _map;
-
-        public Map(int? capacity, IEqualityComparer<TKey> comparer)
-        {
-            if (capacity.HasValue)
-            {
-                _map = new Dictionary<TKey, TValue>(capacity.Value, comparer);
-            }
-            else
-            {
-                _map = new Dictionary<TKey, TValue>(comparer);
-            }
-        }
-
-        public TValue GetOrAdd(TKey key, Func<TValue> valueFactory, out bool added)
-        {
-            lock (_map)
-            {
-                added = false;
-
-                var value = default(TValue);
-                if (!_map.TryGetValue(key, out value))
-                {
-                    value = valueFactory();
-                    _map.Add(key, value);
-                    added = true;
-                }
-
-                return value;
-            }
-        }
-
-        public IEnumerable<TValue> Values
-        {
-            get
-            {
-                lock (_map)
-                {
-                    return _map.Values.ToArray();
-                }
-            }
-        }
-
-        public bool Remove(TKey key)
-        {
-            lock (_map)
-            {
-                return _map.Remove(key);
-            }
-        }
-    }
-#endif
 }
 #endif

+ 0 - 14
Rx.NET/Source/src/System.Reactive/Linq/Observable/Latest.cs

@@ -25,23 +25,13 @@ namespace System.Reactive.Linq.ObservableImpl
         class _ : PushToPullSink<TSource, TSource>
         {
             private readonly object _gate;
-
-#if !NO_CDS
             private readonly SemaphoreSlim _semaphore;
-#else
-            private readonly Semaphore _semaphore;
-#endif
 
             public _(IDisposable subscription)
                 : base(subscription)
             {
                 _gate = new object();
-
-#if !NO_CDS
                 _semaphore = new SemaphoreSlim(0, 1);
-#else
-                _semaphore = new Semaphore(0, 1);
-#endif
             }
 
             private bool _notificationAvailable;
@@ -103,11 +93,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 var value = default(TSource);
                 var error = default(Exception);
 
-#if !NO_CDS
                 _semaphore.Wait();
-#else
-                _semaphore.WaitOne();
-#endif
 
                 lock (_gate)
                 {

+ 0 - 14
Rx.NET/Source/src/System.Reactive/Linq/Observable/Next.cs

@@ -25,23 +25,13 @@ namespace System.Reactive.Linq.ObservableImpl
         class _ : PushToPullSink<TSource, TSource>
         {
             private readonly object _gate;
-
-#if !NO_CDS
             private readonly SemaphoreSlim _semaphore;
-#else
-            private readonly Semaphore _semaphore;
-#endif
 
             public _(IDisposable subscription)
                 : base(subscription)
             {
                 _gate = new object();
-
-#if !NO_CDS
                 _semaphore = new SemaphoreSlim(0, 1);
-#else
-                _semaphore = new Semaphore(0, 1);
-#endif
             }
 
             private bool _waiting;
@@ -117,11 +107,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 if (!done)
                 {
-#if !NO_CDS
                     _semaphore.Wait();
-#else
-                    _semaphore.WaitOne();
-#endif
                 }
 
                 //

+ 2 - 2
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Blocking.cs

@@ -256,7 +256,7 @@ namespace System.Reactive.Linq
 
         public virtual IEnumerator<TSource> GetEnumerator<TSource>(IObservable<TSource> source)
         {
-#if !NO_PERF && !NO_CDS
+#if !NO_PERF
             var e = new GetEnumerator<TSource>();
             return e.Run(source);
 #else
@@ -459,7 +459,7 @@ namespace System.Reactive.Linq
 
         #region |> Helpers <|
 
-#if NO_CDS || NO_PERF
+#if NO_PERF
         private static IEnumerator<TResult> PushToPull<TSource, TResult>(IObservable<TSource> source, Action<Notification<TSource>> push, Func<Notification<TResult>> pull)
         {
             var subscription = new SingleAssignmentDisposable();

+ 0 - 4
Rx.NET/Source/tests/Tests.System.Reactive/Stress/Core/Schedulers/EventLoop.cs

@@ -58,11 +58,7 @@ namespace ReactiveTests.Stress.Schedulers
 
         private static int CurrentCount(this EventLoopScheduler scheduler)
         {
-#if !NO_CDS
             return ((SemaphoreSlim)semaphore.GetValue(scheduler)).CurrentCount;
-#else
-            return 0;
-#endif
         }
     }
 }

+ 1 - 3
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/EventLoopSchedulerTest.cs

@@ -280,7 +280,6 @@ namespace ReactiveTests.Tests
         }
 #endif
 
-#if !NO_CDS
         [Fact]
         public void EventLoop_Immediate()
         {
@@ -361,7 +360,6 @@ namespace ReactiveTests.Tests
                 }
             }
         }
-#endif
 
         [Fact]
         public void EventLoop_Periodic()
@@ -393,7 +391,7 @@ namespace ReactiveTests.Tests
         }
 #endif
 
-#if !NO_CDS && DESKTOPCLR
+#if DESKTOPCLR
         [Fact]
         public void EventLoop_CorrectWorkStealing()
         {

+ 0 - 2
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/ThreadPoolSchedulerTest.cs

@@ -42,7 +42,6 @@ namespace ReactiveTests.Tests
             evt.WaitOne();
         }
 
-#if !NO_CDS
         [Fact]
         public void ProperRooting_NoGC_SingleShot()
         {
@@ -97,7 +96,6 @@ namespace ReactiveTests.Tests
 
             cts.Cancel();
         }
-#endif
 
 #if !SILVERLIGHT
         [Fact]

+ 3 - 3
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/ObservableConcurrencyTest.cs

@@ -581,7 +581,7 @@ namespace ReactiveTests.Tests
                 OnCompleted<int>(531)
             );
 
-#if !NO_PERF && !NO_CDS
+#if !NO_PERF
             // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
             xs.Subscriptions.AssertEqual(
                 Subscribe(200, 531)
@@ -626,7 +626,7 @@ namespace ReactiveTests.Tests
                 OnError<int>(531, ex)
             );
 
-#if !NO_PERF && !NO_CDS
+#if !NO_PERF
             // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
             xs.Subscriptions.AssertEqual(
                 Subscribe(200, 531)
@@ -754,7 +754,7 @@ namespace ReactiveTests.Tests
             }
         }
 
-#if !NO_PERF && !NO_CDS
+#if !NO_PERF
         [Fact]
         public void ObserveOn_LongRunning_Simple()
         {

+ 0 - 2
Rx.NET/Source/tests/Tests.System.Reactive/Tests/ObserverTest.cs

@@ -643,7 +643,6 @@ namespace ReactiveTests.Tests
             }
         }
 
-#if !NO_CDS
         [Fact]
         public void Observer_Synchronize_OnCompleted()
         {
@@ -720,7 +719,6 @@ namespace ReactiveTests.Tests
 
             Assert.Equal(n, N * M);
         }
-#endif
 
         [Fact]
         public void NotifyOn_Null()