Selaa lähdekoodia

Fix https://github.com/dotnet/reactive/issues/179: When the sampling-observable would terminate earlier than the source observable, the resulting observable would never terminate. (#520)

Daniel C. Weber 7 vuotta sitten
vanhempi
sitoutus
e118c7a74a

+ 22 - 7
Rx.NET/Source/src/System.Reactive/Linq/Observable/Sample.cs

@@ -32,10 +32,12 @@ namespace System.Reactive.Linq.ObservableImpl
             }
 
             private IDisposable _sourceSubscription;
+            private IDisposable _samplerSubscription;
 
             private bool _hasValue;
             private TSource _value;
-            private bool _atEnd;
+            private bool _sourceAtEnd;
+            private bool _samplerAtEnd;
 
             public IDisposable Run(Sample<TSource, TSample> parent)
             {
@@ -43,9 +45,11 @@ namespace System.Reactive.Linq.ObservableImpl
                 _sourceSubscription = sourceSubscription;
                 sourceSubscription.Disposable = parent._source.SubscribeSafe(this);
 
-                var samplerSubscription = parent._sampler.SubscribeSafe(new SampleObserver(this));
+                var samplerSubscription = new SingleAssignmentDisposable();
+                _samplerSubscription = samplerSubscription;
+                samplerSubscription.Disposable = parent._sampler.SubscribeSafe(new SampleObserver(this));
 
-                return StableCompositeDisposable.Create(_sourceSubscription, samplerSubscription);
+                return StableCompositeDisposable.Create(_sourceSubscription, _samplerSubscription);
             }
 
             public void OnNext(TSource value)
@@ -70,8 +74,15 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 lock (_gate)
                 {
-                    _atEnd = true;
-                    _sourceSubscription.Dispose();
+                    _sourceAtEnd = true;
+
+                    if (_samplerAtEnd)
+                    {
+                        base._observer.OnCompleted();
+                        base.Dispose();
+                    }
+                    else
+                        _sourceSubscription.Dispose();
                 }
             }
 
@@ -94,7 +105,7 @@ namespace System.Reactive.Linq.ObservableImpl
                             _parent._observer.OnNext(_parent._value);
                         }
 
-                        if (_parent._atEnd)
+                        if (_parent._sourceAtEnd)
                         {
                             _parent._observer.OnCompleted();
                             _parent.Dispose();
@@ -116,17 +127,21 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     lock (_parent._gate)
                     {
+                        _parent._samplerAtEnd = true;
+
                         if (_parent._hasValue)
                         {
                             _parent._hasValue = false;
                             _parent._observer.OnNext(_parent._value);
                         }
 
-                        if (_parent._atEnd)
+                        if (_parent._sourceAtEnd)
                         {
                             _parent._observer.OnCompleted();
                             _parent.Dispose();
                         }
+                        else
+                            _parent._samplerSubscription.Dispose();
                     }
                 }
             }

+ 41 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/ObservableTimeTest.cs

@@ -3232,6 +3232,47 @@ namespace ReactiveTests.Tests
             );
         }
 
+        [Fact]
+        public void Sample_Sampler_completes_first()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, 1),
+                OnNext(220, 2),
+                OnNext(240, 3),
+                OnNext(290, 4),
+                OnCompleted<int>(600)
+            );
+
+            var ys = scheduler.CreateHotObservable(
+                OnNext(150, ""),
+                OnNext(210, "bar"),
+                OnNext(250, "foo"),
+                OnNext(260, "qux"),
+                OnNext(320, "baz"),
+                OnCompleted<string>(500)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Sample(ys)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(250, 3),
+                OnNext(320, 4),
+                OnCompleted<int>(600 /* on sampling boundaries only */)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 600)
+            );
+
+            ys.Subscriptions.AssertEqual(
+                Subscribe(200, 500)
+            );
+        }
+
         [Fact]
         public void Sample_Sampler_SourceThrows()
         {