|
@@ -340,14 +340,8 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
private IStopwatch _watch;
|
|
|
|
|
|
private object _gate;
|
|
|
-#if !NO_CDS
|
|
|
private SemaphoreSlim _evt;
|
|
|
private CancellationTokenSource _stop;
|
|
|
-#else
|
|
|
- private Semaphore _evt;
|
|
|
- private bool _stopped;
|
|
|
- private ManualResetEvent _stop;
|
|
|
-#endif
|
|
|
private Queue<System.Reactive.TimeInterval<TSource>> _queue;
|
|
|
private bool _hasCompleted;
|
|
|
private TimeSpan _completeAt;
|
|
@@ -359,11 +353,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_cancelable = new SerialDisposable();
|
|
|
|
|
|
_gate = new object();
|
|
|
-#if !NO_CDS
|
|
|
_evt = new SemaphoreSlim(0);
|
|
|
-#else
|
|
|
- _evt = new Semaphore(0, int.MaxValue);
|
|
|
-#endif
|
|
|
_queue = new Queue<System.Reactive.TimeInterval<TSource>>();
|
|
|
_hasCompleted = false;
|
|
|
_completeAt = default(TimeSpan);
|
|
@@ -412,18 +402,8 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
private void ScheduleDrain()
|
|
|
{
|
|
|
-#if !NO_CDS
|
|
|
_stop = new CancellationTokenSource();
|
|
|
_cancelable.Disposable = Disposable.Create(() => _stop.Cancel());
|
|
|
-#else
|
|
|
- _stop = new ManualResetEvent(false);
|
|
|
- _cancelable.Disposable = Disposable.Create(() =>
|
|
|
- {
|
|
|
- _stopped = true;
|
|
|
- _stop.Set();
|
|
|
- _evt.Release();
|
|
|
- });
|
|
|
-#endif
|
|
|
|
|
|
_parent._scheduler.AsLongRunning().ScheduleLongRunning(DrainQueue);
|
|
|
}
|
|
@@ -474,7 +454,6 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
while (true)
|
|
|
{
|
|
|
-#if !NO_CDS
|
|
|
try
|
|
|
{
|
|
|
_evt.Wait(_stop.Token);
|
|
@@ -483,11 +462,6 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
return;
|
|
|
}
|
|
|
-#else
|
|
|
- _evt.WaitOne();
|
|
|
- if (_stopped)
|
|
|
- return;
|
|
|
-#endif
|
|
|
|
|
|
var hasFailed = false;
|
|
|
var error = default(Exception);
|
|
@@ -539,7 +513,6 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
if (shouldWait)
|
|
|
{
|
|
|
-#if !NO_CDS
|
|
|
var timer = new ManualResetEventSlim();
|
|
|
_parent._scheduler.Schedule(waitTime, () => { timer.Set(); });
|
|
|
|
|
@@ -551,12 +524,6 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
return;
|
|
|
}
|
|
|
-#else
|
|
|
- var timer = new ManualResetEvent(false);
|
|
|
- _parent._scheduler.Schedule(waitTime, () => { timer.Set(); });
|
|
|
- if (WaitHandle.WaitAny(new[] { timer, _stop }) == 1)
|
|
|
- return;
|
|
|
-#endif
|
|
|
}
|
|
|
|
|
|
if (hasValue)
|