1
0
Эх сурвалжийг харах

Merge pull request #1364 from dotnet/dev/bartde/rx_nullable_part19

More #nullable for query operators.
Bart J.F. De Smet 5 жил өмнө
parent
commit
afa97c840a

+ 3 - 2
Rx.NET/Source/src/System.Reactive/Internal/HalfSerializer.cs

@@ -29,14 +29,15 @@ namespace System.Reactive
         /// <param name="item">The item to signal.</param>
         /// <param name="wip">Indicates there is an emission going on currently.</param>
         /// <param name="error">The field containing an error or terminal indicator.</param>
-        public static void ForwardOnNext<T>(ISink<T> sink, T item, ref int wip, ref Exception error)
+        public static void ForwardOnNext<T>(ISink<T> sink, T item, ref int wip, ref Exception? error)
         {
             if (Interlocked.CompareExchange(ref wip, 1, 0) == 0)
             {
                 sink.ForwardOnNext(item);
+
                 if (Interlocked.Decrement(ref wip) != 0)
                 {
-                    var ex = error;
+                    var ex = error!; // NB: A concurrent OnError or OnCompleted will either set Terminated or the original exception, so never null here.
                     if (ex != ExceptionHelper.Terminated)
                     {
                         error = ExceptionHelper.Terminated;

+ 2 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/DelaySubscription.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Concurrency;
 
 namespace System.Reactive.Linq.ObservableImpl
@@ -58,12 +56,12 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public void Run(IObservable<TSource> source, IScheduler scheduler, DateTimeOffset dueTime)
             {
-                SetUpstream(scheduler.ScheduleAction((@this: this, source), dueTime, tuple => tuple.source.SubscribeSafe(tuple.@this)));
+                SetUpstream(scheduler.ScheduleAction((@this: this, source), dueTime, static tuple => tuple.source.SubscribeSafe(tuple.@this)));
             }
 
             public void Run(IObservable<TSource> source, IScheduler scheduler, TimeSpan dueTime)
             {
-                SetUpstream(scheduler.ScheduleAction((@this: this, source), dueTime, tuple => tuple.source.SubscribeSafe(tuple.@this)));
+                SetUpstream(scheduler.ScheduleAction((@this: this, source), dueTime, static tuple => tuple.source.SubscribeSafe(tuple.@this)));
             }
         }
     }

+ 1 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/Finally.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 using System.Threading;
 
@@ -27,7 +25,7 @@ namespace System.Reactive.Linq.ObservableImpl
         internal sealed class _ : IdentitySink<TSource>
         {
             private readonly Action _finallyAction;
-            private IDisposable _sourceDisposable;
+            private IDisposable? _sourceDisposable;
 
             public _(Action finallyAction, IObserver<TSource> observer)
                 : base(observer)

+ 5 - 7
Rx.NET/Source/src/System.Reactive/Linq/Observable/GetEnumerator.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Diagnostics;
@@ -15,11 +13,11 @@ namespace System.Reactive.Linq.ObservableImpl
     internal sealed class GetEnumerator<TSource> : IEnumerator<TSource>, IObserver<TSource>
     {
         private readonly ConcurrentQueue<TSource> _queue;
-        private TSource _current;
-        private Exception _error;
+        private TSource? _current;
+        private Exception? _error;
         private bool _done;
         private bool _disposed;
-        private IDisposable _subscription;
+        private IDisposable? _subscription;
 
         private readonly SemaphoreSlim _gate;
 
@@ -80,9 +78,9 @@ namespace System.Reactive.Linq.ObservableImpl
             return false;
         }
 
-        public TSource Current => _current;
+        public TSource Current => _current!; // NB: Only called after MoveNext returns true and assigns a value.
 
-        object Collections.IEnumerator.Current => _current;
+        object Collections.IEnumerator.Current => _current!;
 
         public void Dispose()
         {

+ 1 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/OnErrorResumeNext.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Generic;
 
 namespace System.Reactive.Linq.ObservableImpl
@@ -28,7 +26,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
-            protected override IEnumerable<IObservable<TSource>> Extract(IObservable<TSource> source)
+            protected override IEnumerable<IObservable<TSource>>? Extract(IObservable<TSource> source)
             {
                 if (source is OnErrorResumeNext<TSource> oern)
                 {

+ 4 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/Range.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 
@@ -30,7 +28,7 @@ namespace System.Reactive.Linq.ObservableImpl
         {
             private readonly int _end;
             private int _index;
-            private IDisposable _task;
+            private IDisposable? _task;
 
             public RangeSink(int start, int count, IObserver<int> observer)
                 : base(observer)
@@ -57,6 +55,7 @@ namespace System.Reactive.Linq.ObservableImpl
             private IDisposable LoopRec(IScheduler scheduler)
             {
                 var idx = _index;
+
                 if (idx != _end)
                 {
                     _index = idx + 1;
@@ -68,6 +67,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     ForwardOnCompleted();
                 }
+
                 return Disposable.Empty;
             }
         }
@@ -111,6 +111,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 var idx = _index;
                 var end = _end;
+
                 while (!cancel.IsDisposed && idx != end)
                 {
                     ForwardOnNext(idx++);

+ 6 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Repeat.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 
@@ -30,7 +28,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 private readonly TResult _value;
 
-                private IDisposable _task;
+                private IDisposable? _task;
 
                 public _(TResult value, IObserver<TResult> observer)
                     : base(observer)
@@ -47,6 +45,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 protected override void Dispose(bool disposing)
                 {
                     base.Dispose(disposing);
+
                     if (disposing)
                     {
                         Disposable.Dispose(ref _task);
@@ -131,7 +130,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private int _remaining;
 
-                private IDisposable _task;
+                private IDisposable? _task;
 
                 public _(TResult value, int repeatCount, IObserver<TResult> observer)
                     : base(observer)
@@ -149,6 +148,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 protected override void Dispose(bool disposing)
                 {
                     base.Dispose(disposing);
+
                     if (disposing)
                     {
                         Disposable.Dispose(ref _task);
@@ -173,6 +173,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         var next = scheduler.Schedule(this, static (innerScheduler, @this) => @this.LoopRec(innerScheduler));
                         Disposable.TrySetMultiple(ref _task, next);
                     }
+
                     return Disposable.Empty;
                 }
             }
@@ -216,6 +217,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     var value = _value;
                     var n = _remaining;
+
                     while (n > 0 && !cancel.IsDisposed)
                     {
                         ForwardOnNext(value);

+ 19 - 24
Rx.NET/Source/src/System.Reactive/Linq/Observable/RepeatWhen.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Disposables;
 using System.Reactive.Subjects;
 using System.Threading;
@@ -59,21 +57,22 @@ namespace System.Reactive.Linq.ObservableImpl
 
         private sealed class MainObserver : Sink<T>, IObserver<T>
         {
-            private readonly IObserver<Exception> _errorSignal;
+            private readonly IObservable<T> _source;
+            private readonly IObserver<object> _completeSignal;
 
             internal readonly HandlerObserver HandlerConsumer;
-            private readonly IObservable<T> _source;
-            private IDisposable _upstream;
 
-            internal IDisposable HandlerUpstream;
+            internal IDisposable? HandlerUpstream;
+
+            private IDisposable? _upstream;
             private int _trampoline;
             private int _halfSerializer;
-            private Exception _error;
+            private Exception? _error;
 
-            internal MainObserver(IObserver<T> downstream, IObservable<T> source, IObserver<Exception> errorSignal) : base(downstream)
+            internal MainObserver(IObserver<T> downstream, IObservable<T> source, IObserver<object> completeSignal) : base(downstream)
             {
                 _source = source;
-                _errorSignal = errorSignal;
+                _completeSignal = completeSignal;
                 HandlerConsumer = new HandlerObserver(this);
             }
 
@@ -84,6 +83,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     Disposable.Dispose(ref _upstream);
                     Disposable.Dispose(ref HandlerUpstream);
                 }
+
                 base.Dispose(disposing);
             }
 
@@ -91,9 +91,14 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 if (Disposable.TrySetSerial(ref _upstream, null))
                 {
-                    _errorSignal.OnNext(null);
-                }
+                    //
+                    // NB: Unfortunately this thing slipped in using `object` rather than `Unit`, which is our type used to represent nothing,
+                    //     so we have to stick with it and just let a `null` go in here. Users are supposed to ignore the elements produced,
+                    //     which `Unit` is making obvious since there's only one value. However, we're stuck here for compat reasons.
+                    //
 
+                    _completeSignal.OnNext(null!);
+                }
             }
 
             public void OnError(Exception error)
@@ -143,22 +148,12 @@ namespace System.Reactive.Linq.ObservableImpl
                     _main = main;
                 }
 
-                public void OnCompleted()
-                {
-                    _main.HandlerComplete();
-                }
+                public void OnCompleted() => _main.HandlerComplete();
 
-                public void OnError(Exception error)
-                {
-                    _main.HandlerError(error);
-                }
+                public void OnError(Exception error) => _main.HandlerError(error);
 
-                public void OnNext(U value)
-                {
-                    _main.HandlerNext();
-                }
+                public void OnNext(U value) => _main.HandlerNext();
             }
         }
-
     }
 }

+ 11 - 9
Rx.NET/Source/src/System.Reactive/Linq/Observable/RetryWhen.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Collections.Concurrent;
 using System.Reactive.Disposables;
 using System.Reactive.Subjects;
@@ -36,6 +34,7 @@ namespace System.Reactive.Linq.ObservableImpl
             try
             {
                 redo = _handler(errorSignals);
+
                 if (redo == null)
                 {
                     throw new NullReferenceException("The handler returned a null IObservable");
@@ -59,15 +58,15 @@ namespace System.Reactive.Linq.ObservableImpl
 
         private sealed class MainObserver : Sink<T>, IObserver<T>
         {
+            private readonly IObservable<T> _source;
             private readonly IObserver<Exception> _errorSignal;
 
             internal readonly HandlerObserver HandlerConsumer;
-            private readonly IObservable<T> _source;
-            private IDisposable _upstream;
-            internal IDisposable HandlerUpstream;
+            private IDisposable? _upstream;
+            internal IDisposable? HandlerUpstream;
             private int _trampoline;
             private int _halfSerializer;
-            private Exception _error;
+            private Exception? _error;
 
             internal MainObserver(IObserver<T> downstream, IObservable<T> source, IObserver<Exception> errorSignal) : base(downstream)
             {
@@ -83,6 +82,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     Disposable.Dispose(ref _upstream);
                     Disposable.Dispose(ref HandlerUpstream);
                 }
+
                 base.Dispose(disposing);
             }
 
@@ -161,12 +161,14 @@ namespace System.Reactive.Linq.ObservableImpl
 
     internal sealed class RedoSerializedObserver<X> : IObserver<X>
     {
-        private readonly IObserver<X> _downstream;
-        private int _wip;
-        private Exception _terminalException;
         private static readonly Exception SignaledIndicator = new Exception();
+
+        private readonly IObserver<X> _downstream;
         private readonly ConcurrentQueue<X> _queue;
 
+        private int _wip;
+        private Exception? _terminalException;
+
         internal RedoSerializedObserver(IObserver<X> downstream)
         {
             _downstream = downstream;

+ 5 - 6
Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 
@@ -26,10 +24,10 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<TSource>
         {
-            private IDisposable _otherDisposable;
+            private IDisposable? _otherDisposable;
             private bool _forward;
             private int _halfSerializer;
-            private Exception _error;
+            private Exception? _error;
 
             public _(IObserver<TSource> observer)
                 : base(observer)
@@ -166,11 +164,11 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
-            private IDisposable _task;
+            private IDisposable? _task;
 
             public void Run(SkipUntil<TSource> parent)
             {
-                Disposable.SetSingle(ref _task, parent._scheduler.ScheduleAction(this, parent._startTime, state => state.Tick()));
+                Disposable.SetSingle(ref _task, parent._scheduler.ScheduleAction(this, parent._startTime, static state => state.Tick()));
                 Run(parent._source);
             }
 
@@ -180,6 +178,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     Disposable.Dispose(ref _task);
                 }
+
                 base.Dispose(disposing);
             }
 

+ 5 - 6
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 
@@ -26,9 +24,9 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<TSource>
         {
-            private IDisposable _otherDisposable;
+            private IDisposable? _otherDisposable;
             private int _halfSerializer;
-            private Exception _error;
+            private Exception? _error;
 
             public _(IObserver<TSource> observer)
                 : base(observer)
@@ -136,9 +134,9 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<TSource>
         {
-            private IDisposable _timerDisposable;
+            private IDisposable? _timerDisposable;
             private int _wip;
-            private Exception _error;
+            private Exception? _error;
 
             public _(IObserver<TSource> observer)
                 : base(observer)
@@ -157,6 +155,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     Disposable.Dispose(ref _timerDisposable);
                 }
+
                 base.Dispose(disposing);
             }
 

+ 5 - 13
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntilPredicate.cs

@@ -2,9 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
-
 namespace System.Reactive.Linq.ObservableImpl
 {
     /// <summary>
@@ -31,21 +28,15 @@ namespace System.Reactive.Linq.ObservableImpl
         {
             private readonly Func<TSource, bool> _stopPredicate;
 
-            public TakeUntilPredicateObserver(IObserver<TSource> downstream,
-                Func<TSource, bool> predicate) : base(downstream)
+            public TakeUntilPredicateObserver(IObserver<TSource> downstream, Func<TSource, bool> predicate)
+                : base(downstream)
             {
                 _stopPredicate = predicate;
             }
 
-            public override void OnCompleted()
-            {
-                ForwardOnCompleted();
-            }
+            public override void OnCompleted() => ForwardOnCompleted();
 
-            public override void OnError(Exception error)
-            {
-                ForwardOnError(error);
-            }
+            public override void OnError(Exception error) => ForwardOnError(error);
 
             public override void OnNext(TSource value)
             {
@@ -61,6 +52,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     ForwardOnError(ex);
                     return;
                 }
+
                 if (shouldStop)
                 {
                     ForwardOnCompleted();

+ 2 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/TimeInterval.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Concurrency;
 
 namespace System.Reactive.Linq.ObservableImpl
@@ -30,7 +28,7 @@ namespace System.Reactive.Linq.ObservableImpl
             {
             }
 
-            private IStopwatch _watch;
+            private IStopwatch? _watch;
             private TimeSpan _last;
 
             public void Run(TimeInterval<TSource> parent)
@@ -43,7 +41,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public override void OnNext(TSource value)
             {
-                var now = _watch.Elapsed;
+                var now = _watch!.Elapsed; // NB: Watch is assigned during Run.
                 var span = now.Subtract(_last);
                 _last = now;
                 ForwardOnNext(new Reactive.TimeInterval<TSource>(value, span));

+ 15 - 11
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timeout.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 using System.Threading;
@@ -36,10 +34,11 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly TimeSpan _dueTime;
                 private readonly IObservable<TSource> _other;
                 private readonly IScheduler _scheduler;
+
                 private long _index;
-                private IDisposable _mainDisposable;
-                private IDisposable _otherDisposable;
-                private IDisposable _timerDisposable;
+                private IDisposable? _mainDisposable;
+                private IDisposable? _otherDisposable;
+                private IDisposable? _timerDisposable;
 
                 public _(Relative parent, IObserver<TSource> observer)
                     : base(observer)
@@ -64,6 +63,7 @@ namespace System.Reactive.Linq.ObservableImpl
                         Disposable.Dispose(ref _otherDisposable);
                         Disposable.Dispose(ref _timerDisposable);
                     }
+
                     base.Dispose(disposing);
                 }
 
@@ -71,8 +71,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     if (Disposable.TrySetMultiple(ref _timerDisposable, null))
                     {
-
-                        var d = _scheduler.ScheduleAction((idx, instance: this), _dueTime, state => { state.instance.Timeout(state.idx); });
+                        var d = _scheduler.ScheduleAction((idx, instance: this), _dueTime, static state => { state.instance.Timeout(state.idx); });
 
                         Disposable.TrySetMultiple(ref _timerDisposable, d);
                     }
@@ -150,7 +149,8 @@ namespace System.Reactive.Linq.ObservableImpl
             internal sealed class _ : IdentitySink<TSource>
             {
                 private readonly IObservable<TSource> _other;
-                private IDisposable _serialDisposable;
+
+                private IDisposable? _serialDisposable;
                 private int _wip;
 
                 public _(IObservable<TSource> other, IObserver<TSource> observer)
@@ -172,6 +172,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     {
                         Disposable.Dispose(ref _serialDisposable);
                     }
+
                     base.Dispose(disposing);
                 }
 
@@ -237,8 +238,9 @@ namespace System.Reactive.Linq.ObservableImpl
         {
             private readonly Func<TSource, IObservable<TTimeout>> _timeoutSelector;
             private readonly IObservable<TSource> _other;
-            private IDisposable _sourceDisposable;
-            private IDisposable _timerDisposable;
+
+            private IDisposable? _sourceDisposable;
+            private IDisposable? _timerDisposable;
             private long _index;
 
             public _(Timeout<TSource, TTimeout> parent, IObserver<TSource> observer)
@@ -248,7 +250,6 @@ namespace System.Reactive.Linq.ObservableImpl
                 _other = parent._other;
             }
 
-
             public void Run(Timeout<TSource, TTimeout> parent)
             {
                 SetTimer(parent._firstTimeout, 0L);
@@ -263,6 +264,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     Disposable.Dispose(ref _sourceDisposable);
                     Disposable.Dispose(ref _timerDisposable);
                 }
+
                 base.Dispose(disposing);
             }
 
@@ -328,12 +330,14 @@ namespace System.Reactive.Linq.ObservableImpl
                     ForwardOnError(error);
                     return true;
                 }
+
                 return false;
             }
 
             private void SetTimer(IObservable<TTimeout> timeout, long idx)
             {
                 var timeoutObserver = new TimeoutObserver(this, idx);
+
                 if (Disposable.TrySetSerial(ref _timerDisposable, timeoutObserver))
                 {
                     var d = timeout.Subscribe(timeoutObserver);

+ 2 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Timer.cs

@@ -2,8 +2,6 @@
 // The .NET Foundation licenses this file to you under the MIT License.
 // See the LICENSE file in the project root for more information. 
 
-#nullable disable
-
 using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 using System.Threading;
@@ -171,7 +169,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
 
                 private int _pendingTickCount;
-                private IDisposable _periodic;
+                private IDisposable? _periodic;
 
                 private IDisposable InvokeStart(IScheduler self)
                 {
@@ -285,7 +283,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     }
                     catch (Exception e)
                     {
-                        _periodic.Dispose();
+                        _periodic!.Dispose(); // NB: _periodic is assigned before this runs.
                         e.Throw();
                     }
 

+ 12 - 10
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Concurrency/EventLoopSchedulerTest.cs

@@ -20,6 +20,8 @@ namespace ReactiveTests.Tests
 
     public class EventLoopSchedulerTest
     {
+        private static readonly TimeSpan MaxWaitTime = TimeSpan.FromSeconds(10);
+
         [Fact]
         public void EventLoop_ArgumentChecking()
         {
@@ -66,7 +68,7 @@ namespace ReactiveTests.Tests
                 ran = true;
                 gate.Release();
             });
-            Assert.True(gate.WaitOne(TimeSpan.FromSeconds(2)));
+            Assert.True(gate.WaitOne(MaxWaitTime), "Timeout!");
             Assert.True(ran);
         }
 
@@ -82,7 +84,7 @@ namespace ReactiveTests.Tests
                 id = Thread.CurrentThread.ManagedThreadId;
                 gate.Release();
             });
-            Assert.True(gate.WaitOne(TimeSpan.FromSeconds(2)));
+            Assert.True(gate.WaitOne(MaxWaitTime), "Timeout!");
             Assert.NotEqual(Thread.CurrentThread.ManagedThreadId, id);
         }
 #endif
@@ -99,7 +101,7 @@ namespace ReactiveTests.Tests
                 results.Add(1);
                 gate.Release();
             });
-            Assert.True(gate.WaitOne(TimeSpan.FromSeconds(2)));
+            Assert.True(gate.WaitOne(MaxWaitTime), "Timeout!");
             results.AssertEqual(0, 1);
         }
 
@@ -165,7 +167,7 @@ namespace ReactiveTests.Tests
                             gate.Release();
                         });
 
-            Assert.True(gate.WaitOne(TimeSpan.FromSeconds(2)));
+            Assert.True(gate.WaitOne(MaxWaitTime), "Timeout!");
             results.AssertEqual(1, 0);
         }
 
@@ -182,7 +184,7 @@ namespace ReactiveTests.Tests
                 gate.Release();
             });
 
-            Assert.True(gate.WaitOne(TimeSpan.FromSeconds(2)));
+            Assert.True(gate.WaitOne(MaxWaitTime), "Timeout!");
             results.AssertEqual(1, 0);
         }
 
@@ -204,7 +206,7 @@ namespace ReactiveTests.Tests
                             });
                         });
 
-            Assert.True(gate.WaitOne(TimeSpan.FromSeconds(2)));
+            Assert.True(gate.WaitOne(MaxWaitTime), "Timeout!");
             results.AssertEqual(0, 1, 2);
         }
 
@@ -227,7 +229,7 @@ namespace ReactiveTests.Tests
                 });
             });
 
-            Assert.True(gate.WaitOne(TimeSpan.FromSeconds(2)));
+            Assert.True(gate.WaitOne(MaxWaitTime), "Timeout!");
             results.AssertEqual(0, 4, 1, 2);
         }
 
@@ -242,7 +244,7 @@ namespace ReactiveTests.Tests
                 ran = true;
                 gate.Release();
             }));
-            Assert.True(gate.WaitOne(TimeSpan.FromSeconds(2)));
+            Assert.True(gate.WaitOne(MaxWaitTime), "Timeout!");
             Assert.True(ran);
         }
 
@@ -260,7 +262,7 @@ namespace ReactiveTests.Tests
                 sw.Stop();
                 gate.Release();
             });
-            Assert.True(gate.WaitOne(TimeSpan.FromSeconds(2)));
+            Assert.True(gate.WaitOne(MaxWaitTime), "Timeout!");
             Assert.True(ran, "ran");
             Assert.True(sw.ElapsedMilliseconds > 180, "due " + sw.ElapsedMilliseconds);
         }
@@ -286,7 +288,7 @@ namespace ReactiveTests.Tests
                 });
             });
 
-            Assert.True(gate.WaitOne(TimeSpan.FromSeconds(2)));
+            Assert.True(gate.WaitOne(MaxWaitTime), "Timeout!");
             Assert.True(ran, "ran");
             Assert.True(sw.ElapsedMilliseconds > 380, "due " + sw.ElapsedMilliseconds);
         }

+ 4 - 5
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/ObserveOnTest.cs

@@ -22,12 +22,10 @@ using System.Windows.Threading;
 using System.Windows.Forms;
 #endif
 
-
 namespace ReactiveTests.Tests
 {
     public class ObserveOnTest : TestBase
     {
-
         #region + TestBase +
 
         [Fact]
@@ -221,6 +219,7 @@ namespace ReactiveTests.Tests
 
     public class ObserveOnReactiveTest : ReactiveTest
     {
+        private static TimeSpan MaxWaitTime = TimeSpan.FromSeconds(10);
 
         [Fact]
         public void ObserveOn_Scheduler_ArgumentChecking()
@@ -657,7 +656,7 @@ namespace ReactiveTests.Tests
             Observable.Range(1, N).ObserveOn(_scheduler1)
                 .Subscribe(v => { }, () => cde.Signal());
 
-            Assert.True(cde.Wait(5000), "Timeout!");
+            Assert.True(cde.Wait(MaxWaitTime), "Timeout!");
         }
 
         [Fact]
@@ -679,7 +678,7 @@ namespace ReactiveTests.Tests
                     () => cde.Signal()
                 );
 
-            Assert.True(cde.Wait(5000), "Timeout!");
+            Assert.True(cde.Wait(MaxWaitTime), "Timeout!");
 
             Assert.Equal(1, threads.Count);
         }
@@ -703,7 +702,7 @@ namespace ReactiveTests.Tests
                     () => cde.Signal()
                 );
 
-            Assert.True(cde.Wait(5000), "Timeout!");
+            Assert.True(cde.Wait(MaxWaitTime), "Timeout!");
 
             Assert.True(threads.Count >= 1);
         }