فهرست منبع

Add the TakeUntil(Func<T, bool>) operator (#612)

David Karnok 7 سال پیش
والد
کامیت
ae8c9898ba

+ 1 - 0
Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs

@@ -532,6 +532,7 @@ namespace System.Reactive.Linq
         IObservable<TSource> SkipUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other);
         IObservable<TSource> Switch<TSource>(IObservable<IObservable<TSource>> sources);
         IObservable<TSource> TakeUntil<TSource, TOther>(IObservable<TSource> source, IObservable<TOther> other);
+        IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, Func<TSource, bool> stopPredicate);
         IObservable<IObservable<TSource>> Window<TSource, TWindowClosing>(IObservable<TSource> source, Func<IObservable<TWindowClosing>> windowClosingSelector);
         IObservable<IObservable<TSource>> Window<TSource, TWindowOpening, TWindowClosing>(IObservable<TSource> source, IObservable<TWindowOpening> windowOpenings, Func<TWindowOpening, IObservable<TWindowClosing>> windowClosingSelector);
         IObservable<IObservable<TSource>> Window<TSource, TWindowBoundary>(IObservable<TSource> source, IObservable<TWindowBoundary> windowBoundaries);

+ 29 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable.Multiple.cs

@@ -1447,6 +1447,35 @@ namespace System.Reactive.Linq
             return s_impl.TakeUntil<TSource, TOther>(source, other);
         }
 
+        /// <summary>
+        /// Relays elements from the source observable sequence and calls the predicate after an
+        /// emission to check if the sequence should stop after that specific item.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source and result sequences.</typeparam>
+        /// <param name="source">The source sequence to relay elements of.</param>
+        /// <param name="stopPredicate">Called after each upstream item has been emitted with
+        /// that upstream item and should return <code>true</code> to indicate the sequence should
+        /// complete.</param>
+        /// <returns>The observable sequence with the source elements until the stop predicate returns true.</returns>
+        /// <example>
+        /// The following sequence will stop after the value 5 has been encountered:
+        /// <code>
+        /// Observable.Range(1, 10)
+        ///     .TakeUntil(item =&gt; item == 5)
+        ///     .Subscribe(Console.WriteLine);
+        /// </code>
+        /// </example>
+        /// <exception cref="ArgumentException">If <typeparamref name="TSource"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
+        public static IObservable<TSource> TakeUntil<TSource>(this IObservable<TSource> source, Func<TSource, bool> stopPredicate)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (stopPredicate == null)
+                throw new ArgumentNullException(nameof(stopPredicate));
+
+            return s_impl.TakeUntil(source, stopPredicate);
+        }
+
         #endregion
 
         #region + Window +

+ 73 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeUntilPredicate.cs

@@ -0,0 +1,73 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace System.Reactive.Linq.ObservableImpl
+{
+    /// <summary>
+    /// Relays items to the downstream until the predicate returns <code>true</code>.
+    /// </summary>
+    /// <typeparam name="TSource">The element type of the sequence</typeparam>
+    internal sealed class TakeUntilPredicate<TSource> : 
+        Producer<TSource, TakeUntilPredicate<TSource>.TakeUntilPredicateObserver>
+    {
+        readonly IObservable<TSource> _source;
+
+        readonly Func<TSource, bool> _stopPredicate;
+
+        public TakeUntilPredicate(IObservable<TSource> source, Func<TSource, bool> stopPredicate)
+        {
+            this._source = source;
+            this._stopPredicate = stopPredicate;
+        }
+
+        protected override TakeUntilPredicateObserver CreateSink(IObserver<TSource> observer) => new TakeUntilPredicateObserver(observer, _stopPredicate);
+
+        protected override void Run(TakeUntilPredicateObserver sink) => sink.Run(_source);
+
+        internal sealed class TakeUntilPredicateObserver : IdentitySink<TSource>
+        {
+            readonly Func<TSource, bool> _stopPredicate;
+
+            public TakeUntilPredicateObserver(IObserver<TSource> downstream, 
+                Func<TSource, bool> predicate) : base (downstream)
+            {
+                this._stopPredicate = predicate;
+            }
+
+            public override void OnCompleted()
+            {
+                ForwardOnCompleted();
+            }
+
+            public override void OnError(Exception error)
+            {
+                ForwardOnError(error);
+            }
+
+            public override void OnNext(TSource value)
+            {
+                ForwardOnNext(value);
+
+                var shouldStop = false;
+                try
+                {
+                    shouldStop = _stopPredicate(value);
+                }
+                catch (Exception ex)
+                {
+                    ForwardOnError(ex);
+                    return;
+                }
+                if (shouldStop)
+                {
+                    ForwardOnCompleted();
+                }
+            }
+        }
+    }
+}

+ 40 - 0
Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs

@@ -15015,6 +15015,46 @@ namespace System.Reactive.Linq
             );
         }
 
+        /// <summary>
+        /// Relays elements from the source observable sequence and calls the predicate after an
+        /// emission to check if the sequence should stop after that specific item.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source and result sequences.</typeparam>
+        /// <param name="source">The source sequence to relay elements of.</param>
+        /// <param name="stopPredicate">Called after each upstream item has been emitted with
+        /// that upstream item and should return <code>true</code> to indicate the sequence should
+        /// complete.</param>
+        /// <returns>The observable sequence with the source elements until the stop predicate returns true.</returns>
+        /// <example>
+        /// The following sequence will stop after the value 5 has been encountered:
+        /// <code>
+        /// Observable.Range(1, 10)
+        ///     .TakeUntil(item =&gt; item == 5)
+        ///     .Subscribe(Console.WriteLine);
+        /// </code>
+        /// </example>
+        /// <exception cref="ArgumentException">If <typeparamref name="TSource"/> or <paramref name="stopPredicate"/> is <code>null</code>.</exception>
+        public static IQbservable<TSource> TakeUntil<TSource>(this IQbservable<TSource> source, Expression<Func<TSource, bool>> stopPredicate)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (stopPredicate == null)
+                throw new ArgumentNullException(nameof(stopPredicate));
+
+            return source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.TakeUntil<TSource>(default(IQbservable<TSource>), default(Expression<Func<TSource, bool>>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
+#endif
+                    source.Expression,
+                    stopPredicate
+                )
+            );
+        }
+
         /// <summary>
         /// Returns elements from an observable sequence as long as a specified condition is true.
         /// </summary>

+ 5 - 0
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Multiple.cs

@@ -353,6 +353,11 @@ namespace System.Reactive.Linq
             return new TakeUntil<TSource, TOther>(source, other);
         }
 
+        public virtual IObservable<TSource> TakeUntil<TSource>(IObservable<TSource> source, Func<TSource, bool> stopPredicate)
+        {
+            return new TakeUntilPredicate<TSource>(source, stopPredicate);
+        }
+
         #endregion
 
         #region + Window +

+ 2 - 0
Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt

@@ -1392,6 +1392,7 @@ namespace System.Reactive.Linq
         public static System.IObservable<System.Collections.Generic.IList<TSource>> TakeLastBuffer<TSource>(this System.IObservable<TSource> source, System.TimeSpan duration) { }
         public static System.IObservable<System.Collections.Generic.IList<TSource>> TakeLastBuffer<TSource>(this System.IObservable<TSource> source, System.TimeSpan duration, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TSource> TakeUntil<TSource, TOther>(this System.IObservable<TSource> source, System.IObservable<TOther> other) { }
+        public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> stopPredicate) { }
         public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset endTime) { }
         public static System.IObservable<TSource> TakeUntil<TSource>(this System.IObservable<TSource> source, System.DateTimeOffset endTime, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TSource> TakeWhile<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> predicate) { }
@@ -2097,6 +2098,7 @@ namespace System.Reactive.Linq
         public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset endTime) { }
         public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.DateTimeOffset endTime, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource, TOther>(this System.Reactive.Linq.IQbservable<TSource> source, System.IObservable<TOther> other) { }
+        public static System.Reactive.Linq.IQbservable<TSource> TakeUntil<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> stopPredicate) { }
         public static System.Reactive.Linq.IQbservable<TSource> TakeWhile<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> predicate) { }
         public static System.Reactive.Linq.IQbservable<TSource> TakeWhile<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, int, bool>> predicate) { }
         public static System.Reactive.Joins.QueryablePlan<TResult> Then<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, TResult>> selector) { }

+ 177 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/TakeUntilTest.cs

@@ -685,5 +685,182 @@ namespace ReactiveTests.Tests
 
         #endregion
 
+        #region + Predicate +
+
+        [Fact]
+        public void TakeUntil_Predicate_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeUntil<int>(null, v => true));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.TakeUntil<int>(DummyObservable<int>.Instance, null));
+        }
+
+        [Fact]
+        public void TakeUntil_Predicate_Basic()
+        {
+            var scheduler = new TestScheduler();
+
+            var source = scheduler.CreateColdObservable(
+                    OnNext(10, 1),
+                    OnNext(20, 2),
+                    OnNext(30, 3),
+                    OnNext(40, 4),
+                    OnNext(50, 5),
+                    OnNext(60, 6),
+                    OnNext(70, 7),
+                    OnNext(80, 8),
+                    OnNext(90, 9),
+                    OnCompleted<int>(100)
+                );
+
+            var result = scheduler.Start(() => source.TakeUntil(v => v == 5));
+
+            result.Messages.AssertEqual(
+                OnNext(210, 1),
+                OnNext(220, 2),
+                OnNext(230, 3),
+                OnNext(240, 4),
+                OnNext(250, 5),
+                OnCompleted<int>(250)
+            );
+
+            source.Subscriptions.AssertEqual(
+                Subscribe(200, 250)
+            );
+        }
+
+        [Fact]
+        public void TakeUntil_Predicate_True()
+        {
+            var scheduler = new TestScheduler();
+
+            var source = scheduler.CreateColdObservable(
+                    OnNext(10, 1),
+                    OnNext(20, 2),
+                    OnNext(30, 3),
+                    OnNext(40, 4),
+                    OnNext(50, 5),
+                    OnNext(60, 6),
+                    OnNext(70, 7),
+                    OnNext(80, 8),
+                    OnNext(90, 9),
+                    OnCompleted<int>(100)
+                );
+
+            var result = scheduler.Start(() => source.TakeUntil(v => true));
+
+            result.Messages.AssertEqual(
+                OnNext(210, 1),
+                OnCompleted<int>(210)
+            );
+
+            source.Subscriptions.AssertEqual(
+                Subscribe(200, 210)
+            );
+        }
+
+        [Fact]
+        public void TakeUntil_Predicate_False()
+        {
+            var scheduler = new TestScheduler();
+
+            var source = scheduler.CreateColdObservable(
+                    OnNext(10, 1),
+                    OnNext(20, 2),
+                    OnNext(30, 3),
+                    OnNext(40, 4),
+                    OnNext(50, 5),
+                    OnNext(60, 6),
+                    OnNext(70, 7),
+                    OnNext(80, 8),
+                    OnNext(90, 9),
+                    OnCompleted<int>(100)
+                );
+
+            var result = scheduler.Start(() => source.TakeUntil(v => false));
+
+            result.Messages.AssertEqual(
+                OnNext(210, 1),
+                OnNext(220, 2),
+                OnNext(230, 3),
+                OnNext(240, 4),
+                OnNext(250, 5),
+                OnNext(260, 6),
+                OnNext(270, 7),
+                OnNext(280, 8),
+                OnNext(290, 9),
+                OnCompleted<int>(300)
+            );
+
+            source.Subscriptions.AssertEqual(
+                Subscribe(200, 300)
+            );
+        }
+
+        [Fact]
+        public void TakeUntil_Predicate_Error()
+        {
+            var scheduler = new TestScheduler();
+
+            var ex = new InvalidOperationException();
+
+            var source = scheduler.CreateColdObservable(
+                    OnNext(10, 1),
+                    OnNext(20, 2),
+                    OnNext(30, 3),
+                    OnError<int>(40, ex)
+                );
+
+            var result = scheduler.Start(() => source.TakeUntil(v => false));
+
+            result.Messages.AssertEqual(
+                OnNext(210, 1),
+                OnNext(220, 2),
+                OnNext(230, 3),
+                OnError<int>(240, ex)
+            );
+
+            source.Subscriptions.AssertEqual(
+                Subscribe(200, 240)
+            );
+        }
+
+        [Fact]
+        public void TakeUntil_Predicate_Crash()
+        {
+            var scheduler = new TestScheduler();
+
+            var ex = new InvalidOperationException();
+
+            var source = scheduler.CreateColdObservable(
+                    OnNext(10, 1),
+                    OnNext(20, 2),
+                    OnNext(30, 3),
+                    OnNext(240, 4),
+                    OnNext(250, 5),
+                    OnCompleted<int>(260)
+                );
+
+            var result = scheduler.Start(() => source.TakeUntil(v => {
+                if (v == 3)
+                {
+                    throw ex;
+                }
+                return false;
+            }));
+
+            result.Messages.AssertEqual(
+                OnNext(210, 1),
+                OnNext(220, 2),
+                OnNext(230, 3),
+                OnError<int>(230, ex)
+            );
+
+            source.Subscriptions.AssertEqual(
+                Subscribe(200, 230)
+            );
+        }
+
+        #endregion
+
     }
 }