|
@@ -2,8 +2,6 @@
|
|
|
// The .NET Foundation licenses this file to you under the MIT License.
|
|
|
// See the LICENSE file in the project root for more information.
|
|
|
|
|
|
-#nullable disable
|
|
|
-
|
|
|
using System.Reactive.Concurrency;
|
|
|
using System.Reactive.Disposables;
|
|
|
using System.Threading;
|
|
@@ -22,12 +20,12 @@ namespace System.Reactive
|
|
|
private const int Faulted = 9;
|
|
|
private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
|
|
|
private bool _failed;
|
|
|
- private Exception _error;
|
|
|
+ private Exception? _error;
|
|
|
private bool _completed;
|
|
|
private readonly IObserver<T> _observer;
|
|
|
private readonly IScheduler _scheduler;
|
|
|
- private readonly ISchedulerLongRunning _longRunning;
|
|
|
- private IDisposable _disposable;
|
|
|
+ private readonly ISchedulerLongRunning? _longRunning;
|
|
|
+ private IDisposable? _disposable;
|
|
|
|
|
|
public ScheduledObserver(IScheduler scheduler, IObserver<T> observer)
|
|
|
{
|
|
@@ -44,11 +42,11 @@ namespace System.Reactive
|
|
|
|
|
|
private sealed class SemaphoreSlimRelease : IDisposable
|
|
|
{
|
|
|
- private SemaphoreSlim _dispatcherEvent;
|
|
|
+ private volatile SemaphoreSlim? _dispatcherEvent;
|
|
|
|
|
|
public SemaphoreSlimRelease(SemaphoreSlim dispatcherEvent)
|
|
|
{
|
|
|
- Volatile.Write(ref _dispatcherEvent, dispatcherEvent);
|
|
|
+ _dispatcherEvent = dispatcherEvent;
|
|
|
}
|
|
|
|
|
|
public void Dispose()
|
|
@@ -58,9 +56,9 @@ namespace System.Reactive
|
|
|
}
|
|
|
|
|
|
private readonly object _dispatcherInitGate = new object();
|
|
|
- private readonly SemaphoreSlim _dispatcherEvent;
|
|
|
- private readonly IDisposable _dispatcherEventRelease;
|
|
|
- private IDisposable _dispatcherJob;
|
|
|
+ private readonly SemaphoreSlim? _dispatcherEvent;
|
|
|
+ private readonly IDisposable? _dispatcherEventRelease;
|
|
|
+ private IDisposable? _dispatcherJob;
|
|
|
|
|
|
private void EnsureDispatcher()
|
|
|
{
|
|
@@ -86,7 +84,7 @@ namespace System.Reactive
|
|
|
{
|
|
|
while (true)
|
|
|
{
|
|
|
- _dispatcherEvent.Wait();
|
|
|
+ _dispatcherEvent!.Wait(); // NB: If long-running, the event is set.
|
|
|
|
|
|
if (cancel.IsDisposed)
|
|
|
{
|
|
@@ -118,7 +116,7 @@ namespace System.Reactive
|
|
|
|
|
|
if (_failed)
|
|
|
{
|
|
|
- _observer.OnError(_error);
|
|
|
+ _observer.OnError(_error!);
|
|
|
Dispose();
|
|
|
return;
|
|
|
}
|
|
@@ -140,7 +138,7 @@ namespace System.Reactive
|
|
|
{
|
|
|
if (n > 0)
|
|
|
{
|
|
|
- _dispatcherEvent.Release(n);
|
|
|
+ _dispatcherEvent!.Release(n); // NB: If long-running, the event is set.
|
|
|
}
|
|
|
|
|
|
EnsureDispatcher();
|
|
@@ -200,11 +198,11 @@ namespace System.Reactive
|
|
|
|
|
|
if (isOwner)
|
|
|
{
|
|
|
- Disposable.TrySetSerial(ref _disposable, _scheduler.Schedule<object>(null, Run));
|
|
|
+ Disposable.TrySetSerial(ref _disposable, _scheduler.Schedule<object?>(null, Run));
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void Run(object state, Action<object> recurse)
|
|
|
+ private void Run(object? state, Action<object?> recurse)
|
|
|
{
|
|
|
T next;
|
|
|
|
|
@@ -232,7 +230,7 @@ namespace System.Reactive
|
|
|
}
|
|
|
|
|
|
Interlocked.Exchange(ref _state, Stopped);
|
|
|
- _observer.OnError(_error);
|
|
|
+ _observer.OnError(_error!);
|
|
|
Dispose();
|
|
|
return;
|
|
|
}
|
|
@@ -326,7 +324,7 @@ namespace System.Reactive
|
|
|
|
|
|
internal sealed class ObserveOnObserver<T> : ScheduledObserver<T>
|
|
|
{
|
|
|
- private IDisposable _run;
|
|
|
+ private IDisposable? _run;
|
|
|
|
|
|
public ObserveOnObserver(IScheduler scheduler, IObserver<T> observer)
|
|
|
: base(scheduler, observer)
|
|
@@ -388,7 +386,7 @@ namespace System.Reactive
|
|
|
/// <summary>
|
|
|
/// The current task representing a running drain operation.
|
|
|
/// </summary>
|
|
|
- private IDisposable _task;
|
|
|
+ private IDisposable? _task;
|
|
|
|
|
|
/// <summary>
|
|
|
/// Indicates the work-in-progress state of this operator,
|
|
@@ -405,7 +403,7 @@ namespace System.Reactive
|
|
|
/// If <see cref="_done"/> is true and this is non-null, the upstream
|
|
|
/// failed with an OnError.
|
|
|
/// </summary>
|
|
|
- private Exception _error;
|
|
|
+ private Exception? _error;
|
|
|
|
|
|
/// <summary>
|
|
|
/// Indicates a dispose has been requested.
|
|
@@ -558,25 +556,20 @@ namespace System.Reactive
|
|
|
}
|
|
|
|
|
|
// get the next item from the queue if any
|
|
|
- var empty = !q.TryDequeue(out var v);
|
|
|
+ if (q.TryDequeue(out var v))
|
|
|
+ {
|
|
|
+ ForwardOnNext(v);
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
// the upstream called OnComplete and the queue is empty
|
|
|
// that means we are done, no further signals can happen
|
|
|
- if (d && empty)
|
|
|
+ if (d)
|
|
|
{
|
|
|
Volatile.Write(ref _disposed, true);
|
|
|
// otherwise, complete normally
|
|
|
ForwardOnCompleted();
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // the queue is empty and the upstream hasn't completed yet
|
|
|
- if (empty)
|
|
|
- {
|
|
|
- return;
|
|
|
}
|
|
|
- // emit the item
|
|
|
- ForwardOnNext(v);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -617,7 +610,7 @@ namespace System.Reactive
|
|
|
/// <summary>
|
|
|
/// Set to a non-null Exception if the upstream terminated with OnError.
|
|
|
/// </summary>
|
|
|
- private Exception _error;
|
|
|
+ private Exception? _error;
|
|
|
|
|
|
/// <summary>
|
|
|
/// Indicates the sequence has been disposed and the drain task should quit.
|
|
@@ -633,7 +626,7 @@ namespace System.Reactive
|
|
|
/// <summary>
|
|
|
/// The disposable tracking the drain task.
|
|
|
/// </summary>
|
|
|
- private IDisposable _drainTask;
|
|
|
+ private IDisposable? _drainTask;
|
|
|
|
|
|
public ObserveOnObserverLongRunning(ISchedulerLongRunning scheduler, IObserver<TSource> observer) : base(observer)
|
|
|
{
|
|
@@ -736,7 +729,7 @@ namespace System.Reactive
|
|
|
// There was an item, signal it.
|
|
|
if (hasValue)
|
|
|
{
|
|
|
- ForwardOnNext(item);
|
|
|
+ ForwardOnNext(item!);
|
|
|
// Consume the item and try the next item if the work-in-progress
|
|
|
// indicator is still not zero
|
|
|
if (Interlocked.Decrement(ref _wip) != 0L)
|