Browse Source

Have TakeUntil(time) use lock-free methods. (#611)

David Karnok 7 years ago
parent
commit
4a801c678a
1 changed files with 14 additions and 31 deletions
  1. 14 31
      Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs

+ 14 - 31
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntil.cs

@@ -25,7 +25,6 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<TSource>
         {
-            private IDisposable _mainDisposable;
             private IDisposable _otherDisposable;
             private int _halfSerializer;
             private Exception _error;
@@ -38,16 +37,15 @@ namespace System.Reactive.Linq.ObservableImpl
             public void Run(TakeUntil<TSource, TOther> parent)
             {
                 Disposable.SetSingle(ref _otherDisposable, parent._other.Subscribe(new OtherObserver(this)));
-                Disposable.SetSingle(ref _mainDisposable, parent._source.Subscribe(this));
+                base.Run(parent._source);
             }
 
             protected override void Dispose(bool disposing)
             {
                 if (disposing)
                 {
-                    if (!Disposable.GetIsDisposed(ref _mainDisposable))
+                    if (!Disposable.GetIsDisposed(ref _otherDisposable))
                     {
-                        Disposable.TryDispose(ref _mainDisposable);
                         Disposable.TryDispose(ref _otherDisposable);
                     }
                 }
@@ -99,11 +97,6 @@ namespace System.Reactive.Linq.ObservableImpl
         }
     }
 
-    internal static class TakeUntilTerminalException
-    {
-        internal static readonly Exception Instance = new Exception("No further exceptions");
-    }
-
     internal sealed class TakeUntil<TSource> : Producer<TSource, TakeUntil<TSource>._>
     {
         private readonly IObservable<TSource> _source;
@@ -140,61 +133,51 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<TSource>
         {
-            private readonly object _gate = new object();
+            private IDisposable _timerDisposable;
+
+            private int _wip;
+
+            private Exception _error;
 
             public _(IObserver<TSource> observer)
                 : base(observer)
             {
             }
 
-            private IDisposable _sourceDisposable;
-
             public void Run(TakeUntil<TSource> parent)
             {
-                SetUpstream(parent._scheduler.Schedule(this, parent._endTime, (_, state) => state.Tick()));
-                Disposable.SetSingle(ref _sourceDisposable, parent._source.SubscribeSafe(this));
+                Disposable.SetSingle(ref _timerDisposable, parent._scheduler.Schedule(this, parent._endTime, (_, state) => state.Tick()));
+                base.Run(parent._source);
             }
 
             protected override void Dispose(bool disposing)
             {
                 if (disposing)
                 {
-                    Disposable.TryDispose(ref _sourceDisposable);
+                    Disposable.TryDispose(ref _timerDisposable);
                 }
                 base.Dispose(disposing);
             }
 
             private IDisposable Tick()
             {
-                lock (_gate)
-                {
-                    ForwardOnCompleted();
-                }
+                OnCompleted();
                 return Disposable.Empty;
             }
 
             public override void OnNext(TSource value)
             {
-                lock (_gate)
-                {
-                    ForwardOnNext(value);
-                }
+                HalfSerializer.ForwardOnNext(this, value, ref _wip, ref _error);
             }
 
             public override void OnError(Exception error)
             {
-                lock (_gate)
-                {
-                    ForwardOnError(error);
-                }
+                HalfSerializer.ForwardOnError(this, error, ref _wip, ref _error);
             }
 
             public override void OnCompleted()
             {
-                lock (_gate)
-                {
-                    ForwardOnCompleted();
-                }
+                HalfSerializer.ForwardOnCompleted(this, ref _wip, ref _error);
             }
         }
     }