Browse Source

Fix Generate (timed) crash upon disposing a long sequence

akarnokd 6 years ago
parent
commit
e5b0a4f5ca

+ 33 - 5
Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs

@@ -195,9 +195,19 @@ namespace System.Reactive.Linq.ObservableImpl
                 private bool _hasResult;
                 private TResult _result;
 
+                private IDisposable _timerDisposable;
+
                 public void Run(IScheduler outerScheduler, TState initialState)
                 {
-                    SetUpstream(outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => [email protected](scheduler, tuple.initialState)));
+                    var timer = new SingleAssignmentDisposable();
+                    Disposable.TrySetMultiple(ref _timerDisposable, timer);
+                    timer.Disposable = outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => [email protected](scheduler, tuple.initialState));
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    Disposable.TryDispose(ref _timerDisposable);
+                    base.Dispose(disposing);
                 }
 
                 private IDisposable InvokeRec(IScheduler self, TState state)
@@ -240,7 +250,11 @@ namespace System.Reactive.Linq.ObservableImpl
                         return Disposable.Empty;
                     }
 
-                    return self.Schedule((@this: this, state), time, (scheduler, tuple) => [email protected](scheduler, tuple.state));
+                    var timer = new SingleAssignmentDisposable();
+                    Disposable.TrySetMultiple(ref _timerDisposable, timer);
+                    timer.Disposable = self.Schedule((@this: this, state), time, (scheduler, tuple) => [email protected](scheduler, tuple.state));
+
+                    return Disposable.Empty;
                 }
             }
         }
@@ -275,6 +289,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly Func<TState, TResult> _resultSelector;
                 private readonly Func<TState, TimeSpan> _timeSelector;
 
+
                 public _(Relative parent, IObserver<TResult> observer)
                     : base(observer)
                 {
@@ -290,9 +305,19 @@ namespace System.Reactive.Linq.ObservableImpl
                 private bool _hasResult;
                 private TResult _result;
 
+                private IDisposable _timerDisposable;
+
                 public void Run(IScheduler outerScheduler, TState initialState)
                 {
-                    SetUpstream(outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => [email protected](scheduler, tuple.initialState)));
+                    var timer = new SingleAssignmentDisposable();
+                    Disposable.TrySetMultiple(ref _timerDisposable, timer);
+                    timer.Disposable = outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => [email protected](scheduler, tuple.initialState));
+                }
+
+                protected override void Dispose(bool disposing)
+                {
+                    Disposable.TryDispose(ref _timerDisposable);
+                    base.Dispose(disposing);
                 }
 
                 private IDisposable InvokeRec(IScheduler self, TState state)
@@ -335,10 +360,13 @@ namespace System.Reactive.Linq.ObservableImpl
                         return Disposable.Empty;
                     }
 
-                    return self.Schedule((@this: this, state), time, (scheduler, tuple) => [email protected](scheduler, tuple.state));
+                    var timer = new SingleAssignmentDisposable();
+                    Disposable.TrySetMultiple(ref _timerDisposable, timer);
+                    timer.Disposable = self.Schedule((@this: this, state), time, (scheduler, tuple) => [email protected](scheduler, tuple.state));
+
+                    return Disposable.Empty;
                 }
             }
         }
     }
 }
-

+ 35 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/GenerateTest.cs

@@ -9,6 +9,7 @@ using System.Reactive.Concurrency;
 using System.Reactive.Linq;
 using System.Runtime.CompilerServices;
 using System.Threading;
+using System.Threading.Tasks;
 using Microsoft.Reactive.Testing;
 using ReactiveTests.Dummies;
 using Xunit;
@@ -454,6 +455,40 @@ namespace ReactiveTests.Tests
             Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => DateTimeOffset.Now.AddMilliseconds(x)).AssertEqual(Observable.Generate(0, x => x < 10, x => x + 1, x => x, x => DateTimeOffset.Now.AddMilliseconds(x), DefaultScheduler.Instance));
         }
 
+        [Fact]
+        public void Generate_TimeSpan_DisposeLater()
+        {
+            var count = 0;
+            var d = Observable.Generate(0, i => true, i => i + 1, i => i, _ => TimeSpan.Zero)
+                .WithLatestFrom(Observable.Return(1).Concat(Observable.Never<int>()), (a, b) => a)
+                .Subscribe(v => Volatile.Write(ref count, v));
+
+            while (Volatile.Read(ref count) < 10000)
+            {
+                Thread.Sleep(10);
+            }
+
+            d.Dispose();
+        }
+
+        [Fact]
+        public void Generate_DateTimeOffset_DisposeLater()
+        {
+            var count = 0;
+
+            var d = Observable.Generate(0, i => true, i => i + 1, i => i, _ => DateTimeOffset.Now)
+                .WithLatestFrom(Observable.Return(1).Concat(Observable.Never<int>()), (a, b) => a)
+                .Subscribe(v => Volatile.Write(ref count, v));
+
+            while (Volatile.Read(ref count) < 10000)
+            {
+                Thread.Sleep(10);
+            }
+
+            d.Dispose();
+        }
+
+
         #endregion
     }
 }