瀏覽代碼

TakeUntil with CancellationToken (#2222)

Based on original PR #2182 (Implementing TakeUntil with CancellationToken) by @nilsauf but with the following changes

* Revert to block-scoped namespaces
* Fix typos, standardized spelling of cancelled
* Update release notes
* Add new TakeUntil to verified API
* Add IQbservable form of TakeUntil
* Fix exception documentation on new and one existing TakeUntil overload

---------

Co-authored-by: Nils Aufschläger <[email protected]>
Ian Griffiths 1 月之前
父節點
當前提交
8a0af2d358

+ 9 - 1
Rx.NET/Documentation/ReleaseHistory/Rx.v6.md

@@ -1,4 +1,12 @@
-# Rx Release History v6.0
+# Rx Release History v6.0
+
+## 6.1.0
+
+This release adds:
+
+* A `DisposeWith`extension method for `IDisposable` to simplify disposal in conjunction with `CompositeDisposable` (see [#2178](https://github.com/dotnet/reactive/pull/2178) thanks to [Chris Pulman](https://github.com/ChrisPulman)
+* A new overload of `TakeUntil` accepting a `CancellationToken` (see [#2181](https://github.com/dotnet/reactive/issues/2181) thanks to [Nils Aufschläger](https://github.com/nilsauf)
+
 
 ## v6.0.2
 

+ 1 - 0
Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs

@@ -558,6 +558,7 @@ namespace System.Reactive.Linq
         IObservable<TSource> Switch<TSource>(IObservable<IObservable<TSource>> sources);
         IObservable<TSource> TakeUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other);
         IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, Func<TSource, bool> stopPredicate);
+        IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, CancellationToken cancellationToken);
         IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector);
         IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(IObservable<TSource> source, IObservable<TWindowOpening> windowOpenings, Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector);
         IObservable<IObservable<TSource>> Window<TSource, TWindowBoundary>(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries);

+ 20 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable.Multiple.cs

@@ -3,7 +3,6 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
-using System.Configuration;
 using System.Reactive.Concurrency;
 using System.Threading;
 using System.Threading.Tasks;
@@ -865,7 +864,7 @@ namespace System.Reactive.Linq
         ///     .Subscribe(Console.WriteLine);
         /// </code>
         /// </example>
-        /// <exception cref="ArgumentException">If <typeparamref name="TSource"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
         public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, Func<TSource, bool> stopPredicate)
         {
             if (source == null)
@@ -881,6 +880,25 @@ namespace System.Reactive.Linq
             return s_impl.TakeUntil(source, stopPredicate);
         }
 
+        /// <summary>
+        /// Relays elements from the source observable sequence until the provided <paramref name="cancellationToken"/> is cancelled.
+        /// Completes immediately if the provided <paramref name="cancellationToken"/> is already cancelled upon subscription.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source and result sequences.</typeparam>
+        /// <param name="source">The source sequence to relay elements of.</param>
+        /// <param name="cancellationToken">The cancellation token to complete the target observable sequence on.</param>
+        /// <returns>The observable sequence with the source elements until the provided <paramref name="cancellationToken"/> is cancelled.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is <code>null</code>.</exception>
+        public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, CancellationToken cancellationToken)
+        {
+            if (source == null)
+            {
+                throw new ArgumentNullException(nameof(source));
+            }
+
+            return s_impl.TakeUntil(source, cancellationToken);
+        }
+
         #endregion
 
         #region + Window +

+ 77 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntilCancellationToken.cs

@@ -0,0 +1,77 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT License.
+// See the LICENSE file in the project root for more information.
+
+using System.Reactive.Disposables;
+using System.Threading;
+
+namespace System.Reactive.Linq.ObservableImpl
+{
+    /// <summary>
+    /// Relays items to the downstream until the CancellationToken is cancelled.
+    /// </summary>
+    /// <typeparam name="TSource">The element type of the sequence</typeparam>
+    internal sealed class TakeUntilCancellationToken<TSource> :
+        Producer<TSource, TakeUntilCancellationToken<TSource>._>
+    {
+        private readonly IObservable<TSource> _source;
+        private readonly CancellationToken _token;
+
+        public TakeUntilCancellationToken(IObservable<TSource> source, CancellationToken token)
+        {
+            _source = source;
+            _token = token;
+        }
+
+        protected override _ CreateSink(IObserver<TSource> observer) => new(observer);
+
+        protected override void Run(_ sink) => sink.Run(this);
+
+        internal sealed class _ : IdentitySink<TSource>
+        {
+            private SingleAssignmentDisposableValue _cancellationTokenRegistration;
+            private int _wip;
+            private Exception? _error;
+
+            public _(IObserver<TSource> observer) : base(observer)
+            {
+            }
+
+            public void Run(TakeUntilCancellationToken<TSource> parent)
+            {
+                if (parent._token.IsCancellationRequested)
+                {
+                    OnCompleted();
+                    return;
+                }
+
+                _cancellationTokenRegistration.Disposable = parent._token.Register(OnCompleted);
+                Run(parent._source);
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    _cancellationTokenRegistration.Dispose();
+                }
+                base.Dispose(disposing);
+            }
+
+            public override void OnNext(TSource value)
+            {
+                HalfSerializer.ForwardOnNext(this, value, ref _wip, ref _error);
+            }
+
+            public override void OnError(Exception error)
+            {
+                HalfSerializer.ForwardOnError(this, error, ref _wip, ref _error);
+            }
+
+            public override void OnCompleted()
+            {
+                HalfSerializer.ForwardOnCompleted(this, ref _wip, ref _error);
+            }
+        }
+    }
+}

+ 26 - 1
Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs

@@ -13761,6 +13761,31 @@ namespace System.Reactive.Linq
             );
         }
 
+        /// <summary>
+        /// Relays elements from the source observable sequence until the provided <paramref name="cancellationToken" /> is cancelled.
+        /// Completes immediately if the provided <paramref name="cancellationToken" /> is already cancelled upon subscription.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source and result sequences.</typeparam>
+        /// <param name="source">The source sequence to relay elements of.</param>
+        /// <param name="cancellationToken">The cancellation token to complete the target observable sequence on.</param>
+        /// <returns>The observable sequence with the source elements until the provided <paramref name="cancellationToken" /> is cancelled.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is <code>null</code>.</exception>
+        public static IQbservable<TSource> TakeUntil<TSource>(this IQbservable<TSource> source, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()!).MakeGenericMethod(typeof(TSource)),
+                    source.Expression,
+                    Expression.Constant(cancellationToken, typeof(CancellationToken))
+                )
+            );
+        }
+
+
         /// <summary>
         /// Takes elements for the specified duration until the specified end time.
         /// </summary>
@@ -13858,7 +13883,7 @@ namespace System.Reactive.Linq
         ///     .Subscribe(Console.WriteLine);
         /// </code>
         /// </example>
-        /// <exception cref="ArgumentException">If <typeparamref name="TSource"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
         public static IQbservable<TSource> TakeUntil<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, bool>> stopPredicate)
         {
             if (source == null)

+ 6 - 0
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Multiple.cs

@@ -6,6 +6,7 @@ using System.Collections.Generic;
 using System.Linq;
 using System.Reactive.Concurrency;
 using System.Reactive.Threading.Tasks;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Reactive.Linq
@@ -282,6 +283,11 @@ namespace System.Reactive.Linq
             return new TakeUntilPredicate<TSource>(source, stopPredicate);
         }
 
+        public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, CancellationToken cancellationToken)
+        {
+            return new TakeUntilCancellationToken<TSource>(source, cancellationToken);
+        }
+
         #endregion
 
         #region + Window +

+ 2 - 0
Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.verified.cs

@@ -1464,6 +1464,7 @@ namespace System.Reactive.Linq
         public static System.IObservable<System.Collections.Generic.IList<TSource>> TakeLastBuffer<TSource>(this System.IObservable<TSource> source, System.TimeSpan duration, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset endTime) { }
         public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> stopPredicate) { }
+        public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.Threading.CancellationToken cancellationToken) { }
         public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset endTime, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TSource> TakeUntil<TSource, TOther>(this System.IObservable<TSource> source, System.IObservable<TOther> other) { }
         public static System.IObservable<TSource> TakeWhile<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> predicate) { }
@@ -2296,6 +2297,7 @@ namespace System.Reactive.Linq
         public static System.Reactive.Linq.IQbservable<System.Collections.Generic.IList<TSource>> TakeLastBuffer<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.TimeSpan duration, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset endTime) { }
         public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> stopPredicate) { }
+        public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Threading.CancellationToken cancellationToken) { }
         public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset endTime, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource, TOther>(this System.Reactive.Linq.IQbservable<TSource> source, System.IObservable<TOther> other) { }
         public static System.Reactive.Linq.IQbservable<TSource> TakeWhile<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> predicate) { }

+ 59 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/TakeUntilTest.cs

@@ -860,5 +860,64 @@ namespace ReactiveTests.Tests
 
         #endregion
 
+        #region + CancellationToken +
+
+        [TestMethod]
+        public void TakeUntil_CancellationToken_BasicCancelation()
+        {
+            var scheduler = new TestScheduler();
+            var tokenSource = new CancellationTokenSource();
+
+            var source = scheduler.CreateColdObservable(
+                OnNext(10, 1),
+                OnNext(20, 2),
+                OnNext(30, 3),
+                OnNext(40, 4),
+                OnNext(50, 5),
+                OnCompleted<int>(260)
+                );
+
+            scheduler.ScheduleAbsolute(235, () => tokenSource.Cancel());
+
+            var result = scheduler.Start(() => source.TakeUntil(tokenSource.Token));
+
+            result.Messages.AssertEqual(
+                OnNext(210, 1),
+                OnNext(220, 2),
+                OnNext(230, 3),
+                OnCompleted<int>(235)
+            );
+
+            source.Subscriptions.AssertEqual(
+                Subscribe(200, 235)
+            );
+        }
+
+        [TestMethod]
+        public void TakeUntil_CancellationToken_AlreadyCanceled()
+        {
+            var scheduler = new TestScheduler();
+            var tokenSource = new CancellationTokenSource();
+            tokenSource.Cancel();
+
+            var source = scheduler.CreateColdObservable(
+                OnNext(10, 1),
+                OnNext(20, 2),
+                OnNext(30, 3),
+                OnNext(40, 4),
+                OnNext(50, 5),
+                OnCompleted<int>(260)
+                );
+
+            var result = scheduler.Start(() => source.TakeUntil(tokenSource.Token));
+
+            result.Messages.AssertEqual(
+                OnCompleted<int>(200)
+            );
+
+            Assert.Empty(source.Subscriptions);
+        }
+
+        #endregion
     }
 }