1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the Apache 2.0 License.
- // See the LICENSE file in the project root for more information.
- using System;
- using System.Linq;
- using System.Reactive.Concurrency;
- using System.Reactive.Linq;
- using System.Reactive.Subjects;
- using Microsoft.Reactive.Testing;
- using ReactiveTests.Dummies;
- using Xunit;
- namespace ReactiveTests.Tests
- {
- public class ReplayTest : ReactiveTest
- {
- [Fact]
- public void Replay_ArgumentChecking()
- {
- var someObservable = Observable.Empty<int>();
- var scheduler = new TestScheduler();
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int>(null, DummyScheduler.Instance));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(DummyObservable<int>.Instance, null));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(null, DummyFunc<IObservable<int>, IObservable<int>>.Instance, DummyScheduler.Instance));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(DummyObservable<int>.Instance, null, DummyScheduler.Instance));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(DummyObservable<int>.Instance, DummyFunc<IObservable<int>, IObservable<int>>.Instance, null));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), TimeSpan.FromSeconds(1)));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(-1)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, TimeSpan.FromSeconds(1)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, TimeSpan.FromSeconds(1)));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, TimeSpan.FromSeconds(-1)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), TimeSpan.FromSeconds(1), scheduler));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(-1), scheduler));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, TimeSpan.FromSeconds(1), default));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, TimeSpan.FromSeconds(1), scheduler));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, TimeSpan.FromSeconds(1), scheduler));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, TimeSpan.FromSeconds(-1), scheduler));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, TimeSpan.FromSeconds(1), default));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, scheduler));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, scheduler));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, 1, default(IScheduler)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, scheduler));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, -2, scheduler));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, scheduler));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, 1, default(IScheduler)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, TimeSpan.FromSeconds(1)));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, TimeSpan.FromSeconds(1)));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(-1)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, TimeSpan.FromSeconds(1)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1, TimeSpan.FromSeconds(1)));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, TimeSpan.FromSeconds(1)));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(-1)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), 1, TimeSpan.FromSeconds(1), scheduler));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, -2, TimeSpan.FromSeconds(1), scheduler));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(-1), scheduler));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, 1, TimeSpan.FromSeconds(1), null));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(default(IObservable<int>), x => x, 1, TimeSpan.FromSeconds(1), scheduler));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay<int, int>(someObservable, null, 1, TimeSpan.FromSeconds(1), scheduler));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, -2, TimeSpan.FromSeconds(1), scheduler));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(-1), scheduler));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Replay(someObservable, x => x, 1, TimeSpan.FromSeconds(1), null));
- }
- [Fact]
- public void ReplayCount_Basic()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnCompleted<int>(600)
- );
- var ys = default(IConnectableObservable<int>);
- var subscription = default(IDisposable);
- var connection = default(IDisposable);
- var res = scheduler.CreateObserver<int>();
- scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(3, scheduler));
- scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
- scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
- scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(400, () => connection.Dispose());
- scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(550, () => connection.Dispose());
- scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(800, () => connection.Dispose());
- scheduler.Start();
- res.Messages.AssertEqual(
- OnNext(451, 5),
- OnNext(452, 6),
- OnNext(453, 7),
- OnNext(521, 11)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(300, 400),
- Subscribe(500, 550),
- Subscribe(650, 800)
- );
- }
- [Fact]
- public void ReplayCount_Error()
- {
- var scheduler = new TestScheduler();
- var ex = new Exception();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnError<int>(600, ex)
- );
- var ys = default(IConnectableObservable<int>);
- var subscription = default(IDisposable);
- var connection = default(IDisposable);
- var res = scheduler.CreateObserver<int>();
- scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(3, scheduler));
- scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
- scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
- scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(400, () => connection.Dispose());
- scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(800, () => connection.Dispose());
- scheduler.Start();
- res.Messages.AssertEqual(
- OnNext(451, 5),
- OnNext(452, 6),
- OnNext(453, 7),
- OnNext(521, 11),
- OnNext(561, 20),
- OnError<int>(601, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(300, 400),
- Subscribe(500, 600)
- );
- }
- [Fact]
- public void ReplayCount_Complete()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnCompleted<int>(600)
- );
- var ys = default(IConnectableObservable<int>);
- var subscription = default(IDisposable);
- var connection = default(IDisposable);
- var res = scheduler.CreateObserver<int>();
- scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(3, scheduler));
- scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
- scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
- scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(400, () => connection.Dispose());
- scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(800, () => connection.Dispose());
- scheduler.Start();
- res.Messages.AssertEqual(
- OnNext(451, 5),
- OnNext(452, 6),
- OnNext(453, 7),
- OnNext(521, 11),
- OnNext(561, 20),
- OnCompleted<int>(601)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(300, 400),
- Subscribe(500, 600)
- );
- }
- [Fact]
- public void ReplayCount_Dispose()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnCompleted<int>(600)
- );
- var ys = default(IConnectableObservable<int>);
- var subscription = default(IDisposable);
- var connection = default(IDisposable);
- var res = scheduler.CreateObserver<int>();
- scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(3, scheduler));
- scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
- scheduler.ScheduleAbsolute(475, () => subscription.Dispose());
- scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(400, () => connection.Dispose());
- scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(550, () => connection.Dispose());
- scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(800, () => connection.Dispose());
- scheduler.Start();
- res.Messages.AssertEqual(
- OnNext(451, 5),
- OnNext(452, 6),
- OnNext(453, 7)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(300, 400),
- Subscribe(500, 550),
- Subscribe(650, 800)
- );
- }
- [Fact]
- public void ReplayCount_MultipleConnections()
- {
- var xs = Observable.Never<int>();
- var ys = xs.Replay(3, new TestScheduler());
- var connection1 = ys.Connect();
- var connection2 = ys.Connect();
- Assert.Same(connection1, connection2);
- connection1.Dispose();
- connection2.Dispose();
- var connection3 = ys.Connect();
- Assert.NotSame(connection1, connection3);
- connection3.Dispose();
- }
- [Fact]
- public void ReplayCountLambda_Zip_Complete()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnCompleted<int>(600)
- );
- var res = scheduler.Start(() =>
- xs.Replay(_xs => _xs.Take(6).Repeat(), 3, scheduler),
- 610
- );
- res.Messages.AssertEqual(
- OnNext(221, 3),
- OnNext(281, 4),
- OnNext(291, 1),
- OnNext(341, 8),
- OnNext(361, 5),
- OnNext(371, 6),
- OnNext(372, 8),
- OnNext(373, 5),
- OnNext(374, 6),
- OnNext(391, 7),
- OnNext(411, 13),
- OnNext(431, 2),
- OnNext(432, 7),
- OnNext(433, 13),
- OnNext(434, 2),
- OnNext(451, 9),
- OnNext(521, 11),
- OnNext(561, 20),
- OnNext(562, 9),
- OnNext(563, 11),
- OnNext(564, 20),
- OnNext(602, 9),
- OnNext(603, 11),
- OnNext(604, 20),
- OnNext(606, 9),
- OnNext(607, 11),
- OnNext(608, 20)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
- );
- }
- [Fact]
- public void ReplayCountLambda_Zip_Error()
- {
- var scheduler = new TestScheduler();
- var ex = new Exception();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnError<int>(600, ex)
- );
- var res = scheduler.Start(() =>
- xs.Replay(_xs => _xs.Take(6).Repeat(), 3, scheduler)
- );
- res.Messages.AssertEqual(
- OnNext(221, 3),
- OnNext(281, 4),
- OnNext(291, 1),
- OnNext(341, 8),
- OnNext(361, 5),
- OnNext(371, 6),
- OnNext(372, 8),
- OnNext(373, 5),
- OnNext(374, 6),
- OnNext(391, 7),
- OnNext(411, 13),
- OnNext(431, 2),
- OnNext(432, 7),
- OnNext(433, 13),
- OnNext(434, 2),
- OnNext(451, 9),
- OnNext(521, 11),
- OnNext(561, 20),
- OnNext(562, 9),
- OnNext(563, 11),
- OnNext(564, 20),
- OnError<int>(601, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
- );
- }
- [Fact]
- public void ReplayCountLambda_Zip_Dispose()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnCompleted<int>(600)
- );
- var res = scheduler.Start(() =>
- xs.Replay(_xs => _xs.Take(6).Repeat(), 3, scheduler),
- 470
- );
- res.Messages.AssertEqual(
- OnNext(221, 3),
- OnNext(281, 4),
- OnNext(291, 1),
- OnNext(341, 8),
- OnNext(361, 5),
- OnNext(371, 6),
- OnNext(372, 8),
- OnNext(373, 5),
- OnNext(374, 6),
- OnNext(391, 7),
- OnNext(411, 13),
- OnNext(431, 2),
- OnNext(432, 7),
- OnNext(433, 13),
- OnNext(434, 2),
- OnNext(451, 9)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 470)
- );
- }
- [Fact]
- public void ReplayTime_Basic()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnCompleted<int>(600)
- );
- var ys = default(IConnectableObservable<int>);
- var subscription = default(IDisposable);
- var connection = default(IDisposable);
- var res = scheduler.CreateObserver<int>();
- scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(TimeSpan.FromTicks(150), scheduler));
- scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
- scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
- scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(400, () => connection.Dispose());
- scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(550, () => connection.Dispose());
- scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(800, () => connection.Dispose());
- scheduler.Start();
- res.Messages.AssertEqual(
- OnNext(451, 8),
- OnNext(452, 5),
- OnNext(453, 6),
- OnNext(454, 7),
- OnNext(521, 11)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(300, 400),
- Subscribe(500, 550),
- Subscribe(650, 800)
- );
- }
- [Fact]
- public void ReplayTime_Error()
- {
- var scheduler = new TestScheduler();
- var ex = new Exception();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnError<int>(600, ex)
- );
- var ys = default(IConnectableObservable<int>);
- var subscription = default(IDisposable);
- var connection = default(IDisposable);
- var res = scheduler.CreateObserver<int>();
- scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(TimeSpan.FromTicks(75), scheduler));
- scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
- scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
- scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(400, () => connection.Dispose());
- scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(800, () => connection.Dispose());
- scheduler.Start();
- res.Messages.AssertEqual(
- OnNext(451, 7),
- OnNext(521, 11),
- OnNext(561, 20),
- OnError<int>(601, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(300, 400),
- Subscribe(500, 600)
- );
- }
- [Fact]
- public void ReplayTime_Complete()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnCompleted<int>(600)
- );
- var ys = default(IConnectableObservable<int>);
- var subscription = default(IDisposable);
- var connection = default(IDisposable);
- var res = scheduler.CreateObserver<int>();
- scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(TimeSpan.FromTicks(85), scheduler));
- scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
- scheduler.ScheduleAbsolute(Disposed, () => subscription.Dispose());
- scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(400, () => connection.Dispose());
- scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(800, () => connection.Dispose());
- scheduler.Start();
- res.Messages.AssertEqual(
- OnNext(451, 6),
- OnNext(452, 7),
- OnNext(521, 11),
- OnNext(561, 20),
- OnCompleted<int>(601)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(300, 400),
- Subscribe(500, 600)
- );
- }
- [Fact]
- public void ReplayTime_Dispose()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnCompleted<int>(600)
- );
- var ys = default(IConnectableObservable<int>);
- var subscription = default(IDisposable);
- var connection = default(IDisposable);
- var res = scheduler.CreateObserver<int>();
- scheduler.ScheduleAbsolute(Created, () => ys = xs.Replay(TimeSpan.FromTicks(100), scheduler));
- scheduler.ScheduleAbsolute(450, () => subscription = ys.Subscribe(res));
- scheduler.ScheduleAbsolute(475, () => subscription.Dispose());
- scheduler.ScheduleAbsolute(300, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(400, () => connection.Dispose());
- scheduler.ScheduleAbsolute(500, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(550, () => connection.Dispose());
- scheduler.ScheduleAbsolute(650, () => connection = ys.Connect());
- scheduler.ScheduleAbsolute(800, () => connection.Dispose());
- scheduler.Start();
- res.Messages.AssertEqual(
- OnNext(451, 5),
- OnNext(452, 6),
- OnNext(453, 7)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(300, 400),
- Subscribe(500, 550),
- Subscribe(650, 800)
- );
- }
- [Fact]
- public void ReplayTime_MultipleConnections()
- {
- var xs = Observable.Never<int>();
- var ys = xs.Replay(TimeSpan.FromTicks(100), new TestScheduler());
- var connection1 = ys.Connect();
- var connection2 = ys.Connect();
- Assert.Same(connection1, connection2);
- connection1.Dispose();
- connection2.Dispose();
- var connection3 = ys.Connect();
- Assert.NotSame(connection1, connection3);
- connection3.Dispose();
- }
- [Fact]
- public void ReplayTimeLambda_Zip_Complete()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnCompleted<int>(600)
- );
- var res = scheduler.Start(() =>
- xs.Replay(_xs => _xs.Take(6).Repeat(), TimeSpan.FromTicks(50), scheduler),
- 610
- );
- res.Messages.AssertEqual(
- OnNext(221, 3),
- OnNext(281, 4),
- OnNext(291, 1),
- OnNext(341, 8),
- OnNext(361, 5),
- OnNext(371, 6),
- OnNext(372, 8),
- OnNext(373, 5),
- OnNext(374, 6),
- OnNext(391, 7),
- OnNext(411, 13),
- OnNext(431, 2),
- OnNext(432, 7),
- OnNext(433, 13),
- OnNext(434, 2),
- OnNext(451, 9),
- OnNext(521, 11),
- OnNext(561, 20),
- OnNext(562, 11),
- OnNext(563, 20),
- OnNext(602, 20),
- OnNext(604, 20),
- OnNext(606, 20),
- OnNext(608, 20)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
- );
- }
- [Fact]
- public void ReplayTimeLambda_Zip_Error()
- {
- var scheduler = new TestScheduler();
- var ex = new Exception();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnError<int>(600, ex)
- );
- var res = scheduler.Start(() =>
- xs.Replay(_xs => _xs.Take(6).Repeat(), TimeSpan.FromTicks(50), scheduler)
- );
- res.Messages.AssertEqual(
- OnNext(221, 3),
- OnNext(281, 4),
- OnNext(291, 1),
- OnNext(341, 8),
- OnNext(361, 5),
- OnNext(371, 6),
- OnNext(372, 8),
- OnNext(373, 5),
- OnNext(374, 6),
- OnNext(391, 7),
- OnNext(411, 13),
- OnNext(431, 2),
- OnNext(432, 7),
- OnNext(433, 13),
- OnNext(434, 2),
- OnNext(451, 9),
- OnNext(521, 11),
- OnNext(561, 20),
- OnNext(562, 11),
- OnNext(563, 20),
- OnError<int>(601, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
- );
- }
- [Fact]
- public void ReplayTimeLambda_Zip_Dispose()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(110, 7),
- OnNext(220, 3),
- OnNext(280, 4),
- OnNext(290, 1),
- OnNext(340, 8),
- OnNext(360, 5),
- OnNext(370, 6),
- OnNext(390, 7),
- OnNext(410, 13),
- OnNext(430, 2),
- OnNext(450, 9),
- OnNext(520, 11),
- OnNext(560, 20),
- OnCompleted<int>(600)
- );
- var res = scheduler.Start(() =>
- xs.Replay(_xs => _xs.Take(6).Repeat(), TimeSpan.FromTicks(50), scheduler),
- 470
- );
- res.Messages.AssertEqual(
- OnNext(221, 3),
- OnNext(281, 4),
- OnNext(291, 1),
- OnNext(341, 8),
- OnNext(361, 5),
- OnNext(371, 6),
- OnNext(372, 8),
- OnNext(373, 5),
- OnNext(374, 6),
- OnNext(391, 7),
- OnNext(411, 13),
- OnNext(431, 2),
- OnNext(432, 7),
- OnNext(433, 13),
- OnNext(434, 2),
- OnNext(451, 9)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 470)
- );
- }
- [Fact]
- public void Replay_Default1()
- {
- var s = new Subject<int>();
- var xs = s.Replay(100, DefaultScheduler.Instance);
- var ys = s.Replay(100);
- xs.Connect();
- ys.Connect();
- s.OnNext(1);
- s.OnNext(2);
- s.OnCompleted();
- xs.AssertEqual(ys);
- }
- [Fact]
- public void Replay_Default2()
- {
- var s = new Subject<int>();
- var xs = s.Replay(TimeSpan.FromHours(1), DefaultScheduler.Instance);
- var ys = s.Replay(TimeSpan.FromHours(1));
- xs.Connect();
- ys.Connect();
- s.OnNext(1);
- s.OnNext(2);
- s.OnCompleted();
- xs.AssertEqual(ys);
- }
- [Fact]
- public void Replay_Default3()
- {
- var s = new Subject<int>();
- var xs = s.Replay(100, TimeSpan.FromHours(1), DefaultScheduler.Instance);
- var ys = s.Replay(100, TimeSpan.FromHours(1));
- xs.Connect();
- ys.Connect();
- s.OnNext(1);
- s.OnNext(2);
- s.OnCompleted();
- xs.AssertEqual(ys);
- }
- [Fact]
- public void Replay_Default4()
- {
- var s = new Subject<int>();
- var xs = s.Replay(DefaultScheduler.Instance);
- var ys = s.Replay();
- xs.Connect();
- ys.Connect();
- s.OnNext(1);
- s.OnNext(2);
- s.OnCompleted();
- xs.AssertEqual(ys);
- }
- [Fact]
- public void ReplayLambda_Default1()
- {
- var xs = Observable.Range(1, 10).Replay(_xs => _xs, 100, DefaultScheduler.Instance);
- var ys = Observable.Range(1, 10).Replay(_xs => _xs, 100);
- xs.AssertEqual(ys);
- }
- [Fact]
- public void ReplayLambda_Default2()
- {
- var xs = Observable.Range(1, 10).Replay(_xs => _xs, TimeSpan.FromHours(1), DefaultScheduler.Instance);
- var ys = Observable.Range(1, 10).Replay(_xs => _xs, TimeSpan.FromHours(1));
- xs.AssertEqual(ys);
- }
- [Fact]
- public void ReplayLambda_Default3()
- {
- var xs = Observable.Range(1, 10).Replay(_xs => _xs, 100, TimeSpan.FromHours(1), DefaultScheduler.Instance);
- var ys = Observable.Range(1, 10).Replay(_xs => _xs, 100, TimeSpan.FromHours(1));
- xs.AssertEqual(ys);
- }
- [Fact]
- public void ReplayLambda_Default4()
- {
- var xs = Observable.Range(1, 10).Replay(_xs => _xs, DefaultScheduler.Instance);
- var ys = Observable.Range(1, 10).Replay(_xs => _xs);
- xs.AssertEqual(ys);
- }
- }
- }
|