// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System; using System.Collections.Generic; using System.Linq; using System.Reactive; using System.Reactive.Concurrency; using System.Reactive.Subjects; using System.Threading; using System.Threading.Tasks; using Microsoft.Reactive.Testing; using ReactiveTests.Dummies; using Microsoft.VisualStudio.TestTools.UnitTesting; using Assert = Xunit.Assert; namespace ReactiveTests.Tests { [TestClass] public partial class ReplaySubjectTest : ReactiveTest { [TestMethod] public void Subscribe_ArgumentChecking() { ReactiveAssert.Throws(() => new ReplaySubject().Subscribe(null)); ReactiveAssert.Throws(() => new ReplaySubject(1).Subscribe(null)); ReactiveAssert.Throws(() => new ReplaySubject(2).Subscribe(null)); ReactiveAssert.Throws(() => new ReplaySubject(DummyScheduler.Instance).Subscribe(null)); } [TestMethod] public void OnError_ArgumentChecking() { ReactiveAssert.Throws(() => new ReplaySubject().OnError(null)); ReactiveAssert.Throws(() => new ReplaySubject(1).OnError(null)); ReactiveAssert.Throws(() => new ReplaySubject(2).OnError(null)); ReactiveAssert.Throws(() => new ReplaySubject(DummyScheduler.Instance).OnError(null)); } [TestMethod] public void Constructor_ArgumentChecking() { #pragma warning disable CA1806 // (Unused new instance.) We are just testing whether or not the constructor throws. ReactiveAssert.Throws(() => new ReplaySubject(-1)); ReactiveAssert.Throws(() => new ReplaySubject(-1, DummyScheduler.Instance)); ReactiveAssert.Throws(() => new ReplaySubject(-1, TimeSpan.Zero)); ReactiveAssert.Throws(() => new ReplaySubject(-1, TimeSpan.Zero, DummyScheduler.Instance)); ReactiveAssert.Throws(() => new ReplaySubject(TimeSpan.FromTicks(-1))); ReactiveAssert.Throws(() => new ReplaySubject(TimeSpan.FromTicks(-1), DummyScheduler.Instance)); ReactiveAssert.Throws(() => new ReplaySubject(0, TimeSpan.FromTicks(-1))); ReactiveAssert.Throws(() => new ReplaySubject(0, TimeSpan.FromTicks(-1), DummyScheduler.Instance)); ReactiveAssert.Throws(() => new ReplaySubject(null)); ReactiveAssert.Throws(() => new ReplaySubject(0, null)); ReactiveAssert.Throws(() => new ReplaySubject(TimeSpan.Zero, null)); ReactiveAssert.Throws(() => new ReplaySubject(0, TimeSpan.Zero, null)); // zero allowed new ReplaySubject(0); new ReplaySubject(TimeSpan.Zero); new ReplaySubject(0, TimeSpan.Zero); new ReplaySubject(0, DummyScheduler.Instance); new ReplaySubject(TimeSpan.Zero, DummyScheduler.Instance); new ReplaySubject(0, TimeSpan.Zero, DummyScheduler.Instance); #pragma warning restore CA1806 } [TestMethod] public void Infinite_ReplayByTime() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnNext(630, 8), OnNext(710, 9), OnNext(870, 10), OnNext(940, 11), OnNext(1020, 12) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( OnNext(301, 3), OnNext(302, 4), OnNext(341, 5), OnNext(411, 6), OnNext(521, 7) ); results2.Messages.AssertEqual( OnNext(401, 5), OnNext(411, 6), OnNext(521, 7), OnNext(631, 8) ); results3.Messages.AssertEqual( OnNext(901, 10), OnNext(941, 11) ); } [TestMethod] public void Infinite_ReplayOne() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnNext(630, 8), OnNext(710, 9), OnNext(870, 10), OnNext(940, 11), OnNext(1020, 12) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); var results4 = scheduler.CreateObserver(); var subscription4 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1200, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(1100, () => subscription4 = subject.Subscribe(results4)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( OnNext(300, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7) ); results2.Messages.AssertEqual( OnNext(400, 5), OnNext(410, 6), OnNext(520, 7), OnNext(630, 8) ); results3.Messages.AssertEqual( OnNext(900, 10), OnNext(940, 11) ); results4.Messages.AssertEqual( OnNext(1100, 12) ); } [TestMethod] public void Infinite_ReplayMany() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnNext(630, 8), OnNext(710, 9), OnNext(870, 10), OnNext(940, 11), OnNext(1020, 12) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( OnNext(300, 3), OnNext(300, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7) ); results2.Messages.AssertEqual( OnNext(400, 3), OnNext(400, 4), OnNext(400, 5), OnNext(410, 6), OnNext(520, 7), OnNext(630, 8) ); results3.Messages.AssertEqual( OnNext(900, 8), OnNext(900, 9), OnNext(900, 10), OnNext(940, 11) ); } [TestMethod] public void Infinite_ReplayAll() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnNext(630, 8), OnNext(710, 9), OnNext(870, 10), OnNext(940, 11), OnNext(1020, 12) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject()); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( OnNext(300, 3), OnNext(300, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7) ); results2.Messages.AssertEqual( OnNext(400, 3), OnNext(400, 4), OnNext(400, 5), OnNext(410, 6), OnNext(520, 7), OnNext(630, 8) ); results3.Messages.AssertEqual( OnNext(900, 3), OnNext(900, 4), OnNext(900, 5), OnNext(900, 6), OnNext(900, 7), OnNext(900, 8), OnNext(900, 9), OnNext(900, 10), OnNext(940, 11) ); } [TestMethod] public void Infinite2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(280, -1), OnNext(290, -2), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnNext(630, 8), OnNext(710, 9), OnNext(870, 10), OnNext(940, 11), OnNext(1020, 12) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( OnNext(301, 4), OnNext(302, -1), OnNext(303, -2), OnNext(341, 5), OnNext(411, 6), OnNext(521, 7) ); results2.Messages.AssertEqual( OnNext(401, 5), OnNext(411, 6), OnNext(521, 7), OnNext(631, 8) ); results3.Messages.AssertEqual( OnNext(901, 10), OnNext(941, 11) ); } [TestMethod] public void Finite_ReplayByTime() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnCompleted(630), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( OnNext(301, 3), OnNext(302, 4), OnNext(341, 5), OnNext(411, 6), OnNext(521, 7) ); results2.Messages.AssertEqual( OnNext(401, 5), OnNext(411, 6), OnNext(521, 7), OnCompleted(631) ); results3.Messages.AssertEqual( OnCompleted(901) ); } [TestMethod] public void Finite_ReplayOne() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnCompleted(630), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( OnNext(300, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7) ); results2.Messages.AssertEqual( OnNext(400, 5), OnNext(410, 6), OnNext(520, 7), OnCompleted(630) ); results3.Messages.AssertEqual( OnNext(900, 7), OnCompleted(900) ); } [TestMethod] public void Finite_ReplayMany() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnCompleted(630), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( OnNext(300, 3), OnNext(300, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7) ); results2.Messages.AssertEqual( OnNext(400, 3), OnNext(400, 4), OnNext(400, 5), OnNext(410, 6), OnNext(520, 7), OnCompleted(630) ); results3.Messages.AssertEqual( OnNext(900, 5), OnNext(900, 6), OnNext(900, 7), OnCompleted(900) ); } [TestMethod] public void Finite_ReplayAll() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnCompleted(630), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject()); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( OnNext(300, 3), OnNext(300, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7) ); results2.Messages.AssertEqual( OnNext(400, 3), OnNext(400, 4), OnNext(400, 5), OnNext(410, 6), OnNext(520, 7), OnCompleted(630) ); results3.Messages.AssertEqual( OnNext(900, 3), OnNext(900, 4), OnNext(900, 5), OnNext(900, 6), OnNext(900, 7), OnCompleted(900) ); } [TestMethod] public void Error_ReplayByTime() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnError(630, ex), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( OnNext(301, 3), OnNext(302, 4), OnNext(341, 5), OnNext(411, 6), OnNext(521, 7) ); results2.Messages.AssertEqual( OnNext(401, 5), OnNext(411, 6), OnNext(521, 7), OnError(631, ex) ); results3.Messages.AssertEqual( OnError(901, ex) ); } [TestMethod] public void Error_ReplayOne() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnError(630, ex), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( OnNext(300, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7) ); results2.Messages.AssertEqual( OnNext(400, 5), OnNext(410, 6), OnNext(520, 7), OnError(630, ex) ); results3.Messages.AssertEqual( OnNext(900, 7), OnError(900, ex) ); } [TestMethod] public void Error_ReplayMany() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnError(630, ex), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( OnNext(300, 3), OnNext(300, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7) ); results2.Messages.AssertEqual( OnNext(400, 3), OnNext(400, 4), OnNext(400, 5), OnNext(410, 6), OnNext(520, 7), OnError(630, ex) ); results3.Messages.AssertEqual( OnNext(900, 5), OnNext(900, 6), OnNext(900, 7), OnError(900, ex) ); } [TestMethod] public void Error_ReplayAll() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnError(630, ex), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject()); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( OnNext(300, 3), OnNext(300, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7) ); results2.Messages.AssertEqual( OnNext(400, 3), OnNext(400, 4), OnNext(400, 5), OnNext(410, 6), OnNext(520, 7), OnError(630, ex) ); results3.Messages.AssertEqual( OnNext(900, 3), OnNext(900, 4), OnNext(900, 5), OnNext(900, 6), OnNext(900, 7), OnError(900, ex) ); } [TestMethod] public void Canceled_ReplayByTime() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnCompleted(630), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3, TimeSpan.FromTicks(100), scheduler)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( ); results2.Messages.AssertEqual( OnCompleted(631) ); results3.Messages.AssertEqual( OnCompleted(901) ); } [TestMethod] public void Canceled_ReplayOne() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnCompleted(630), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( ); results2.Messages.AssertEqual( OnCompleted(630) ); results3.Messages.AssertEqual( OnCompleted(900) ); } [TestMethod] public void Canceled_ReplayMany() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnCompleted(630), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3)); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( ); results2.Messages.AssertEqual( OnCompleted(630) ); results3.Messages.AssertEqual( OnCompleted(900) ); } [TestMethod] public void Canceled_ReplayAll() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnCompleted(630), OnNext(640, 9), OnCompleted(650), OnError(660, new Exception()) ); var subject = default(ReplaySubject); var subscription = default(IDisposable); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject()); scheduler.ScheduleAbsolute(200, () => subscription = xs.Subscribe(subject)); scheduler.ScheduleAbsolute(1000, () => subscription.Dispose()); scheduler.ScheduleAbsolute(300, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(900, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(600, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(950, () => subscription3.Dispose()); scheduler.Start(); results1.Messages.AssertEqual( ); results2.Messages.AssertEqual( OnCompleted(630) ); results3.Messages.AssertEqual( OnCompleted(900) ); } [TestMethod] public void SubjectDisposed() { var scheduler = new TestScheduler(); var subject = default(ReplaySubject); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(scheduler)); scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(500, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(600, () => subject.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription3.Dispose()); scheduler.ScheduleAbsolute(150, () => subject.OnNext(1)); scheduler.ScheduleAbsolute(250, () => subject.OnNext(2)); scheduler.ScheduleAbsolute(350, () => subject.OnNext(3)); scheduler.ScheduleAbsolute(450, () => subject.OnNext(4)); scheduler.ScheduleAbsolute(550, () => subject.OnNext(5)); scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws(() => subject.OnNext(6))); scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws(() => subject.OnCompleted())); scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws(() => subject.OnError(new Exception()))); scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws(() => subject.Subscribe())); scheduler.Start(); results1.Messages.AssertEqual( OnNext(201, 1), OnNext(251, 2), OnNext(351, 3), OnNext(451, 4) ); results2.Messages.AssertEqual( OnNext(301, 1), OnNext(302, 2), OnNext(351, 3), OnNext(451, 4), OnNext(551, 5) ); results3.Messages.AssertEqual( OnNext(401, 1), OnNext(402, 2), OnNext(403, 3), OnNext(451, 4), OnNext(551, 5) ); } [TestMethod] public void SubjectDisposed_ReplayOne() { var scheduler = new TestScheduler(); var subject = default(ReplaySubject); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(1)); scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(500, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(600, () => subject.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription3.Dispose()); scheduler.ScheduleAbsolute(150, () => subject.OnNext(1)); scheduler.ScheduleAbsolute(250, () => subject.OnNext(2)); scheduler.ScheduleAbsolute(350, () => subject.OnNext(3)); scheduler.ScheduleAbsolute(450, () => subject.OnNext(4)); scheduler.ScheduleAbsolute(550, () => subject.OnNext(5)); scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws(() => subject.OnNext(6))); scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws(() => subject.OnCompleted())); scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws(() => subject.OnError(new Exception()))); scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws(() => subject.Subscribe())); scheduler.Start(); results1.Messages.AssertEqual( OnNext(200, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4) ); results2.Messages.AssertEqual( OnNext(300, 2), OnNext(350, 3), OnNext(450, 4), OnNext(550, 5) ); results3.Messages.AssertEqual( OnNext(400, 3), OnNext(450, 4), OnNext(550, 5) ); } [TestMethod] public void SubjectDisposed_ReplayMany() { var scheduler = new TestScheduler(); var subject = default(ReplaySubject); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(3)); scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(500, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(600, () => subject.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription3.Dispose()); scheduler.ScheduleAbsolute(150, () => subject.OnNext(1)); scheduler.ScheduleAbsolute(250, () => subject.OnNext(2)); scheduler.ScheduleAbsolute(350, () => subject.OnNext(3)); scheduler.ScheduleAbsolute(450, () => subject.OnNext(4)); scheduler.ScheduleAbsolute(550, () => subject.OnNext(5)); scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws(() => subject.OnNext(6))); scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws(() => subject.OnCompleted())); scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws(() => subject.OnError(new Exception()))); scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws(() => subject.Subscribe())); scheduler.Start(); results1.Messages.AssertEqual( OnNext(200, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4) ); results2.Messages.AssertEqual( OnNext(300, 1), OnNext(300, 2), OnNext(350, 3), OnNext(450, 4), OnNext(550, 5) ); results3.Messages.AssertEqual( OnNext(400, 1), OnNext(400, 2), OnNext(400, 3), OnNext(450, 4), OnNext(550, 5) ); } [TestMethod] public void SubjectDisposed_ReplayAll() { var scheduler = new TestScheduler(); var subject = default(ReplaySubject); var results1 = scheduler.CreateObserver(); var subscription1 = default(IDisposable); var results2 = scheduler.CreateObserver(); var subscription2 = default(IDisposable); var results3 = scheduler.CreateObserver(); var subscription3 = default(IDisposable); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject()); scheduler.ScheduleAbsolute(200, () => subscription1 = subject.Subscribe(results1)); scheduler.ScheduleAbsolute(300, () => subscription2 = subject.Subscribe(results2)); scheduler.ScheduleAbsolute(400, () => subscription3 = subject.Subscribe(results3)); scheduler.ScheduleAbsolute(500, () => subscription1.Dispose()); scheduler.ScheduleAbsolute(600, () => subject.Dispose()); scheduler.ScheduleAbsolute(700, () => subscription2.Dispose()); scheduler.ScheduleAbsolute(800, () => subscription3.Dispose()); scheduler.ScheduleAbsolute(150, () => subject.OnNext(1)); scheduler.ScheduleAbsolute(250, () => subject.OnNext(2)); scheduler.ScheduleAbsolute(350, () => subject.OnNext(3)); scheduler.ScheduleAbsolute(450, () => subject.OnNext(4)); scheduler.ScheduleAbsolute(550, () => subject.OnNext(5)); scheduler.ScheduleAbsolute(650, () => ReactiveAssert.Throws(() => subject.OnNext(6))); scheduler.ScheduleAbsolute(750, () => ReactiveAssert.Throws(() => subject.OnCompleted())); scheduler.ScheduleAbsolute(850, () => ReactiveAssert.Throws(() => subject.OnError(new Exception()))); scheduler.ScheduleAbsolute(950, () => ReactiveAssert.Throws(() => subject.Subscribe())); scheduler.Start(); results1.Messages.AssertEqual( OnNext(200, 1), OnNext(250, 2), OnNext(350, 3), OnNext(450, 4) ); results2.Messages.AssertEqual( OnNext(300, 1), OnNext(300, 2), OnNext(350, 3), OnNext(450, 4), OnNext(550, 5) ); results3.Messages.AssertEqual( OnNext(400, 1), OnNext(400, 2), OnNext(400, 3), OnNext(450, 4), OnNext(550, 5) ); } // // TODO: Create a failing test for this for the other implementations (ReplayOne/Many/All). // I don't understand the behavior. // I think it may have to do with calling Trim() on Subscription (as well as in the OnNext calls). -LC // [TestMethod] public void ReplaySubjectDiesOut() { // // Tests v1.x behavior as documented in ReplaySubject.cs (Subscribe method). // var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(70, 1), OnNext(110, 2), OnNext(220, 3), OnNext(270, 4), OnNext(340, 5), OnNext(410, 6), OnNext(520, 7), OnCompleted(580) ); var subject = default(ReplaySubject); var results1 = scheduler.CreateObserver(); var results2 = scheduler.CreateObserver(); var results3 = scheduler.CreateObserver(); var results4 = scheduler.CreateObserver(); scheduler.ScheduleAbsolute(100, () => subject = new ReplaySubject(int.MaxValue, TimeSpan.FromTicks(100), scheduler)); scheduler.ScheduleAbsolute(200, () => xs.Subscribe(subject)); scheduler.ScheduleAbsolute(300, () => subject.Subscribe(results1)); scheduler.ScheduleAbsolute(400, () => subject.Subscribe(results2)); scheduler.ScheduleAbsolute(600, () => subject.Subscribe(results3)); scheduler.ScheduleAbsolute(900, () => subject.Subscribe(results4)); scheduler.Start(); results1.Messages.AssertEqual( OnNext(301, 3), OnNext(302, 4), OnNext(341, 5), OnNext(411, 6), OnNext(521, 7), OnCompleted(581) ); results2.Messages.AssertEqual( OnNext(401, 5), OnNext(411, 6), OnNext(521, 7), OnCompleted(581) ); results3.Messages.AssertEqual( OnNext(601, 7), OnCompleted(602) ); results4.Messages.AssertEqual( OnCompleted(901) ); } [TestMethod] public void HasObservers() { HasObserversImpl(new ReplaySubject()); HasObserversImpl(new ReplaySubject(1)); HasObserversImpl(new ReplaySubject(3)); HasObserversImpl(new ReplaySubject(TimeSpan.FromSeconds(1))); } private static void HasObserversImpl(ReplaySubject s) { Assert.False(s.HasObservers); var d1 = s.Subscribe(_ => { }); Assert.True(s.HasObservers); d1.Dispose(); Assert.False(s.HasObservers); var d2 = s.Subscribe(_ => { }); Assert.True(s.HasObservers); var d3 = s.Subscribe(_ => { }); Assert.True(s.HasObservers); d2.Dispose(); Assert.True(s.HasObservers); d3.Dispose(); Assert.False(s.HasObservers); } [TestMethod] public void HasObservers_Dispose1() { HasObservers_Dispose1Impl(new ReplaySubject()); HasObservers_Dispose1Impl(new ReplaySubject(1)); HasObservers_Dispose1Impl(new ReplaySubject(3)); HasObservers_Dispose1Impl(new ReplaySubject(TimeSpan.FromSeconds(1))); } private static void HasObservers_Dispose1Impl(ReplaySubject s) { Assert.False(s.HasObservers); Assert.False(s.IsDisposed); var d = s.Subscribe(_ => { }); Assert.True(s.HasObservers); Assert.False(s.IsDisposed); s.Dispose(); Assert.False(s.HasObservers); Assert.True(s.IsDisposed); d.Dispose(); Assert.False(s.HasObservers); Assert.True(s.IsDisposed); } [TestMethod] public void HasObservers_Dispose2() { HasObservers_Dispose2Impl(new ReplaySubject()); HasObservers_Dispose2Impl(new ReplaySubject(1)); HasObservers_Dispose2Impl(new ReplaySubject(3)); HasObservers_Dispose2Impl(new ReplaySubject(TimeSpan.FromSeconds(1))); } private static void HasObservers_Dispose2Impl(ReplaySubject s) { Assert.False(s.HasObservers); Assert.False(s.IsDisposed); var d = s.Subscribe(_ => { }); Assert.True(s.HasObservers); Assert.False(s.IsDisposed); d.Dispose(); Assert.False(s.HasObservers); Assert.False(s.IsDisposed); s.Dispose(); Assert.False(s.HasObservers); Assert.True(s.IsDisposed); } [TestMethod] public void HasObservers_Dispose3() { HasObservers_Dispose3Impl(new ReplaySubject()); HasObservers_Dispose3Impl(new ReplaySubject(1)); HasObservers_Dispose3Impl(new ReplaySubject(3)); HasObservers_Dispose3Impl(new ReplaySubject(TimeSpan.FromSeconds(1))); } private static void HasObservers_Dispose3Impl(ReplaySubject s) { Assert.False(s.HasObservers); Assert.False(s.IsDisposed); s.Dispose(); Assert.False(s.HasObservers); Assert.True(s.IsDisposed); } [TestMethod] public void HasObservers_OnCompleted() { HasObservers_OnCompletedImpl(new ReplaySubject()); HasObservers_OnCompletedImpl(new ReplaySubject(1)); HasObservers_OnCompletedImpl(new ReplaySubject(3)); HasObservers_OnCompletedImpl(new ReplaySubject(TimeSpan.FromSeconds(1))); } private static void HasObservers_OnCompletedImpl(ReplaySubject s) { Assert.False(s.HasObservers); var d = s.Subscribe(_ => { }); Assert.True(s.HasObservers); s.OnNext(42); Assert.True(s.HasObservers); s.OnCompleted(); Assert.False(s.HasObservers); } [TestMethod] public void HasObservers_OnError() { HasObservers_OnErrorImpl(new ReplaySubject()); HasObservers_OnErrorImpl(new ReplaySubject(1)); HasObservers_OnErrorImpl(new ReplaySubject(3)); HasObservers_OnErrorImpl(new ReplaySubject(TimeSpan.FromSeconds(1))); } private static void HasObservers_OnErrorImpl(ReplaySubject s) { Assert.False(s.HasObservers); var d = s.Subscribe(_ => { }, ex => { }); Assert.True(s.HasObservers); s.OnNext(42); Assert.True(s.HasObservers); s.OnError(new Exception()); Assert.False(s.HasObservers); } [TestMethod] public void Completed_to_late_subscriber_ReplayAll() { var s = new ReplaySubject(); s.OnNext(1); s.OnNext(2); s.OnCompleted(); var scheduler = new TestScheduler(); var observer = scheduler.CreateObserver(); s.Subscribe(observer); Assert.Equal(3, observer.Messages.Count); Assert.Equal(1, observer.Messages[0].Value.Value); Assert.Equal(2, observer.Messages[1].Value.Value); Assert.Equal(NotificationKind.OnCompleted, observer.Messages[2].Value.Kind); } [TestMethod] public void Completed_to_late_subscriber_ReplayOne() { var s = new ReplaySubject(1); s.OnNext(1); s.OnNext(2); s.OnCompleted(); var scheduler = new TestScheduler(); var observer = scheduler.CreateObserver(); s.Subscribe(observer); Assert.Equal(2, observer.Messages.Count); Assert.Equal(2, observer.Messages[0].Value.Value); Assert.Equal(NotificationKind.OnCompleted, observer.Messages[1].Value.Kind); } [TestMethod] public void Completed_to_late_subscriber_ReplayMany() { var s = new ReplaySubject(2); s.OnNext(1); s.OnNext(2); s.OnNext(3); s.OnCompleted(); var scheduler = new TestScheduler(); var observer = scheduler.CreateObserver(); s.Subscribe(observer); Assert.Equal(3, observer.Messages.Count); Assert.Equal(2, observer.Messages[0].Value.Value); Assert.Equal(3, observer.Messages[1].Value.Value); Assert.Equal(NotificationKind.OnCompleted, observer.Messages[2].Value.Kind); } [TestMethod] public void Completed_to_late_subscriber_ReplayByTime() { var s = new ReplaySubject(TimeSpan.FromMinutes(1)); s.OnNext(1); s.OnNext(2); s.OnNext(3); s.OnCompleted(); var scheduler = new TestScheduler(); var observer = scheduler.CreateObserver(); s.Subscribe(observer); Assert.Equal(4, observer.Messages.Count); Assert.Equal(1, observer.Messages[0].Value.Value); Assert.Equal(2, observer.Messages[1].Value.Value); Assert.Equal(3, observer.Messages[2].Value.Value); Assert.Equal(NotificationKind.OnCompleted, observer.Messages[3].Value.Kind); } [TestMethod] public void Errored_to_late_subscriber_ReplayAll() { var expectedException = new Exception("Test"); var s = new ReplaySubject(); s.OnNext(1); s.OnNext(2); s.OnError(expectedException); var scheduler = new TestScheduler(); var observer = scheduler.CreateObserver(); s.Subscribe(observer); Assert.Equal(3, observer.Messages.Count); Assert.Equal(1, observer.Messages[0].Value.Value); Assert.Equal(2, observer.Messages[1].Value.Value); Assert.Equal(NotificationKind.OnError, observer.Messages[2].Value.Kind); Assert.Equal(expectedException, observer.Messages[2].Value.Exception); } [TestMethod] public void Errored_to_late_subscriber_ReplayOne() { var expectedException = new Exception("Test"); var s = new ReplaySubject(1); s.OnNext(1); s.OnNext(2); s.OnError(expectedException); var scheduler = new TestScheduler(); var observer = scheduler.CreateObserver(); s.Subscribe(observer); Assert.Equal(2, observer.Messages.Count); Assert.Equal(2, observer.Messages[0].Value.Value); Assert.Equal(NotificationKind.OnError, observer.Messages[1].Value.Kind); Assert.Equal(expectedException, observer.Messages[1].Value.Exception); } [TestMethod] public void Errored_to_late_subscriber_ReplayMany() { var expectedException = new Exception("Test"); var s = new ReplaySubject(2); s.OnNext(1); s.OnNext(2); s.OnNext(3); s.OnError(expectedException); var scheduler = new TestScheduler(); var observer = scheduler.CreateObserver(); s.Subscribe(observer); Assert.Equal(3, observer.Messages.Count); Assert.Equal(2, observer.Messages[0].Value.Value); Assert.Equal(3, observer.Messages[1].Value.Value); Assert.Equal(NotificationKind.OnError, observer.Messages[2].Value.Kind); Assert.Equal(expectedException, observer.Messages[2].Value.Exception); } [TestMethod] public void Errored_to_late_subscriber_ReplayByTime() { var expectedException = new Exception("Test"); var s = new ReplaySubject(TimeSpan.FromMinutes(1)); s.OnNext(1); s.OnNext(2); s.OnNext(3); s.OnError(expectedException); var scheduler = new TestScheduler(); var observer = scheduler.CreateObserver(); s.Subscribe(observer); Assert.Equal(4, observer.Messages.Count); Assert.Equal(1, observer.Messages[0].Value.Value); Assert.Equal(2, observer.Messages[1].Value.Value); Assert.Equal(3, observer.Messages[2].Value.Value); Assert.Equal(NotificationKind.OnError, observer.Messages[3].Value.Kind); Assert.Equal(expectedException, observer.Messages[3].Value.Exception); } [TestMethod] public void ReplaySubject_Reentrant() { var r = new ReplaySubject(4); r.OnNext(0); r.OnNext(1); r.OnNext(2); r.OnNext(3); r.OnNext(4); var xs = new List(); var i = 0; r.Subscribe(x => { xs.Add(x); if (++i <= 10) { r.OnNext(x); } }); r.OnNext(5); Assert.True(xs.SequenceEqual( [ 1, 2, 3, 4, // original 1, 2, 3, 4, // reentrant (+ fed back) 1, 2, 3, 4, // reentrant (+ first two fed back) 1, 2, // reentrant 5 // tune in ])); } #if !NO_INTERNALSTEST [TestMethod] public void FastImmediateObserver_Simple1() { var res = FastImmediateObserverTest(fio => { fio.OnNext(1); fio.OnNext(2); fio.OnNext(3); fio.OnCompleted(); fio.EnsureActive(4); }); res.AssertEqual( OnNext(0, 1), OnNext(1, 2), OnNext(2, 3), OnCompleted(3) ); } [TestMethod] public void FastImmediateObserver_Simple2() { var ex = new Exception(); var res = FastImmediateObserverTest(fio => { fio.OnNext(1); fio.OnNext(2); fio.OnNext(3); fio.OnError(ex); fio.EnsureActive(4); }); res.AssertEqual( OnNext(0, 1), OnNext(1, 2), OnNext(2, 3), OnError(3, ex) ); } [TestMethod] public void FastImmediateObserver_Simple3() { var res = FastImmediateObserverTest(fio => { fio.OnNext(1); fio.EnsureActive(); fio.OnNext(2); fio.EnsureActive(); fio.OnNext(3); fio.EnsureActive(); fio.OnCompleted(); fio.EnsureActive(); }); res.AssertEqual( OnNext(0, 1), OnNext(1, 2), OnNext(2, 3), OnCompleted(3) ); } [TestMethod] public void FastImmediateObserver_Fault() { var xs = new List(); var o = Observer.Create( x => { xs.Add(x); if (x == 2) { throw new Exception(); } }, ex => { }, () => { } ); var fio = new FastImmediateObserver(o); fio.OnNext(1); fio.OnNext(2); fio.OnNext(3); ReactiveAssert.Throws(() => fio.EnsureActive()); fio.OnNext(4); fio.EnsureActive(); fio.OnNext(2); fio.EnsureActive(); Assert.True(xs.Count == 2); } [TestMethod] public void FastImmediateObserver_Ownership1() { var xs = new List(); var o = Observer.Create( xs.Add, ex => { }, () => { } ); var fio = new FastImmediateObserver(o); var ts = new Task[16]; var N = 100; for (var i = 0; i < ts.Length; i++) { var j = i; ts[i] = Task.Factory.StartNew(() => { for (var k = 0; k < N; k++) { fio.OnNext(j * N + k); } fio.EnsureActive(N); }); } Task.WaitAll(ts); Assert.True(xs.Count == ts.Length * N); } [TestMethod] public void FastImmediateObserver_Ownership2() { var cd = new CountdownEvent(3); var w = new ManualResetEvent(false); var e = new ManualResetEvent(false); var xs = new List(); var o = Observer.Create( x => { xs.Add(x); w.Set(); e.WaitOne(); cd.Signal(); }, ex => { }, () => { } ); var fio = new FastImmediateObserver(o); fio.OnNext(1); var t = Task.Factory.StartNew(() => { fio.EnsureActive(); }); w.WaitOne(); fio.OnNext(2); fio.OnNext(3); fio.EnsureActive(2); e.Set(); cd.Wait(); Assert.True(xs.Count == 3); } private IEnumerable>> FastImmediateObserverTest(Action> f) { var ns = new List>>(); var l = 0L; var o = Observer.Create( x => { ns.Add(OnNext(l++, x)); }, ex => { ns.Add(OnError(l++, ex)); }, () => { ns.Add(OnCompleted(l++)); } ); var fio = new FastImmediateObserver(o); f(fio); return ns; } #endif } }