|
|
@@ -381,9 +381,8 @@ namespace System.Reactive
|
|
|
/// techniques to signal events to the downstream.
|
|
|
/// </summary>
|
|
|
/// <typeparam name="T">The element type of the sequence.</typeparam>
|
|
|
- internal sealed class ObserveOnObserverNew<T> : IObserver<T>, IDisposable
|
|
|
+ internal sealed class ObserveOnObserverNew<T> : IdentitySink<T>
|
|
|
{
|
|
|
- private readonly IObserver<T> _downstream;
|
|
|
private readonly IScheduler _scheduler;
|
|
|
|
|
|
/// <summary>
|
|
|
@@ -392,7 +391,6 @@ namespace System.Reactive
|
|
|
/// </summary>
|
|
|
private readonly ISchedulerLongRunning _longRunning;
|
|
|
private readonly ConcurrentQueue<T> _queue;
|
|
|
- private IDisposable _run;
|
|
|
|
|
|
/// <summary>
|
|
|
/// The current task representing a running drain operation.
|
|
|
@@ -421,54 +419,55 @@ namespace System.Reactive
|
|
|
/// </summary>
|
|
|
private bool _disposed;
|
|
|
|
|
|
- public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream)
|
|
|
+ public ObserveOnObserverNew(IScheduler scheduler, IObserver<T> downstream) : base(downstream)
|
|
|
{
|
|
|
- _downstream = downstream;
|
|
|
_scheduler = scheduler;
|
|
|
_longRunning = scheduler.AsLongRunning();
|
|
|
_queue = new ConcurrentQueue<T>();
|
|
|
}
|
|
|
|
|
|
- public void Run(IObservable<T> source)
|
|
|
- {
|
|
|
- Disposable.SetSingle(ref _run, source.SubscribeSafe(this));
|
|
|
- }
|
|
|
-
|
|
|
- public void Dispose()
|
|
|
+ protected override void Dispose(bool disposing)
|
|
|
{
|
|
|
Volatile.Write(ref _disposed, true);
|
|
|
- Disposable.TryDispose(ref _task);
|
|
|
- Disposable.TryDispose(ref _run);
|
|
|
- Clear();
|
|
|
+
|
|
|
+ base.Dispose(disposing);
|
|
|
+ if (disposing)
|
|
|
+ {
|
|
|
+ Disposable.TryDispose(ref _task);
|
|
|
+ Clear(_queue);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/// <summary>
|
|
|
/// Remove remaining elements from the queue upon
|
|
|
/// cancellation or failure.
|
|
|
/// </summary>
|
|
|
- private void Clear()
|
|
|
+ /// <param name="q">The queue to use. The argument ensures that the
|
|
|
+ /// _queue field is not re-read from memory unnecessarily
|
|
|
+ /// due to the memory barriers inside TryDequeue mandating it
|
|
|
+ /// despite the field is read-only.</param>
|
|
|
+ private void Clear(ConcurrentQueue<T> q)
|
|
|
{
|
|
|
- var q = _queue;
|
|
|
while (q.TryDequeue(out var _))
|
|
|
{
|
|
|
;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void OnCompleted()
|
|
|
+ public override void OnCompleted()
|
|
|
{
|
|
|
Volatile.Write(ref _done, true);
|
|
|
Schedule();
|
|
|
}
|
|
|
|
|
|
- public void OnError(Exception error)
|
|
|
+ public override void OnError(Exception error)
|
|
|
{
|
|
|
_error = error;
|
|
|
Volatile.Write(ref _done, true);
|
|
|
Schedule();
|
|
|
}
|
|
|
|
|
|
- public void OnNext(T value)
|
|
|
+ public override void OnNext(T value)
|
|
|
{
|
|
|
_queue.Enqueue(value);
|
|
|
Schedule();
|
|
|
@@ -489,11 +488,11 @@ namespace System.Reactive
|
|
|
var longRunning = _longRunning;
|
|
|
if (longRunning != null)
|
|
|
{
|
|
|
- newTask.Disposable = longRunning.ScheduleLongRunning(this, DRAIN_LONG_RUNNING);
|
|
|
+ newTask.Disposable = longRunning.ScheduleLongRunning(this, DrainLongRunningAction);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
- newTask.Disposable = _scheduler.Schedule(this, DRAIN_SHORT_RUNNING);
|
|
|
+ newTask.Disposable = _scheduler.Schedule(this, DrainShortRunningFunc);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -503,7 +502,7 @@ namespace System.Reactive
|
|
|
// is of a multi-consumer type.
|
|
|
if (Volatile.Read(ref _disposed))
|
|
|
{
|
|
|
- Clear();
|
|
|
+ Clear(_queue);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -513,7 +512,7 @@ namespace System.Reactive
|
|
|
/// Avoids creating a delegate that captures <code>this</code>
|
|
|
/// whenever the signals have to be drained.
|
|
|
/// </summary>
|
|
|
- private static readonly Action<ObserveOnObserverNew<T>, ICancelable> DRAIN_LONG_RUNNING =
|
|
|
+ private static readonly Action<ObserveOnObserverNew<T>, ICancelable> DrainLongRunningAction =
|
|
|
(self, cancel) => self.DrainLongRunning();
|
|
|
|
|
|
/// <summary>
|
|
|
@@ -521,7 +520,7 @@ namespace System.Reactive
|
|
|
/// Avoids creating a delegate that captures <code>this</code>
|
|
|
/// whenever the signals have to be drained.
|
|
|
/// </summary>
|
|
|
- private static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DRAIN_SHORT_RUNNING =
|
|
|
+ private static readonly Func<IScheduler, ObserveOnObserverNew<T>, IDisposable> DrainShortRunningFunc =
|
|
|
(scheduler, self) => self.DrainShortRunning(scheduler);
|
|
|
|
|
|
/// <summary>
|
|
|
@@ -532,13 +531,13 @@ namespace System.Reactive
|
|
|
/// <returns>The IDisposable of the recursively scheduled task or an empty disposable.</returns>
|
|
|
private IDisposable DrainShortRunning(IScheduler recursiveScheduler)
|
|
|
{
|
|
|
- DrainStep(_queue, _downstream, false);
|
|
|
+ DrainStep(_queue, false);
|
|
|
|
|
|
if (Interlocked.Decrement(ref _wip) != 0)
|
|
|
{
|
|
|
// Don't return the disposable of Schedule() because that may chain together
|
|
|
// a long string of ScheduledItems causing StackOverflowException upon Dispose()
|
|
|
- var d = recursiveScheduler.Schedule(this, DRAIN_SHORT_RUNNING);
|
|
|
+ var d = recursiveScheduler.Schedule(this, DrainShortRunningFunc);
|
|
|
Disposable.TrySetMultiple(ref _task, d);
|
|
|
}
|
|
|
return Disposable.Empty;
|
|
|
@@ -550,18 +549,22 @@ namespace System.Reactive
|
|
|
/// empty queue, issuing the appropriate signals to the
|
|
|
/// given downstream.
|
|
|
/// </summary>
|
|
|
- /// <param name="q">The queue to use.</param>
|
|
|
- /// <param name="downstream">The intended consumer of the events.</param>
|
|
|
+ /// <param name="q">The queue to use. The argument ensures that the
|
|
|
+ /// _queue field is not re-read from memory due to the memory barriers
|
|
|
+ /// inside TryDequeue mandating it despite the field is read-only.
|
|
|
+ /// In addition, the DrainStep is invoked from the DrainLongRunning's loop
|
|
|
+ /// so reading _queue inside this method would still incur the same barrier
|
|
|
+ /// overhead otherwise.</param>
|
|
|
/// <param name="delayError">Should the errors be delayed until all
|
|
|
/// queued items have been emitted to the downstream?</param>
|
|
|
/// <returns>True if the drain loop should stop.</returns>
|
|
|
- private bool DrainStep(ConcurrentQueue<T> q, IObserver<T> downstream, bool delayError)
|
|
|
+ private bool DrainStep(ConcurrentQueue<T> q, bool delayError)
|
|
|
{
|
|
|
// Check if the operator has been disposed
|
|
|
if (Volatile.Read(ref _disposed))
|
|
|
{
|
|
|
// cleanup residue items in the queue
|
|
|
- Clear();
|
|
|
+ Clear(q);
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
@@ -577,13 +580,13 @@ namespace System.Reactive
|
|
|
if (ex != null)
|
|
|
{
|
|
|
Volatile.Write(ref _disposed, true);
|
|
|
- downstream.OnError(ex);
|
|
|
+ ForwardOnError(ex);
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// get the next item from the queue if any
|
|
|
- var empty = !_queue.TryDequeue(out var v);
|
|
|
+ var empty = !q.TryDequeue(out var v);
|
|
|
|
|
|
// the upstream called OnComplete and the queue is empty
|
|
|
// that means we are done, no further signals can happen
|
|
|
@@ -596,12 +599,12 @@ namespace System.Reactive
|
|
|
// if not null, there was an OnError call
|
|
|
if (ex != null)
|
|
|
{
|
|
|
- downstream.OnError(ex);
|
|
|
+ ForwardOnError(ex);
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
// otherwise, complete normally
|
|
|
- downstream.OnCompleted();
|
|
|
+ ForwardOnCompleted();
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
@@ -612,7 +615,7 @@ namespace System.Reactive
|
|
|
return true;
|
|
|
}
|
|
|
// emit the item
|
|
|
- downstream.OnNext(v);
|
|
|
+ ForwardOnNext(v);
|
|
|
|
|
|
// keep looping
|
|
|
return false;
|
|
|
@@ -631,7 +634,6 @@ namespace System.Reactive
|
|
|
// that would force the re-read of these constant values
|
|
|
// from memory, regardless of readonly, afaik
|
|
|
var q = _queue;
|
|
|
- var downstream = _downstream;
|
|
|
|
|
|
for (; ; )
|
|
|
{
|
|
|
@@ -640,7 +642,7 @@ namespace System.Reactive
|
|
|
// delayError: true - because of
|
|
|
// ObserveOn_LongRunning_HoldUpDuringDispatchAndFail
|
|
|
// expects something that almost looks like full delayError
|
|
|
- if (DrainStep(q, downstream, true))
|
|
|
+ if (DrainStep(q, true))
|
|
|
{
|
|
|
break;
|
|
|
}
|