|
|
@@ -2,8 +2,6 @@
|
|
|
// The .NET Foundation licenses this file to you under the MIT License.
|
|
|
// See the LICENSE file in the project root for more information.
|
|
|
|
|
|
-#nullable disable
|
|
|
-
|
|
|
using System.Collections.Generic;
|
|
|
using System.Reactive.Concurrency;
|
|
|
using System.Reactive.Disposables;
|
|
|
@@ -43,7 +41,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_queue = new Queue<TSource>();
|
|
|
}
|
|
|
|
|
|
- private IDisposable _loopDisposable;
|
|
|
+ private IDisposable? _loopDisposable;
|
|
|
|
|
|
protected override void Dispose(bool disposing)
|
|
|
{
|
|
|
@@ -152,8 +150,8 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
_queue = new Queue<Reactive.TimeInterval<TSource>>();
|
|
|
}
|
|
|
|
|
|
- private IDisposable _loopDisposable;
|
|
|
- private IStopwatch _watch;
|
|
|
+ private IDisposable? _loopDisposable;
|
|
|
+ private IStopwatch? _watch;
|
|
|
|
|
|
public void Run(IObservable<TSource> source, IScheduler scheduler)
|
|
|
{
|
|
|
@@ -167,12 +165,13 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
Disposable.Dispose(ref _loopDisposable);
|
|
|
}
|
|
|
+
|
|
|
base.Dispose(disposing);
|
|
|
}
|
|
|
|
|
|
public override void OnNext(TSource value)
|
|
|
{
|
|
|
- var now = _watch.Elapsed;
|
|
|
+ var now = _watch!.Elapsed;
|
|
|
_queue.Enqueue(new Reactive.TimeInterval<TSource>(value, now));
|
|
|
Trim(now);
|
|
|
}
|
|
|
@@ -181,7 +180,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
DisposeUpstream();
|
|
|
|
|
|
- var now = _watch.Elapsed;
|
|
|
+ var now = _watch!.Elapsed;
|
|
|
Trim(now);
|
|
|
|
|
|
var longRunning = _loopScheduler.AsLongRunning();
|
|
|
@@ -209,6 +208,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
ForwardOnCompleted();
|
|
|
}
|
|
|
+
|
|
|
return Disposable.Empty;
|
|
|
}
|
|
|
|