Ver código fonte

Add the Append and Prepend operator (#567)

Peter Wehrfritz 7 anos atrás
pai
commit
c165913e2d

+ 4 - 0
Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs

@@ -564,6 +564,8 @@ namespace System.Reactive.Linq
 
         #region * Single *
 
+        IObservable<TSource> Append<TSource>(IObservable<TSource> source, TSource value);
+        IObservable<TSource> Append<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler);
         IObservable<TSource> AsObservable<TSource>(IObservable<TSource> source);
         IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count);
         IObservable<IList<TSource>> Buffer<TSource>(IObservable<TSource> source, int count, int skip);
@@ -580,6 +582,8 @@ namespace System.Reactive.Linq
         IObservable<TSource> Finally<TSource>(IObservable<TSource> source, Action finallyAction);
         IObservable<TSource> IgnoreElements<TSource>(IObservable<TSource> source);
         IObservable<Notification<TSource>> Materialize<TSource>(IObservable<TSource> source);
+        IObservable<TSource> Prepend<TSource>(IObservable<TSource> source, TSource value);
+        IObservable<TSource> Prepend<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler);
         IObservable<TSource> Repeat<TSource>(IObservable<TSource> source);
         IObservable<TSource> Repeat<TSource>(IObservable<TSource> source, int repeatCount);
         IObservable<TSource> RepeatWhen<TSource, TSignal>(IObservable<TSource> source, Func<IObservable<object>, IObservable<TSignal>> handler);

+ 77 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable.Single.cs

@@ -9,6 +9,45 @@ namespace System.Reactive.Linq
 {
     public static partial class Observable
     {
+        #region + Append +
+
+        /// <summary>
+        /// Append a value to an observable sequence.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Source sequence to append the value to.</param>
+        /// <param name="value">Value to append to the specified sequence.</param>
+        /// <returns>The source sequence appended with the specified value.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        public static IObservable<TSource> Append<TSource>(this IObservable<TSource> source, TSource value)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return s_impl.Append<TSource>(source, value);
+        }
+
+        /// <summary>
+        /// Append a value to an observable sequence.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Source sequence to append the value to.</param>
+        /// <param name="value">Value to append to the specified sequence.</param>
+        /// <param name="scheduler">Scheduler to emit the append values on.</param>
+        /// <returns>The source sequence appended with the specified value.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        public static IObservable<TSource> Append<TSource>(this IObservable<TSource> source, TSource value, IScheduler scheduler)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return s_impl.Append<TSource>(source, value, scheduler);
+        }
+
+        #endregion
+
         #region + AsObservable +
 
         /// <summary>
@@ -341,6 +380,44 @@ namespace System.Reactive.Linq
 
         #endregion
 
+        #region + Prepend +
+
+        /// <summary>
+        /// Prepend a value to an observable sequence.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Source sequence to prepend the value to.</param>
+        /// <param name="value">Value to prepend to the specified sequence.</param>
+        /// <returns>The source sequence prepended with the specified value.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        public static IObservable<TSource> Prepend<TSource>(this IObservable<TSource> source, TSource value)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return s_impl.Prepend<TSource>(source, value);
+        }
+
+        /// <summary>
+        /// Prepend a value to an observable sequence.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Source sequence to prepend the value to.</param>
+        /// <param name="value">Value to prepend to the specified sequence.</param>
+        /// <param name="scheduler">Scheduler to emit the prepend values on.</param>
+        /// <returns>The source sequence prepended with the specified value.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        public static IObservable<TSource> Prepend<TSource>(this IObservable<TSource> source, TSource value, IScheduler scheduler)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return s_impl.Prepend<TSource>(source, value, scheduler);
+        }
+
+        #endregion
         #region + Repeat +
 
         /// <summary>

+ 121 - 1
Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs

@@ -1,5 +1,5 @@
 /*
- * WARNING: Auto-generated file (06/12/2018 13:00:48)
+ * WARNING: Auto-generated file (merged on 06/13/2018)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
 
@@ -310,6 +310,66 @@ namespace System.Reactive.Linq
             );
         }
 
+        /// <summary>
+        /// Append a value to an observable sequence.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Source sequence to append the value to.</param>
+        /// <param name="value">Value to append to the specified sequence.</param>
+        /// <returns>The source sequence appended with the specified value.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="source" /> is null.</exception>
+        public static IQbservable<TSource> Append<TSource>(this IQbservable<TSource> source, TSource value)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.Append<TSource>(default(IQbservable<TSource>), default(TSource))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
+#endif
+                    source.Expression,
+                    Expression.Constant(value, typeof(TSource))
+                )
+            );
+        }
+
+        /// <summary>
+        /// Append a value to an observable sequence.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Source sequence to append the value to.</param>
+        /// <param name="value">Value to append to the specified sequence.</param>
+        /// <param name="scheduler">Scheduler to emit the append values on.</param>
+        /// <returns>The source sequence appended with the specified value.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="source" /> is null.</exception>
+        public static IQbservable<TSource> Append<TSource>(this IQbservable<TSource> source, TSource value, IScheduler scheduler)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.Append<TSource>(default(IQbservable<TSource>), default(TSource), default(IScheduler))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
+#endif
+                    source.Expression,
+                    Expression.Constant(value, typeof(TSource)),
+                    Expression.Constant(scheduler, typeof(IScheduler))
+                )
+            );
+        }
+
         /// <summary>
         /// Automatically connect the upstream IConnectableObservable at most once when the
         /// specified number of IObservers have subscribed to this IObservable.
@@ -10472,6 +10532,66 @@ namespace System.Reactive.Linq
             );
         }
 
+        /// <summary>
+        /// Prepend a value to an observable sequence.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Source sequence to prepend the value to.</param>
+        /// <param name="value">Value to prepend to the specified sequence.</param>
+        /// <returns>The source sequence prepended with the specified value.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="source" /> is null.</exception>
+        public static IQbservable<TSource> Prepend<TSource>(this IQbservable<TSource> source, TSource value)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+
+            return source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.Prepend<TSource>(default(IQbservable<TSource>), default(TSource))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
+#endif
+                    source.Expression,
+                    Expression.Constant(value, typeof(TSource))
+                )
+            );
+        }
+
+        /// <summary>
+        /// Prepend a value to an observable sequence.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <param name="source">Source sequence to prepend the value to.</param>
+        /// <param name="value">Value to prepend to the specified sequence.</param>
+        /// <param name="scheduler">Scheduler to emit the prepend values on.</param>
+        /// <returns>The source sequence prepended with the specified value.</returns>
+        /// <exception cref="ArgumentNullException">
+        /// <paramref name="source" /> is null.</exception>
+        public static IQbservable<TSource> Prepend<TSource>(this IQbservable<TSource> source, TSource value, IScheduler scheduler)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (scheduler == null)
+                throw new ArgumentNullException(nameof(scheduler));
+
+            return source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.Prepend<TSource>(default(IQbservable<TSource>), default(TSource), default(IScheduler))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)),
+#endif
+                    source.Expression,
+                    Expression.Constant(value, typeof(TSource)),
+                    Expression.Constant(scheduler, typeof(IScheduler))
+                )
+            );
+        }
+
         /// <summary>
         /// Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence.
         /// This operator is a specialization of Multicast using a regular <see cref="T:System.Reactive.Subjects.Subject`1" />.

+ 38 - 0
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs

@@ -12,6 +12,25 @@ namespace System.Reactive.Linq
 
     internal partial class QueryLanguage
     {
+        #region - Append -
+
+        public virtual IObservable<TSource> Append<TSource>(IObservable<TSource> source, TSource value)
+        {
+            return Append_<TSource>(source, value, SchedulerDefaults.ConstantTimeOperations);
+        }
+
+        public virtual IObservable<TSource> Append<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler)
+        {
+            return Append_<TSource>(source, value, scheduler);
+        }
+
+        private static IObservable<TSource> Append_<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler)
+        {
+            return source.Concat(new [] { value }.ToObservable(scheduler));
+        }
+
+        #endregion
+
         #region + AsObservable +
 
         public virtual IObservable<TSource> AsObservable<TSource>(IObservable<TSource> source)
@@ -154,6 +173,25 @@ namespace System.Reactive.Linq
 
         #endregion
 
+        #region - Prepend -
+
+        public virtual IObservable<TSource> Prepend<TSource>(IObservable<TSource> source, TSource value)
+        {
+            return Prepend_<TSource>(source, value, SchedulerDefaults.ConstantTimeOperations);
+        }
+
+        public virtual IObservable<TSource> Prepend<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler)
+        {
+            return Prepend_<TSource>(source, value, scheduler);
+        }
+
+        private static IObservable<TSource> Prepend_<TSource>(IObservable<TSource> source, TSource value, IScheduler scheduler)
+        {
+            return StartWith_(source, scheduler, new[] { value });
+        }
+
+        #endregion
+
         #region - Repeat -
 
         public virtual IObservable<TSource> Repeat<TSource>(IObservable<TSource> source)

+ 8 - 0
Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt

@@ -843,6 +843,8 @@ namespace System.Reactive.Linq
         public static System.Reactive.Joins.Pattern<TLeft, TRight> And<TLeft, TRight>(this System.IObservable<TLeft> left, System.IObservable<TRight> right) { }
         public static System.IObservable<bool> Any<TSource>(this System.IObservable<TSource> source) { }
         public static System.IObservable<bool> Any<TSource>(this System.IObservable<TSource> source, System.Func<TSource, bool> predicate) { }
+        public static System.IObservable<TSource> Append<TSource>(this System.IObservable<TSource> source, TSource value) { }
+        public static System.IObservable<TSource> Append<TSource>(this System.IObservable<TSource> source, TSource value, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TSource> AsObservable<TSource>(this System.IObservable<TSource> source) { }
         public static System.IObservable<TSource> AutoConnect<TSource>(this System.Reactive.Subjects.IConnectableObservable<TSource> source, int minObservers = 1, System.Action<System.IDisposable> onConnect = null) { }
         public static System.IObservable<double> Average(this System.IObservable<double> source) { }
@@ -1237,6 +1239,8 @@ namespace System.Reactive.Linq
         public static System.IObservable<TSource> OnErrorResumeNext<TSource>(this System.IObservable<TSource> first, System.IObservable<TSource> second) { }
         public static System.IObservable<TSource> OnErrorResumeNext<TSource>(params System.IObservable<>[] sources) { }
         public static System.IObservable<TSource> OnErrorResumeNext<TSource>(this System.Collections.Generic.IEnumerable<System.IObservable<TSource>> sources) { }
+        public static System.IObservable<TSource> Prepend<TSource>(this System.IObservable<TSource> source, TSource value) { }
+        public static System.IObservable<TSource> Prepend<TSource>(this System.IObservable<TSource> source, TSource value, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Reactive.Subjects.IConnectableObservable<TSource> Publish<TSource>(this System.IObservable<TSource> source) { }
         public static System.IObservable<TResult> Publish<TSource, TResult>(this System.IObservable<TSource> source, System.Func<System.IObservable<TSource>, System.IObservable<TResult>> selector) { }
         public static System.Reactive.Subjects.IConnectableObservable<TSource> Publish<TSource>(this System.IObservable<TSource> source, TSource initialValue) { }
@@ -1622,6 +1626,8 @@ namespace System.Reactive.Linq
         public static System.Reactive.Joins.QueryablePattern<TLeft, TRight> And<TLeft, TRight>(this System.Reactive.Linq.IQbservable<TLeft> left, System.IObservable<TRight> right) { }
         public static System.Reactive.Linq.IQbservable<bool> Any<TSource>(this System.Reactive.Linq.IQbservable<TSource> source) { }
         public static System.Reactive.Linq.IQbservable<bool> Any<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<TSource, bool>> predicate) { }
+        public static System.Reactive.Linq.IQbservable<TSource> Append<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, TSource value) { }
+        public static System.Reactive.Linq.IQbservable<TSource> Append<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, TSource value, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TSource> AsObservable<TSource>(this System.Reactive.Linq.IQbservable<TSource> source) { }
         public static System.Reactive.Linq.IQbservable<TSource> AsQbservable<TSource>(this System.IObservable<TSource> source) { }
         public static System.Reactive.Linq.IQbservable<TSource> AutoConnect<TSource>(this System.Reactive.Linq.IQbservableProvider provider, System.Reactive.Subjects.IConnectableObservable<TSource> source, int minObservers, System.Linq.Expressions.Expression<System.Action<System.IDisposable>> onConnect) { }
@@ -1959,6 +1965,8 @@ namespace System.Reactive.Linq
         public static System.Reactive.Linq.IQbservable<TSource> OnErrorResumeNext<TSource>(this System.Reactive.Linq.IQbservable<TSource> first, System.IObservable<TSource> second) { }
         public static System.Reactive.Linq.IQbservable<TSource> OnErrorResumeNext<TSource>(this System.Reactive.Linq.IQbservableProvider provider, params System.IObservable<>[] sources) { }
         public static System.Reactive.Linq.IQbservable<TSource> OnErrorResumeNext<TSource>(this System.Reactive.Linq.IQbservableProvider provider, System.Collections.Generic.IEnumerable<System.IObservable<TSource>> sources) { }
+        public static System.Reactive.Linq.IQbservable<TSource> Prepend<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, TSource value) { }
+        public static System.Reactive.Linq.IQbservable<TSource> Prepend<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, TSource value, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Reactive.Linq.IQbservable<TResult> Publish<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<TSource>, System.IObservable<TResult>>> selector) { }
         public static System.Reactive.Linq.IQbservable<TResult> Publish<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<TSource>, System.IObservable<TResult>>> selector, TSource initialValue) { }
         public static System.Reactive.Linq.IQbservable<TResult> PublishLast<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<TSource>, System.IObservable<TResult>>> selector) { }

+ 190 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/AppendTest.cs

@@ -0,0 +1,190 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Linq;
+using Microsoft.Reactive.Testing;
+using Xunit;
+
+namespace ReactiveTests.Tests
+{
+    public class AppendTest : ReactiveTest
+    {
+        [Fact]
+        public void Append_ArgumentChecking()
+        {
+            var scheduler = new TestScheduler();
+            var someObservable = Observable.Empty<int>();
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Append(default(IObservable<int>), 1));
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Append(default(IObservable<int>), 1, scheduler));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Append(someObservable, 1, default(IScheduler)));
+        }
+
+        [Fact]
+        public void Append()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, 1),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Append(3)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(220, 2),
+                OnNext(250, 3),
+                OnCompleted<int>(250)
+            );
+        }
+
+        [Fact]
+        public void Append_Null()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, "1"),
+                OnNext(220, "2"),
+                OnCompleted<string>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Append(null)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(220, "2"),
+                OnNext(250, (string)null),
+                OnCompleted<string>(250)
+            );
+        }
+
+        [Fact]
+        public void Append_Scheduler()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, 1),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Append(3, scheduler)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(220, 2),
+                OnNext(251, 3),
+                OnCompleted<int>(252)
+            );
+        }
+
+        [Fact]
+        public void Append_Many()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, 1),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Append(3).Append(4).Append(5)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(220, 2),
+                OnNext(250, 3),
+                OnNext(250, 4),
+                OnNext(250, 5),
+                OnCompleted<int>(250)
+            );
+        }
+
+        [Fact]
+        public void Append_Many_Take()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, 1),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Append(3).Append(4).Append(5).Take(2)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(220, 2),
+                OnNext(250, 3),
+                OnCompleted<int>(250)
+            );
+        }
+
+        [Fact]
+        public void Append_Many_Scheduler()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, 1),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Append(3, scheduler).Append(4, scheduler).Append(5, scheduler)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(220, 2),
+                OnNext(251, 3),
+                OnNext(253, 4),
+                OnNext(255, 5),
+                OnCompleted<int>(256)
+            );
+        }
+
+        [Fact]
+        public void Append_Many_Take_Scheduler()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, 1),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Append(3, scheduler).Append(4, scheduler).Append(5, scheduler).Take(2)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(220, 2),
+                OnNext(251, 3),
+                OnCompleted<int>(251)
+            );
+        }
+    }
+}

+ 175 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/PrependTest.cs

@@ -0,0 +1,175 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Linq;
+using Microsoft.Reactive.Testing;
+using Xunit;
+using ReactiveTests.Dummies;
+using System.Reflection;
+using System.Threading;
+using System.Reactive.Disposables;
+using System.Reactive.Subjects;
+
+namespace ReactiveTests.Tests
+{
+    public class PrependTest : ReactiveTest
+    {
+
+        [Fact]
+        public void Prepend_ArgumentChecking()
+        {
+            var scheduler = new TestScheduler();
+            var someObservable = Observable.Empty<int>();
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Prepend(default(IObservable<int>), 1));
+
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Prepend(default(IObservable<int>), 1, scheduler));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Prepend(someObservable, 1, default(IScheduler)));
+        }
+
+        [Fact]
+        public void Prepend()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, 1),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Prepend(3)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(200, 3),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+        }
+
+        [Fact]
+        public void Prepend_Scheduler()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, 1),
+                OnNext(220, 4),
+                OnCompleted<int>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Prepend(3, scheduler)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(201, 3),
+                OnNext(220, 4),
+                OnCompleted<int>(250)
+            );
+        }
+
+        [Fact]
+        public void Prepend_Many()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, 1),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Prepend(3).Prepend(4).Prepend(5)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(200, 5),
+                OnNext(200, 4),
+                OnNext(200, 3),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+        }
+
+        [Fact]
+        public void Prepend_Many_Take()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, 1),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Prepend(3).Prepend(4).Prepend(5).Take(2)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(200, 5),
+                OnNext(200, 4),
+                OnCompleted<int>(200)
+            );
+        }
+
+
+        [Fact]
+        public void Prepend_Many_Scheduler()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, 1),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Prepend(3, scheduler).Prepend(4, scheduler).Prepend(5, scheduler)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(201, 5),
+                OnNext(203, 4),
+                OnNext(205, 3),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+        }
+
+        [Fact]
+        public void Prepend_Many_Take_Scheduler()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateHotObservable(
+                OnNext(150, 1),
+                OnNext(220, 2),
+                OnCompleted<int>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.Prepend(3, scheduler).Prepend(4, scheduler).Prepend(5, scheduler).Take(2)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(201, 5),
+                OnNext(203, 4),
+                OnCompleted<int>(203)
+            );
+        }
+    }
+}