Browse Source

Add the RepeatWhen operator (#536)

David Karnok 7 years ago
parent
commit
24fb6db610

+ 1 - 0
Rx.NET/Source/src/System.Reactive/Linq/IQueryLanguage.cs

@@ -580,6 +580,7 @@ namespace System.Reactive.Linq
         IObservable<Notification<TSource>> Materialize<TSource>(IObservable<TSource> source);
         IObservable<TSource> Repeat<TSource>(IObservable<TSource> source);
         IObservable<TSource> Repeat<TSource>(IObservable<TSource> source, int repeatCount);
+        IObservable<TSource> RepeatWhen<TSource, TSignal>(IObservable<TSource> source, Func<IObservable<object>, IObservable<TSignal>> handler);
         IObservable<TSource> Retry<TSource>(IObservable<TSource> source);
         IObservable<TSource> Retry<TSource>(IObservable<TSource> source, int retryCount);
         IObservable<TSource> RetryWhen<TSource, TSignal>(IObservable<TSource> source, Func<IObservable<Exception>, IObservable<TSignal>> handler);

+ 25 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable.Single.cs

@@ -377,6 +377,31 @@ namespace System.Reactive.Linq
             return s_impl.Repeat<TSource>(source, repeatCount);
         }
 
+        /// <summary>
+        /// Repeatedly resubscribes to the source observable after a normal completion and when the observable
+        /// returned by a handler produces an arbitrary item.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TSignal">The arbitrary element type signaled by the handler observable.</typeparam>
+        /// <param name="source">Observable sequence to keep repeating when it successfully terminates.</param>
+        /// <param name="handler">The function that is called for each observer and takes an observable sequence objects.
+        /// It should return an observable of arbitrary items that should signal that arbitrary item in
+        /// response to receiving the completion signal from the source observable. If this observable signals
+        /// a terminal event, the sequence is terminated with that signal instead.</param>
+        /// <returns>An observable sequence producing the elements of the given sequence repeatedly while each repetition terminates successfully.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
+        public static IObservable<TSource> RepeatWhen<TSource, TSignal>(this IObservable<TSource> source, Func<IObservable<object>, IObservable<TSignal>> handler)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (handler == null)
+                throw new ArgumentNullException(nameof(handler));
+
+            return s_impl.RepeatWhen(source, handler);
+        }
+
+
         #endregion
 
         #region + Retry +

+ 170 - 0
Rx.NET/Source/src/System.Reactive/Linq/Observable/RepeatWhen.cs

@@ -0,0 +1,170 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Reactive.Disposables;
+using System.Reactive.Subjects;
+using System.Text;
+using System.Threading;
+
+namespace System.Reactive.Linq.ObservableImpl
+{
+    internal sealed class RepeatWhen<T, U> : IObservable<T>
+    {
+        readonly IObservable<T> source;
+
+        readonly Func<IObservable<object>, IObservable<U>> handler;
+
+        internal RepeatWhen(IObservable<T> source, Func<IObservable<object>, IObservable<U>> handler)
+        {
+            this.source = source;
+            this.handler = handler;
+        }
+
+        public IDisposable Subscribe(IObserver<T> observer)
+        {
+            if (observer == null)
+            {
+                throw new ArgumentNullException(nameof(observer));
+            }
+
+            var completeSignals = new Subject<object>();
+            var redo = default(IObservable<U>);
+
+            try
+            {
+                redo = handler(completeSignals);
+                if (redo == null)
+                {
+                    throw new NullReferenceException("The handler returned a null IObservable");
+                }
+            }
+            catch (Exception ex)
+            {
+                observer.OnError(ex);
+                return Disposable.Empty;
+            }
+
+            var parent = new MainObserver(observer, source, new RedoSerializedObserver<object>(completeSignals));
+
+            var d = redo.SubscribeSafe(parent.handlerObserver);
+            Disposable.SetSingle(ref parent.handlerUpstream, d);
+
+            parent.HandlerNext();
+
+            return parent;
+        }
+
+        sealed class MainObserver : Sink<T>, IObserver<T>
+        {
+            readonly IObserver<Exception> errorSignal;
+
+            internal readonly HandlerObserver handlerObserver;
+
+            readonly IObservable<T> source;
+
+            IDisposable upstream;
+
+            internal IDisposable handlerUpstream;
+
+            int trampoline;
+
+            int halfSerializer;
+
+            Exception error;
+
+            internal MainObserver(IObserver<T> downstream, IObservable<T> source, IObserver<Exception> errorSignal) : base(downstream)
+            {
+                this.source = source;
+                this.errorSignal = errorSignal;
+                this.handlerObserver = new HandlerObserver(this);
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref upstream);
+                    Disposable.TryDispose(ref handlerUpstream);
+                }
+                base.Dispose(disposing);
+            }
+
+            public void OnCompleted()
+            {
+                if (Disposable.TrySetSerial(ref upstream, null))
+                {
+                    errorSignal.OnNext(null);
+                }
+
+            }
+
+            public void OnError(Exception error)
+            {
+                HalfSerializer.ForwardOnError(this, error, ref halfSerializer, ref this.error);
+            }
+
+            public void OnNext(T value)
+            {
+                HalfSerializer.ForwardOnNext(this, value, ref halfSerializer, ref this.error);
+            }
+
+            internal void HandlerError(Exception error)
+            {
+                HalfSerializer.ForwardOnError(this, error, ref halfSerializer, ref this.error);
+            }
+
+            internal void HandlerComplete()
+            {
+                HalfSerializer.ForwardOnCompleted(this, ref halfSerializer, ref this.error);
+            }
+
+            internal void HandlerNext()
+            {
+                if (Interlocked.Increment(ref trampoline) == 1)
+                {
+                    do
+                    {
+                        var sad = new SingleAssignmentDisposable();
+                        if (Interlocked.CompareExchange(ref upstream, sad, null) != null)
+                        {
+                            return;
+                        }
+
+                        sad.Disposable = source.SubscribeSafe(this);
+                    }
+                    while (Interlocked.Decrement(ref trampoline) != 0);
+                }
+            }
+
+            internal sealed class HandlerObserver : IObserver<U>
+            {
+                readonly MainObserver main;
+
+                internal HandlerObserver(MainObserver main)
+                {
+                    this.main = main;
+                }
+
+                public void OnCompleted()
+                {
+                    main.HandlerComplete();
+                }
+
+                public void OnError(Exception error)
+                {
+                    main.HandlerError(error);
+                }
+
+                public void OnNext(U value)
+                {
+                    main.HandlerNext();
+                }
+            }
+        }
+
+    }
+}

+ 86 - 132
Rx.NET/Source/src/System.Reactive/Linq/Observable/RetryWhen.cs

@@ -1,4 +1,8 @@
-using System;
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
 using System.Collections.Concurrent;
 using System.Collections.Generic;
 using System.Reactive.Disposables;
@@ -44,19 +48,18 @@ namespace System.Reactive.Linq.ObservableImpl
                 return Disposable.Empty;
             }
 
-            var parent = new MainObserver(observer, source, new SerializedObserver(errorSignals));
+            var parent = new MainObserver(observer, source, new RedoSerializedObserver<Exception>(errorSignals));
 
             var d = redo.SubscribeSafe(parent.handlerObserver);
-            parent.handlerObserver.OnSubscribe(d);
+            Disposable.SetSingle(ref parent.handlerUpstream, d);
 
             parent.HandlerNext();
 
             return parent;
         }
 
-        sealed class MainObserver : IObserver<T>, IDisposable
+        sealed class MainObserver : Sink<T>, IObserver<T>
         {
-            readonly IObserver<T> downstream;
 
             readonly IObserver<Exception> errorSignal;
 
@@ -65,6 +68,7 @@ namespace System.Reactive.Linq.ObservableImpl
             readonly IObservable<T> source;
 
             IDisposable upstream;
+            internal IDisposable handlerUpstream;
 
             int trampoline;
 
@@ -72,85 +76,49 @@ namespace System.Reactive.Linq.ObservableImpl
 
             Exception error;
 
-            internal MainObserver(IObserver<T> downstream, IObservable<T> source, IObserver<Exception> errorSignal)
+            internal MainObserver(IObserver<T> downstream, IObservable<T> source, IObserver<Exception> errorSignal) : base(downstream)
             {
-                this.downstream = downstream;
                 this.source = source;
                 this.errorSignal = errorSignal;
                 this.handlerObserver = new HandlerObserver(this);
             }
 
-            public void Dispose()
+            protected override void Dispose(bool disposing)
             {
-                Disposable.TryDispose(ref upstream);
-                handlerObserver.Dispose();
+                if (disposing)
+                {
+                    Disposable.TryDispose(ref upstream);
+                    Disposable.TryDispose(ref handlerUpstream); 
+                }
+                base.Dispose(disposing);
             }
 
             public void OnCompleted()
             {
-                if (Interlocked.Increment(ref halfSerializer) == 1)
-                {
-                    downstream.OnCompleted();
-                    Dispose();
-                }
+                HalfSerializer.ForwardOnCompleted(this, ref halfSerializer, ref this.error);
             }
 
             public void OnError(Exception error)
             {
-                for (; ; )
+                if (Disposable.TrySetSerial(ref upstream, null))
                 {
-                    var d = Volatile.Read(ref upstream);
-                    if (d == BooleanDisposable.True)
-                    {
-                        break;
-                    }
-                    if (Interlocked.CompareExchange(ref upstream, null, d) == d)
-                    {
-                        errorSignal.OnNext(error);
-                        d.Dispose();
-                        break;
-                    }
+                    errorSignal.OnNext(error);
                 }
             }
 
             public void OnNext(T value)
             {
-                if (Interlocked.CompareExchange(ref halfSerializer, 1, 0) == 0)
-                {
-                    downstream.OnNext(value);
-                    if (Interlocked.Decrement(ref halfSerializer) != 0)
-                    {
-                        var ex = error;
-                        if (ex == null)
-                        {
-                            downstream.OnCompleted();
-                        }
-                        else
-                        {
-                            downstream.OnError(ex);
-                        }
-                        Dispose();
-                    }
-                }
+                HalfSerializer.ForwardOnNext(this, value, ref halfSerializer, ref this.error);
             }
 
             internal void HandlerError(Exception error)
             {
-                this.error = error;
-                if (Interlocked.Increment(ref halfSerializer) == 1)
-                {
-                    downstream.OnError(error);
-                    Dispose();
-                }
+                HalfSerializer.ForwardOnError(this, error, ref halfSerializer, ref this.error);
             }
 
             internal void HandlerComplete()
             {
-                if (Interlocked.Increment(ref halfSerializer) == 1)
-                {
-                    downstream.OnCompleted();
-                    Dispose();
-                }
+                HalfSerializer.ForwardOnCompleted(this, ref halfSerializer, ref this.error);
             }
 
             internal void HandlerNext()
@@ -171,37 +139,23 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
             }
 
-            internal sealed class HandlerObserver : IObserver<U>, IDisposable
+            internal sealed class HandlerObserver : IObserver<U>
             {
                 readonly MainObserver main;
 
-                IDisposable upstream;
-
                 internal HandlerObserver(MainObserver main)
                 {
                     this.main = main;
                 }
 
-                internal void OnSubscribe(IDisposable d)
-                {
-                    Disposable.SetSingle(ref upstream, d);
-                }
-
-                public void Dispose()
-                {
-                    Disposable.TryDispose(ref upstream);
-                }
-
                 public void OnCompleted()
                 {
                     main.HandlerComplete();
-                    Dispose();
                 }
 
                 public void OnError(Exception error)
                 {
                     main.HandlerError(error);
-                    Dispose();
                 }
 
                 public void OnNext(U value)
@@ -210,97 +164,97 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
             }
         }
+    }
 
-        sealed class SerializedObserver : IObserver<Exception>
-        {
-            readonly IObserver<Exception> downstream;
-
-            int wip;
+    internal sealed class RedoSerializedObserver<X> : IObserver<X>
+    {
+        readonly IObserver<X> downstream;
 
-            Exception terminalException;
+        int wip;
 
-            static readonly Exception DONE = new Exception();
+        Exception terminalException;
 
-            static readonly Exception SIGNALED = new Exception();
+        static readonly Exception DONE = new Exception();
 
-            readonly ConcurrentQueue<Exception> queue;
+        static readonly Exception SIGNALED = new Exception();
 
-            internal SerializedObserver(IObserver<Exception> downstream)
-            {
-                this.downstream = downstream;
-                this.queue = new ConcurrentQueue<Exception>();
-            }
+        readonly ConcurrentQueue<X> queue;
 
-            public void OnCompleted()
-            {
-                if (Interlocked.CompareExchange(ref terminalException, DONE, null) == null)
-                {
-                    Drain();
-                }
-            }
+        internal RedoSerializedObserver(IObserver<X> downstream)
+        {
+            this.downstream = downstream;
+            this.queue = new ConcurrentQueue<X>();
+        }
 
-            public void OnError(Exception error)
+        public void OnCompleted()
+        {
+            if (Interlocked.CompareExchange(ref terminalException, DONE, null) == null)
             {
-                if (Interlocked.CompareExchange(ref terminalException, error, null) == null)
-                {
-                    Drain();
-                }
+                Drain();
             }
+        }
 
-            public void OnNext(Exception value)
+        public void OnError(Exception error)
+        {
+            if (Interlocked.CompareExchange(ref terminalException, error, null) == null)
             {
-                queue.Enqueue(value);
                 Drain();
             }
+        }
 
-            void Clear()
-            {
-                while (queue.TryDequeue(out var _)) ;
-            }
+        public void OnNext(X value)
+        {
+            queue.Enqueue(value);
+            Drain();
+        }
 
-            void Drain()
+        void Clear()
+        {
+            while (queue.TryDequeue(out var _)) ;
+        }
+
+        void Drain()
+        {
+            if (Interlocked.Increment(ref wip) != 1)
             {
-                if (Interlocked.Increment(ref wip) != 1)
-                {
-                    return;
-                }
+                return;
+            }
 
-                int missed = 1;
+            int missed = 1;
 
-                for (; ; )
+            for (; ; )
+            {
+                var ex = Volatile.Read(ref terminalException);
+                if (ex != null)
                 {
-                    var ex = Volatile.Read(ref terminalException);
-                    if (ex != null)
+                    if (ex != SIGNALED)
                     {
-                        if (ex != SIGNALED)
+                        Interlocked.Exchange(ref terminalException, SIGNALED);
+                        if (ex != DONE)
                         {
-                            Interlocked.Exchange(ref terminalException, SIGNALED);
-                            if (ex != DONE)
-                            {
-                                downstream.OnError(ex);
-                            }
-                            else
-                            {
-                                downstream.OnCompleted();
-                            }
+                            downstream.OnError(ex);
                         }
-                        Clear();
-                    }
-                    else
-                    {
-                        while (queue.TryDequeue(out var item))
+                        else
                         {
-                            downstream.OnNext(item);
+                            downstream.OnCompleted();
                         }
                     }
-                        
-
-                    missed = Interlocked.Add(ref wip, -missed);
-                    if (missed == 0)
+                    Clear();
+                }
+                else
+                {
+                    while (queue.TryDequeue(out var item))
                     {
-                        break;
+                        downstream.OnNext(item);
                     }
                 }
+
+
+                missed = Interlocked.Add(ref wip, -missed);
+                if (missed == 0)
+                {
+                    break;
+                }
             }
         }
     }

+ 36 - 1
Rx.NET/Source/src/System.Reactive/Linq/Qbservable.Generated.cs

@@ -1,4 +1,4 @@
-/*
+/*
  * WARNING: Auto-generated file (05/28/2018 22:20:18)
  * Run Rx's auto-homoiconizer tool to generate this file (in the HomoIcon directory).
  */
@@ -10847,6 +10847,41 @@ namespace System.Reactive.Linq
             );
         }
 
+        /// <summary>
+        /// Repeatedly resubscribes to the source observable after a normal completion and when the observable
+        /// returned by a handler produces an arbitrary item.
+        /// </summary>
+        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
+        /// <typeparam name="TSignal">The arbitrary element type signaled by the handler observable.</typeparam>
+        /// <param name="source">Observable sequence to keep repeating when it successfully terminates.</param>
+        /// <param name="handler">The function that is called for each observer and takes an observable sequence objects.
+        /// It should return an observable of arbitrary items that should signal that arbitrary item in
+        /// response to receiving the completion signal from the source observable. If this observable signals
+        /// a terminal event, the sequence is terminated with that signal instead.</param>
+        /// <returns>An observable sequence producing the elements of the given sequence repeatedly while each repetition terminates successfully.</returns>
+        /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
+        /// <exception cref="ArgumentNullException"><paramref name="handler"/> is null.</exception>
+        public static IQbservable<TSource> RepeatWhen<TSource, TSignal>(this IQbservable<TSource> source, Expression<Func<IObservable<object>, IObservable<TSignal>>> handler)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (handler == null)
+                throw new ArgumentNullException(nameof(handler));
+
+            return source.Provider.CreateQuery<TSource>(
+                Expression.Call(
+                    null,
+#if CRIPPLED_REFLECTION
+                    InfoOf(() => Qbservable.RepeatWhen<TSource, TSignal>(default(IQbservable<TSource>), default(Expression<Func<IObservable<object>, IObservable<TSignal>>>))),
+#else
+                    ((MethodInfo)MethodInfo.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TSignal)),
+#endif
+                    source.Expression,
+                    handler
+                )
+            );
+        }
+
         /// <summary>
         /// Returns an observable sequence that is the result of invoking the selector on a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications.
         /// This operator is a specialization of Multicast using a <see cref="T:System.Reactive.Subjects.ReplaySubject`1" />.

+ 5 - 0
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Single.cs

@@ -172,6 +172,11 @@ namespace System.Reactive.Linq
             return Enumerable.Repeat(source, repeatCount).Concat();
         }
 
+        public virtual IObservable<TSource> RepeatWhen<TSource, TSignal>(IObservable<TSource> source, Func<IObservable<object>, IObservable<TSignal>> handler)
+        {
+            return new RepeatWhen<TSource, TSignal>(source, handler);
+        }
+
         #endregion
 
         #region - Retry -

+ 2 - 0
Rx.NET/Source/tests/Tests.System.Reactive.ApiApprovals/Api/ApiApprovalTests.Core.approved.txt

@@ -1252,6 +1252,7 @@ namespace System.Reactive.Linq
         public static System.IObservable<TResult> Repeat<TResult>(TResult value, int repeatCount, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TSource> Repeat<TSource>(this System.IObservable<TSource> source) { }
         public static System.IObservable<TSource> Repeat<TSource>(this System.IObservable<TSource> source, int repeatCount) { }
+        public static System.IObservable<TSource> RepeatWhen<TSource, TSignal>(this System.IObservable<TSource> source, System.Func<System.IObservable<object>, System.IObservable<TSignal>> handler) { }
         public static System.Reactive.Subjects.IConnectableObservable<TSource> Replay<TSource>(this System.IObservable<TSource> source) { }
         public static System.Reactive.Subjects.IConnectableObservable<TSource> Replay<TSource>(this System.IObservable<TSource> source, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.IObservable<TResult> Replay<TSource, TResult>(this System.IObservable<TSource> source, System.Func<System.IObservable<TSource>, System.IObservable<TResult>> selector) { }
@@ -1968,6 +1969,7 @@ namespace System.Reactive.Linq
         public static System.Reactive.Linq.IQbservable<TResult> Repeat<TResult>(this System.Reactive.Linq.IQbservableProvider provider, TResult value, System.Reactive.Concurrency.IScheduler scheduler) { }
         public static System.Reactive.Linq.IQbservable<TSource> Repeat<TSource>(this System.Reactive.Linq.IQbservable<TSource> source) { }
         public static System.Reactive.Linq.IQbservable<TSource> Repeat<TSource>(this System.Reactive.Linq.IQbservable<TSource> source, int repeatCount) { }
+        public static System.Reactive.Linq.IQbservable<TSource> RepeatWhen<TSource, TSignal>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<object>, System.IObservable<TSignal>>> handler) { }
         public static System.Reactive.Linq.IQbservable<TResult> Replay<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<TSource>, System.IObservable<TResult>>> selector) { }
         public static System.Reactive.Linq.IQbservable<TResult> Replay<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<TSource>, System.IObservable<TResult>>> selector, int bufferSize) { }
         public static System.Reactive.Linq.IQbservable<TResult> Replay<TSource, TResult>(this System.Reactive.Linq.IQbservable<TSource> source, System.Linq.Expressions.Expression<System.Func<System.IObservable<TSource>, System.IObservable<TResult>>> selector, int bufferSize, System.Reactive.Concurrency.IScheduler scheduler) { }

+ 542 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/RepeatWhenTest.cs

@@ -0,0 +1,542 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information. 
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using System.Reactive;
+using System.Reactive.Concurrency;
+using System.Reactive.Linq;
+using Microsoft.Reactive.Testing;
+using Xunit;
+using ReactiveTests.Dummies;
+using System.Reflection;
+using System.Threading;
+using System.Reactive.Disposables;
+using System.Reactive.Subjects;
+using System.Runtime.CompilerServices;
+
+namespace ReactiveTests.Tests
+{
+    public class RepeatWhenTest : ReactiveTest
+    {
+
+        [Fact]
+        public void RepeatWhen_ArgumentChecking()
+        {
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RepeatWhen<int, object>(null, v => v));
+            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.RepeatWhen<int, object>(Observable.Return(1), null));
+            ReactiveAssert.Throws<ArgumentNullException>(() => DummyObservable<int>.Instance.RepeatWhen(v => v).Subscribe(null));
+        }
+
+        [Fact]
+        public void RepeatWhen_Handler_Crash()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateColdObservable(
+                OnCompleted<int>(10)
+            );
+
+            var ex = new InvalidOperationException();
+
+            var res = scheduler.Start(() =>
+                xs.RepeatWhen<int, object>(v => { throw ex; })
+            );
+
+            res.Messages.AssertEqual(
+                OnError<int>(200, ex)
+            );
+
+            xs.Subscriptions.AssertEqual(
+            );
+        }
+
+        [Fact]
+        public void RepeatWhen_Handler_Error()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateColdObservable(
+                OnCompleted<int>(10)
+            );
+
+            var ex = new InvalidOperationException();
+
+            var res = scheduler.Start(() =>
+                xs.RepeatWhen<int, object>(v => v.Select<object, object>(w => throw ex))
+            );
+
+            res.Messages.AssertEqual(
+                OnError<int>(210, ex)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 210)
+            );
+        }
+
+        [Fact]
+        public void RepeatWhen_Handler_Completed()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateColdObservable(
+                OnCompleted<int>(10)
+            );
+
+            var ex = new InvalidOperationException();
+
+            var res = scheduler.Start(() =>
+                xs.RepeatWhen<int, object>(v => v.Take(1).Skip(1))
+            );
+
+            res.Messages.AssertEqual(
+                OnCompleted<int>(210)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 210)
+            );
+        }
+        
+        [Fact]
+        public void RepeatWhen_Disposed()
+        {
+            var main = new Subject<int>();
+            var inner = new Subject<int>();
+
+            var d = main.RepeatWhen(v => inner).Subscribe();
+
+            Assert.True(main.HasObservers);
+            Assert.True(inner.HasObservers);
+
+            d.Dispose();
+
+            Assert.False(main.HasObservers);
+            Assert.False(inner.HasObservers);
+        }
+
+        [Fact]
+        public void RepeatWhen_Handler_Completed_Disposes_Main()
+        {
+            var main = new Subject<int>();
+            var inner = new Subject<int>();
+
+            var end = 0;
+            var items = 0;
+            var errors = 0;
+
+            main.RepeatWhen(v => inner).Subscribe(
+                onNext: v => items++, 
+                onError: e => errors++, 
+                onCompleted: () => end++);
+
+            Assert.True(main.HasObservers);
+            Assert.True(inner.HasObservers);
+
+            inner.OnCompleted();
+
+            Assert.False(main.HasObservers);
+            Assert.False(inner.HasObservers);
+
+            Assert.Equal(0, items);
+            Assert.Equal(0, errors);
+            Assert.Equal(1, end);
+        }
+
+        [Fact]
+        public void RepeatWhen_Handler_Error_Disposes_Main()
+        {
+            var main = new Subject<int>();
+            var inner = new Subject<int>();
+
+            var end = 0;
+            var items = 0;
+            var errors = 0;
+
+            main.RepeatWhen(v => inner).Subscribe(
+                onNext: v => items++,
+                onError: e => errors++,
+                onCompleted: () => end++);
+
+            Assert.True(main.HasObservers);
+            Assert.True(inner.HasObservers);
+
+            inner.OnError(new InvalidOperationException());
+
+            Assert.False(main.HasObservers);
+            Assert.False(inner.HasObservers);
+
+            Assert.Equal(0, items);
+            Assert.Equal(1, errors);
+            Assert.Equal(0, end);
+        }
+
+        [Fact]
+        public void RepeatWhen_Basic()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateColdObservable(
+                OnNext(100, 1),
+                OnNext(150, 2),
+                OnNext(200, 3),
+                OnCompleted<int>(250)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.RepeatWhen(v => v)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(300, 1),
+                OnNext(350, 2),
+                OnNext(400, 3),
+                OnNext(550, 1),
+                OnNext(600, 2),
+                OnNext(650, 3),
+                OnNext(800, 1),
+                OnNext(850, 2),
+                OnNext(900, 3)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 450),
+                Subscribe(450, 700),
+                Subscribe(700, 950),
+                Subscribe(950, 1000)
+            );
+        }
+
+        [Fact]
+        public void RepeatWhen_Infinite()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateColdObservable(
+                OnNext(100, 1),
+                OnNext(150, 2),
+                OnNext(200, 3)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.RepeatWhen(v => v)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(300, 1),
+                OnNext(350, 2),
+                OnNext(400, 3)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 1000)
+            );
+        }
+
+        [Fact]
+        public void RepeatWhen_Error()
+        {
+            var scheduler = new TestScheduler();
+
+            var ex = new Exception();
+
+            var xs = scheduler.CreateColdObservable(
+                OnNext(100, 1),
+                OnNext(150, 2),
+                OnNext(200, 3),
+                OnError<int>(250, ex)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.RepeatWhen(v => v)
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(300, 1),
+                OnNext(350, 2),
+                OnNext(400, 3),
+                OnError<int>(450, ex)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 450)
+            );
+        }
+
+        [Fact]
+        public void RepeatWhen_Throws()
+        {
+            var scheduler1 = new TestScheduler();
+
+            var xs = Observable.Return(1, scheduler1).RepeatWhen(v => v);
+
+            xs.Subscribe(x => { throw new InvalidOperationException(); });
+
+            ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
+
+            var scheduler2 = new TestScheduler();
+
+            var ys = Observable.Throw<int>(new Exception(), scheduler2).RepeatWhen(v => v);
+
+            ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); });
+
+            ReactiveAssert.Throws<InvalidOperationException>(() => scheduler2.Start());
+
+            var scheduler3 = new TestScheduler();
+
+            var zs = Observable.Return(1, scheduler3).RepeatWhen(v => v);
+
+            var d = zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
+
+            scheduler3.ScheduleAbsolute(210, () => d.Dispose());
+
+            scheduler3.Start();
+
+            var xss = Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).RepeatWhen(v => v);
+
+            ReactiveAssert.Throws<InvalidOperationException>(() => xss.Subscribe());
+        }
+
+        [Fact]
+        public void RepeatWhen_RepeatCount_Basic()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateColdObservable(
+                OnNext(5, 1),
+                OnNext(10, 2),
+                OnNext(15, 3),
+                OnCompleted<int>(20)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.RepeatWhen(v =>
+                {
+                    var count = 0;
+                    return v.TakeWhile(w => ++count < 3);
+                })
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(205, 1),
+                OnNext(210, 2),
+                OnNext(215, 3),
+                OnNext(225, 1),
+                OnNext(230, 2),
+                OnNext(235, 3),
+                OnNext(245, 1),
+                OnNext(250, 2),
+                OnNext(255, 3),
+                OnCompleted<int>(260)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 220),
+                Subscribe(220, 240),
+                Subscribe(240, 260)
+            );
+        }
+
+        [Fact]
+        public void RepeatWhen_RepeatCount_Dispose()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateColdObservable(
+                OnNext(5, 1),
+                OnNext(10, 2),
+                OnNext(15, 3),
+                OnCompleted<int>(20)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.RepeatWhen(v =>
+                {
+                    var count = 0;
+                    return v.TakeWhile(w => ++count < 3);
+                }), 231
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(205, 1),
+                OnNext(210, 2),
+                OnNext(215, 3),
+                OnNext(225, 1),
+                OnNext(230, 2)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 220),
+                Subscribe(220, 231)
+            );
+        }
+
+        [Fact]
+        public void RepeatWhen_RepeatCount_Infinite()
+        {
+            var scheduler = new TestScheduler();
+
+            var xs = scheduler.CreateColdObservable(
+                OnNext(100, 1),
+                OnNext(150, 2),
+                OnNext(200, 3)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.RepeatWhen(v =>
+                {
+                    var count = 0;
+                    return v.TakeWhile(w => ++count < 3);
+                })
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(300, 1),
+                OnNext(350, 2),
+                OnNext(400, 3)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 1000)
+            );
+        }
+
+        [Fact]
+        public void RepeatWhen_RepeatCount_Error()
+        {
+            var scheduler = new TestScheduler();
+
+            var ex = new Exception();
+
+            var xs = scheduler.CreateColdObservable(
+                OnNext(100, 1),
+                OnNext(150, 2),
+                OnNext(200, 3),
+                OnError<int>(250, ex)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.RepeatWhen(v =>
+                {
+                    var count = 0;
+                    return v.TakeWhile(w => ++count < 3);
+                })
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(300, 1),
+                OnNext(350, 2),
+                OnNext(400, 3),
+                OnError<int>(450, ex)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 450)
+            );
+        }
+
+        [Fact]
+        public void RepeatWhen_RepeatCount_Throws()
+        {
+            var scheduler1 = new TestScheduler();
+
+            var xs = Observable.Return(1, scheduler1).RepeatWhen(v =>
+            {
+                var count = 0;
+                return v.TakeWhile(w => ++count < 3);
+            });
+
+            xs.Subscribe(x => { throw new InvalidOperationException(); });
+
+            ReactiveAssert.Throws<InvalidOperationException>(() => scheduler1.Start());
+
+            var scheduler2 = new TestScheduler();
+
+            var ys = Observable.Throw<int>(new Exception(), scheduler2).RepeatWhen(v =>
+            {
+                var count = 0;
+                return v.TakeWhile(w => ++count < 3);
+            });
+
+            ys.Subscribe(x => { }, ex => { throw new InvalidOperationException(); });
+
+            ReactiveAssert.Throws<InvalidOperationException>(() => scheduler2.Start());
+
+            var scheduler3 = new TestScheduler();
+
+            var zs = Observable.Return(1, scheduler3).RepeatWhen(v =>
+            {
+                var count = 0;
+                return v.TakeWhile(w => ++count < 100);
+            });
+
+            var d = zs.Subscribe(x => { }, ex => { }, () => { throw new InvalidOperationException(); });
+
+            scheduler3.ScheduleAbsolute(10, () => d.Dispose());
+
+            scheduler3.Start();
+
+            var xss = Observable.Create<int>(new Func<IObserver<int>, Action>(o => { throw new InvalidOperationException(); })).RepeatWhen(v =>
+            {
+                var count = 0;
+                return v.TakeWhile(w => ++count < 3);
+            });
+
+            ReactiveAssert.Throws<InvalidOperationException>(() => xss.Subscribe());
+        }
+
+        [Fact]
+        public void RepeatWhen_Observable_Repeat_Delayed()
+        {
+            var scheduler = new TestScheduler();
+
+            var ex = new Exception();
+
+            var xs = scheduler.CreateColdObservable(
+                OnNext(5, 1),
+                OnNext(10, 2),
+                OnNext(15, 3),
+                OnCompleted<int>(20)
+            );
+
+            var res = scheduler.Start(() =>
+                xs.RepeatWhen(v =>
+                {
+                    int[] count = { 0 };
+                    return v.SelectMany(w => {
+                        int c = ++count[0];
+                        if (c == 3)
+                        {
+                            return Observable.Throw<int>(ex);
+                        }
+                        return Observable.Return(1).Delay(TimeSpan.FromTicks(c * 100), scheduler);
+                    });
+                })
+            );
+
+            res.Messages.AssertEqual(
+                OnNext(205, 1),
+                OnNext(210, 2),
+                OnNext(215, 3),
+                OnNext(325, 1),
+                OnNext(330, 2),
+                OnNext(335, 3),
+                OnNext(545, 1),
+                OnNext(550, 2),
+                OnNext(555, 3),
+                OnError<int>(560, ex)
+            );
+
+            xs.Subscriptions.AssertEqual(
+                Subscribe(200, 220),
+                Subscribe(320, 340),
+                Subscribe(540, 560)
+            );
+        }
+    }
+}