|
|
@@ -5,6 +5,7 @@
|
|
|
#if !NO_PERF
|
|
|
using System;
|
|
|
using System.Reactive.Disposables;
|
|
|
+using System.Threading;
|
|
|
|
|
|
namespace System.Reactive.Linq.ObservableImpl
|
|
|
{
|
|
|
@@ -30,17 +31,19 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
class _ : Sink<TResult>
|
|
|
{
|
|
|
+ static readonly object EMPTY = new object();
|
|
|
+
|
|
|
private readonly WithLatestFrom<TFirst, TSecond, TResult> _parent;
|
|
|
|
|
|
public _(WithLatestFrom<TFirst, TSecond, TResult> parent, IObserver<TResult> observer, IDisposable cancel)
|
|
|
: base(observer, cancel)
|
|
|
{
|
|
|
_parent = parent;
|
|
|
+ Volatile.Write(ref _latest, EMPTY); // StoreStore barrier
|
|
|
}
|
|
|
|
|
|
private object _gate;
|
|
|
- private volatile bool _hasLatest;
|
|
|
- private TSecond _latest;
|
|
|
+ private object _latest;
|
|
|
|
|
|
public IDisposable Run()
|
|
|
{
|
|
|
@@ -86,13 +89,14 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
public void OnNext(TFirst value)
|
|
|
{
|
|
|
- if (_parent._hasLatest) // Volatile read
|
|
|
+ var latest = Volatile.Read(ref _parent._latest); // Volatile read
|
|
|
+ if (latest != EMPTY)
|
|
|
{
|
|
|
var res = default(TResult);
|
|
|
|
|
|
try
|
|
|
{
|
|
|
- res = _parent._parent._resultSelector(value, _parent._latest);
|
|
|
+ res = _parent._parent._resultSelector(value, (TSecond)_parent._latest);
|
|
|
}
|
|
|
catch (Exception ex)
|
|
|
{
|
|
|
@@ -140,8 +144,7 @@ namespace System.Reactive.Linq.ObservableImpl
|
|
|
|
|
|
public void OnNext(TSecond value)
|
|
|
{
|
|
|
- _parent._latest = value;
|
|
|
- _parent._hasLatest = true; // Volatile write
|
|
|
+ Interlocked.Exchange(ref _parent._latest, value); // StoreStore, StoreLoad barrier (can be just Write)
|
|
|
}
|
|
|
}
|
|
|
}
|