1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324 |
- // Licensed to the .NET Foundation under one or more agreements.
- // The .NET Foundation licenses this file to you under the Apache 2.0 License.
- // See the LICENSE file in the project root for more information.
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Reactive.Concurrency;
- using System.Reactive.Linq;
- using Microsoft.Reactive.Testing;
- using ReactiveTests.Dummies;
- using Xunit;
- namespace ReactiveTests.Tests
- {
- public class BufferTest : ReactiveTest
- {
- #region + Boundary +
- [Fact]
- public void Buffer_ArgumentChecking()
- {
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), DummyFunc<IObservable<int>>.Instance));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, default(Func<IObservable<int>>)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, default, DummyFunc<int, IObservable<int>>.Instance));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, DummyObservable<int>.Instance, default(Func<int, IObservable<int>>)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), DummyObservable<int>.Instance));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, default(IObservable<int>)));
- }
- [Fact]
- public void Buffer_Closings_Basic()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(90, 1),
- OnNext(180, 2),
- OnNext(250, 3),
- OnNext(260, 4),
- OnNext(310, 5),
- OnNext(340, 6),
- OnNext(410, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnNext(550, 10),
- OnCompleted<int>(590)
- );
- var window = 1;
- var res = scheduler.Start(() =>
- xs.Buffer(() => Observable.Timer(TimeSpan.FromTicks((window++) * 100), scheduler))
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(300, b => b.SequenceEqual(new int[] { 3, 4 })),
- OnNext<IList<int>>(500, b => b.SequenceEqual(new int[] { 5, 6, 7, 8, 9 })),
- OnNext<IList<int>>(590, b => b.SequenceEqual(new int[] { 10 })),
- OnCompleted<IList<int>>(590)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 590)
- );
- }
- [Fact]
- public void Buffer_Closings_InnerSubscriptions()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(90, 1),
- OnNext(180, 2),
- OnNext(250, 3),
- OnNext(260, 4),
- OnNext(310, 5),
- OnNext(340, 6),
- OnNext(410, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnNext(550, 10),
- OnCompleted<int>(590)
- );
- var closings = new ITestableObservable<bool>[] {
- scheduler.CreateHotObservable(
- OnNext(300, true),
- OnNext(350, false),
- OnCompleted<bool>(380)
- ),
- scheduler.CreateHotObservable(
- OnNext(400, true),
- OnNext(510, false),
- OnNext(620, false)
- ),
- scheduler.CreateHotObservable(
- OnCompleted<bool>(500)
- ),
- scheduler.CreateHotObservable(
- OnNext(600, true)
- )
- };
- var window = 0;
- var res = scheduler.Start(() =>
- xs.Buffer(() => closings[window++])
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(300, b => b.SequenceEqual(new int[] { 3, 4 })),
- OnNext<IList<int>>(400, b => b.SequenceEqual(new int[] { 5, 6 })),
- OnNext<IList<int>>(500, b => b.SequenceEqual(new int[] { 7, 8, 9 })),
- OnNext<IList<int>>(590, b => b.SequenceEqual(new int[] { 10 })),
- OnCompleted<IList<int>>(590)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 590)
- );
- closings[0].Subscriptions.AssertEqual(
- Subscribe(200, 300)
- );
- closings[1].Subscriptions.AssertEqual(
- Subscribe(300, 400)
- );
- closings[2].Subscriptions.AssertEqual(
- Subscribe(400, 500)
- );
- closings[3].Subscriptions.AssertEqual(
- Subscribe(500, 590)
- );
- }
- [Fact]
- public void Buffer_Closings_Empty()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(90, 1),
- OnNext(180, 2),
- OnNext(250, 3),
- OnNext(260, 4),
- OnNext(310, 5),
- OnNext(340, 6),
- OnNext(410, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnNext(550, 10),
- OnCompleted<int>(590)
- );
- var window = 1;
- var res = scheduler.Start(() =>
- xs.Buffer(() => Observable.Empty<int>().Delay(TimeSpan.FromTicks((window++) * 100), scheduler))
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(300, l => l.SequenceEqual(new int[] { 3, 4 })),
- OnNext<IList<int>>(500, l => l.SequenceEqual(new int[] { 5, 6, 7, 8, 9 })),
- OnNext<IList<int>>(590, l => l.SequenceEqual(new int[] { 10 })),
- OnCompleted<IList<int>>(590)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 590)
- );
- }
- [Fact]
- public void Buffer_Closings_Dispose()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(90, 1),
- OnNext(180, 2),
- OnNext(250, 3),
- OnNext(260, 4),
- OnNext(310, 5),
- OnNext(340, 6),
- OnNext(410, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnNext(550, 10),
- OnCompleted<int>(590)
- );
- var window = 1;
- var res = scheduler.Start(() =>
- xs.Buffer(() => Observable.Timer(TimeSpan.FromTicks((window++) * 100), scheduler)),
- 400
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(300, l => l.SequenceEqual(new int[] { 3, 4 }))
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 400)
- );
- }
- [Fact]
- public void Buffer_Closings_Error()
- {
- var scheduler = new TestScheduler();
- var ex = new Exception();
- var xs = scheduler.CreateHotObservable(
- OnNext(90, 1),
- OnNext(180, 2),
- OnNext(250, 3),
- OnNext(260, 4),
- OnNext(310, 5),
- OnNext(340, 6),
- OnNext(410, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnNext(550, 10),
- OnError<int>(590, ex)
- );
- var window = 1;
- var res = scheduler.Start(() =>
- xs.Buffer(() => Observable.Timer(TimeSpan.FromTicks((window++) * 100), scheduler))
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(300, l => l.SequenceEqual(new int[] { 3, 4 })),
- OnNext<IList<int>>(500, l => l.SequenceEqual(new int[] { 5, 6, 7, 8, 9 })),
- OnError<IList<int>>(590, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 590)
- );
- }
- [Fact]
- public void Buffer_Closings_Throw()
- {
- var scheduler = new TestScheduler();
- var ex = new Exception();
- var xs = scheduler.CreateHotObservable(
- OnNext(90, 1),
- OnNext(180, 2),
- OnNext(250, 3),
- OnNext(260, 4),
- OnNext(310, 5),
- OnNext(340, 6),
- OnNext(410, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnNext(550, 10),
- OnError<int>(590, new Exception())
- );
- var res = scheduler.Start(() =>
- xs.Buffer<int, int>(() => { throw ex; })
- );
- res.Messages.AssertEqual(
- OnError<IList<int>>(200, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 200)
- );
- }
- [Fact]
- public void Buffer_Closings_WindowClose_Error()
- {
- var scheduler = new TestScheduler();
- var ex = new Exception();
- var xs = scheduler.CreateHotObservable(
- OnNext(90, 1),
- OnNext(180, 2),
- OnNext(250, 3),
- OnNext(260, 4),
- OnNext(310, 5),
- OnNext(340, 6),
- OnNext(410, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnNext(550, 10),
- OnError<int>(590, new Exception())
- );
- var res = scheduler.Start(() =>
- xs.Buffer(() => Observable.Throw<int>(ex, scheduler))
- );
- res.Messages.AssertEqual(
- OnError<IList<int>>(201, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 201)
- );
- }
- [Fact]
- public void Buffer_OpeningClosings_Basic()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(90, 1),
- OnNext(180, 2),
- OnNext(250, 3),
- OnNext(260, 4),
- OnNext(310, 5),
- OnNext(340, 6),
- OnNext(410, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnNext(550, 10),
- OnCompleted<int>(590)
- );
- var ys = scheduler.CreateHotObservable(
- OnNext(255, 50),
- OnNext(330, 100),
- OnNext(350, 50),
- OnNext(400, 90),
- OnCompleted<int>(900)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(ys, x => Observable.Timer(TimeSpan.FromTicks(x), scheduler))
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(305, b => b.SequenceEqual(new int[] { 4 })),
- OnNext<IList<int>>(400, b => b.SequenceEqual(new int[] { })),
- OnNext<IList<int>>(430, b => b.SequenceEqual(new int[] { 6, 7, 8 })),
- OnNext<IList<int>>(490, b => b.SequenceEqual(new int[] { 7, 8, 9 })),
- OnCompleted<IList<int>>(900)
- );
- #if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 590)
- );
- #else
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 900)
- );
- #endif
- ys.Subscriptions.AssertEqual(
- Subscribe(200, 900)
- );
- }
- [Fact]
- public void Buffer_Boundaries_Simple()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(90, 1),
- OnNext(180, 2),
- OnNext(250, 3),
- OnNext(260, 4),
- OnNext(310, 5),
- OnNext(340, 6),
- OnNext(410, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnNext(550, 10),
- OnCompleted<int>(590)
- );
- var ys = scheduler.CreateHotObservable(
- OnNext(255, true),
- OnNext(330, true),
- OnNext(350, true),
- OnNext(400, true),
- OnNext(500, true),
- OnCompleted<bool>(900)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(ys)
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(255, b => b.SequenceEqual(new int[] { 3 })),
- OnNext<IList<int>>(330, b => b.SequenceEqual(new int[] { 4, 5 })),
- OnNext<IList<int>>(350, b => b.SequenceEqual(new int[] { 6 })),
- OnNext<IList<int>>(400, b => b.SequenceEqual(new int[] { })),
- OnNext<IList<int>>(500, b => b.SequenceEqual(new int[] { 7, 8, 9 })),
- OnNext<IList<int>>(590, b => b.SequenceEqual(new int[] { 10 })),
- OnCompleted<IList<int>>(590)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 590)
- );
- ys.Subscriptions.AssertEqual(
- Subscribe(200, 590)
- );
- }
- [Fact]
- public void Buffer_Boundaries_OnCompletedBoundaries()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(90, 1),
- OnNext(180, 2),
- OnNext(250, 3),
- OnNext(260, 4),
- OnNext(310, 5),
- OnNext(340, 6),
- OnNext(410, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnNext(550, 10),
- OnCompleted<int>(590)
- );
- var ys = scheduler.CreateHotObservable(
- OnNext(255, true),
- OnNext(330, true),
- OnNext(350, true),
- OnCompleted<bool>(400)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(ys)
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(255, b => b.SequenceEqual(new int[] { 3 })),
- OnNext<IList<int>>(330, b => b.SequenceEqual(new int[] { 4, 5 })),
- OnNext<IList<int>>(350, b => b.SequenceEqual(new int[] { 6 })),
- OnNext<IList<int>>(400, b => b.SequenceEqual(new int[] { })),
- OnCompleted<IList<int>>(400)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 400)
- );
- ys.Subscriptions.AssertEqual(
- Subscribe(200, 400)
- );
- }
- [Fact]
- public void Buffer_Boundaries_OnErrorSource()
- {
- var ex = new Exception();
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(90, 1),
- OnNext(180, 2),
- OnNext(250, 3),
- OnNext(260, 4),
- OnNext(310, 5),
- OnNext(340, 6),
- OnNext(380, 7),
- OnError<int>(400, ex)
- );
- var ys = scheduler.CreateHotObservable(
- OnNext(255, true),
- OnNext(330, true),
- OnNext(350, true),
- OnCompleted<bool>(500)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(ys)
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(255, b => b.SequenceEqual(new int[] { 3 })),
- OnNext<IList<int>>(330, b => b.SequenceEqual(new int[] { 4, 5 })),
- OnNext<IList<int>>(350, b => b.SequenceEqual(new int[] { 6 })),
- OnError<IList<int>>(400, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 400)
- );
- ys.Subscriptions.AssertEqual(
- Subscribe(200, 400)
- );
- }
- [Fact]
- public void Buffer_Boundaries_OnErrorBoundaries()
- {
- var ex = new Exception();
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(90, 1),
- OnNext(180, 2),
- OnNext(250, 3),
- OnNext(260, 4),
- OnNext(310, 5),
- OnNext(340, 6),
- OnNext(410, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnNext(550, 10),
- OnCompleted<int>(590)
- );
- var ys = scheduler.CreateHotObservable(
- OnNext(255, true),
- OnNext(330, true),
- OnNext(350, true),
- OnError<bool>(400, ex)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(ys)
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(255, b => b.SequenceEqual(new int[] { 3 })),
- OnNext<IList<int>>(330, b => b.SequenceEqual(new int[] { 4, 5 })),
- OnNext<IList<int>>(350, b => b.SequenceEqual(new int[] { 6 })),
- OnError<IList<int>>(400, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 400)
- );
- ys.Subscriptions.AssertEqual(
- Subscribe(200, 400)
- );
- }
- #endregion
- #region + Count +
- [Fact]
- public void Buffer_Single_ArgumentChecking()
- {
- var someObservable = Observable.Empty<int>();
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, 0));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, -1));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1, 1));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, 1, 0));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, 0, 1));
- }
- [Fact]
- public void Buffer_Count_PartialWindow()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(150, 1),
- OnNext(210, 2),
- OnNext(220, 3),
- OnNext(230, 4),
- OnNext(240, 5),
- OnCompleted<int>(250)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(5)
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(250, l => l.SequenceEqual(new[] { 2, 3, 4, 5 })),
- OnCompleted<IList<int>>(250)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 250)
- );
- }
- [Fact]
- public void Buffer_Count_FullWindows()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(150, 1),
- OnNext(210, 2),
- OnNext(220, 3),
- OnNext(230, 4),
- OnNext(240, 5),
- OnCompleted<int>(250)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(2)
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(220, l => l.SequenceEqual(new[] { 2, 3 })),
- OnNext<IList<int>>(240, l => l.SequenceEqual(new[] { 4, 5 })),
- OnCompleted<IList<int>>(250)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 250)
- );
- }
- [Fact]
- public void Buffer_Count_FullAndPartialWindows()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(150, 1),
- OnNext(210, 2),
- OnNext(220, 3),
- OnNext(230, 4),
- OnNext(240, 5),
- OnCompleted<int>(250)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(3)
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(230, l => l.SequenceEqual(new int[] { 2, 3, 4 })),
- OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 5 })),
- OnCompleted<IList<int>>(250)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 250)
- );
- }
- [Fact]
- public void Buffer_Count_Error()
- {
- var scheduler = new TestScheduler();
- var ex = new Exception();
- var xs = scheduler.CreateHotObservable(
- OnNext(150, 1),
- OnNext(210, 2),
- OnNext(220, 3),
- OnNext(230, 4),
- OnNext(240, 5),
- OnError<int>(250, ex)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(5)
- );
- res.Messages.AssertEqual(
- OnError<IList<int>>(250, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 250)
- );
- }
- [Fact]
- public void Buffer_Count_Skip_Less()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(150, 1),
- OnNext(210, 2),
- OnNext(220, 3),
- OnNext(230, 4),
- OnNext(240, 5),
- OnCompleted<int>(250)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(3, 1)
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(230, l => l.SequenceEqual(new int[] { 2, 3, 4 })),
- OnNext<IList<int>>(240, l => l.SequenceEqual(new int[] { 3, 4, 5 })),
- OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 4, 5 })),
- OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 5 })),
- OnCompleted<IList<int>>(250)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 250)
- );
- }
- [Fact]
- public void Buffer_Count_Skip_More()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(150, 1),
- OnNext(210, 2),
- OnNext(220, 3),
- OnNext(230, 4),
- OnNext(240, 5),
- OnCompleted<int>(250)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(2, 3)
- );
- res.Messages.AssertEqual(
- OnNext<IList<int>>(220, l => l.SequenceEqual(new int[] { 2, 3 })),
- OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 5 })),
- OnCompleted<IList<int>>(250)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 250)
- );
- }
- [Fact]
- public void BufferWithCount_ArgumentChecking()
- {
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1, 1));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 0, 1));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 1, 0));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 0));
- }
- [Fact]
- public void BufferWithCount_Basic()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(100, 1),
- OnNext(210, 2),
- OnNext(240, 3),
- OnNext(280, 4),
- OnNext(320, 5),
- OnNext(350, 6),
- OnNext(380, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnCompleted<int>(600)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
- );
- res.Messages.AssertEqual(
- OnNext(280, "2,3,4"),
- OnNext(350, "4,5,6"),
- OnNext(420, "6,7,8"),
- OnNext(600, "8,9"),
- OnCompleted<string>(600)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
- );
- }
- [Fact]
- public void BufferWithCount_Disposed()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(100, 1),
- OnNext(210, 2),
- OnNext(240, 3),
- OnNext(280, 4),
- OnNext(320, 5),
- OnNext(350, 6),
- OnNext(380, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnCompleted<int>(600)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())), 370
- );
- res.Messages.AssertEqual(
- OnNext(280, "2,3,4"),
- OnNext(350, "4,5,6")
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 370)
- );
- }
- [Fact]
- public void BufferWithCount_Error()
- {
- var scheduler = new TestScheduler();
- var ex = new Exception();
- var xs = scheduler.CreateHotObservable(
- OnNext(100, 1),
- OnNext(210, 2),
- OnNext(240, 3),
- OnNext(280, 4),
- OnNext(320, 5),
- OnNext(350, 6),
- OnNext(380, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnError<int>(600, ex)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
- );
- res.Messages.AssertEqual(
- OnNext(280, "2,3,4"),
- OnNext(350, "4,5,6"),
- OnNext(420, "6,7,8"),
- OnError<string>(600, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
- );
- }
- [Fact]
- public void BufferWithCount_Default()
- {
- Observable.Range(1, 10).Buffer(3).Skip(1).First().AssertEqual(4, 5, 6);
- Observable.Range(1, 10).Buffer(3, 2).Skip(1).First().AssertEqual(3, 4, 5);
- }
- #endregion
- #region + Time +
- [Fact]
- public void Buffer_Time_ArgumentChecking()
- {
- var scheduler = new TestScheduler();
- var someObservable = Observable.Empty<int>();
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.Zero));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(someObservable, TimeSpan.Zero, null));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.Zero, scheduler));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.Zero, TimeSpan.Zero));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(someObservable, TimeSpan.Zero, TimeSpan.Zero, null));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.Zero, TimeSpan.Zero, scheduler));
- }
- [Fact]
- public void BufferWithTime_ArgumentChecking()
- {
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.FromTicks(1), TimeSpan.FromTicks(1), DummyScheduler.Instance));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), TimeSpan.FromTicks(1), DummyScheduler.Instance));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(-1), DummyScheduler.Instance));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(1), null));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.FromTicks(1), TimeSpan.FromTicks(1)));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), TimeSpan.FromTicks(1)));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(-1)));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.FromTicks(1), DummyScheduler.Instance));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), DummyScheduler.Instance));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), null));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.FromTicks(1)));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1)));
- }
- [Fact]
- public void BufferWithTime_Basic1()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(100, 1),
- OnNext(210, 2),
- OnNext(240, 3),
- OnNext(280, 4),
- OnNext(320, 5),
- OnNext(350, 6),
- OnNext(380, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnCompleted<int>(600)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
- );
- res.Messages.AssertEqual(
- OnNext(300, "2,3,4"),
- OnNext(370, "4,5,6"),
- OnNext(440, "6,7,8"),
- OnNext(510, "8,9"),
- OnNext(580, ""),
- OnNext(600, ""),
- OnCompleted<string>(600)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
- );
- }
- [Fact]
- public void BufferWithTime_Basic2()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(100, 1),
- OnNext(210, 2),
- OnNext(240, 3),
- OnNext(280, 4),
- OnNext(320, 5),
- OnNext(350, 6),
- OnNext(380, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnCompleted<int>(600)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(TimeSpan.FromTicks(70), TimeSpan.FromTicks(100), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
- );
- res.Messages.AssertEqual(
- OnNext(270, "2,3"),
- OnNext(370, "5,6"),
- OnNext(470, "8,9"),
- OnNext(570, ""),
- OnCompleted<string>(600)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
- );
- }
- [Fact]
- public void BufferWithTime_Error()
- {
- var scheduler = new TestScheduler();
- var ex = new Exception();
- var xs = scheduler.CreateHotObservable(
- OnNext(100, 1),
- OnNext(210, 2),
- OnNext(240, 3),
- OnNext(280, 4),
- OnNext(320, 5),
- OnNext(350, 6),
- OnNext(380, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnError<int>(600, ex)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
- );
- res.Messages.AssertEqual(
- OnNext(300, "2,3,4"),
- OnNext(370, "4,5,6"),
- OnNext(440, "6,7,8"),
- OnNext(510, "8,9"),
- OnNext(580, ""),
- OnError<string>(600, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
- );
- }
- [Fact]
- public void BufferWithTime_Disposed()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(100, 1),
- OnNext(210, 2),
- OnNext(240, 3),
- OnNext(280, 4),
- OnNext(320, 5),
- OnNext(350, 6),
- OnNext(380, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnCompleted<int>(600)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())),
- 370
- );
- res.Messages.AssertEqual(
- OnNext(300, "2,3,4")
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 370)
- );
- }
- [Fact]
- public void BufferWithTime_Basic_Same()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(100, 1),
- OnNext(210, 2),
- OnNext(240, 3),
- OnNext(280, 4),
- OnNext(320, 5),
- OnNext(350, 6),
- OnNext(380, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnCompleted<int>(600)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(TimeSpan.FromTicks(100), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
- );
- res.Messages.AssertEqual(
- OnNext(300, "2,3,4"),
- OnNext(400, "5,6,7"),
- OnNext(500, "8,9"),
- OnNext(600, ""),
- OnCompleted<string>(600)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
- );
- }
- [Fact]
- public void BufferWithTime_Basic_Same_Periodic()
- {
- var scheduler = new PeriodicTestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(100, 1),
- OnNext(210, 2),
- OnNext(240, 3),
- OnNext(280, 4),
- OnNext(320, 5),
- OnNext(350, 6),
- OnNext(380, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnCompleted<int>(600)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(TimeSpan.FromTicks(100), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
- );
- res.Messages.AssertEqual(
- OnNext(300, "2,3,4"),
- OnNext(400, "5,6,7"),
- OnNext(500, "8,9"),
- OnNext(600, ""),
- OnCompleted<string>(600)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
- );
- #if !WINDOWS
- scheduler.Timers.AssertEqual(
- new TimerRun(200, 600) { 300, 400, 500 }
- );
- #endif
- }
- [Fact]
- public void BufferWithTime_Basic_Same_Periodic_Error()
- {
- var ex = new Exception();
- var scheduler = new PeriodicTestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(100, 1),
- OnNext(210, 2),
- OnNext(240, 3),
- OnNext(280, 4),
- OnNext(320, 5),
- OnNext(350, 6),
- OnNext(380, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnError<int>(480, ex)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(TimeSpan.FromTicks(100), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
- );
- res.Messages.AssertEqual(
- OnNext(300, "2,3,4"),
- OnNext(400, "5,6,7"),
- OnError<string>(480, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 480)
- );
- #if !WINDOWS
- scheduler.Timers.AssertEqual(
- new TimerRun(200, 480) { 300, 400 }
- );
- #endif
- }
- [Fact]
- public void BufferWithTime_Default()
- {
- Observable.Range(0, 10).Buffer(TimeSpan.FromDays(1), TimeSpan.FromDays(1)).First().AssertEqual(Enumerable.Range(0, 10));
- Observable.Range(0, 10).Buffer(TimeSpan.FromDays(1)).First().AssertEqual(Enumerable.Range(0, 10));
- }
- [Fact]
- public void BufferWithTimeOrCount_ArgumentChecking()
- {
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.FromTicks(1), 1, DummyScheduler.Instance));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), 1, DummyScheduler.Instance));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), 0, DummyScheduler.Instance));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), 1, null));
- ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.FromTicks(1), 1));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), 1));
- ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), 0));
- }
- [Fact]
- public void BufferWithTimeOrCount_Basic()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(205, 1),
- OnNext(210, 2),
- OnNext(240, 3),
- OnNext(280, 4),
- OnNext(320, 5),
- OnNext(350, 6),
- OnNext(370, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnCompleted<int>(600)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(TimeSpan.FromTicks(70), 3, scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
- );
- res.Messages.AssertEqual(
- OnNext(240, "1,2,3"),
- OnNext(310, "4"),
- OnNext(370, "5,6,7"),
- OnNext(440, "8"),
- OnNext(510, "9"),
- OnNext(580, ""),
- OnNext(600, ""),
- OnCompleted<string>(600)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
- );
- }
- [Fact]
- public void BufferWithTimeOrCount_Error()
- {
- var scheduler = new TestScheduler();
- var ex = new Exception();
- var xs = scheduler.CreateHotObservable(
- OnNext(205, 1),
- OnNext(210, 2),
- OnNext(240, 3),
- OnNext(280, 4),
- OnNext(320, 5),
- OnNext(350, 6),
- OnNext(370, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnError<int>(600, ex)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(TimeSpan.FromTicks(70), 3, scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))
- );
- res.Messages.AssertEqual(
- OnNext(240, "1,2,3"),
- OnNext(310, "4"),
- OnNext(370, "5,6,7"),
- OnNext(440, "8"),
- OnNext(510, "9"),
- OnNext(580, ""),
- OnError<string>(600, ex)
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 600)
- );
- }
- [Fact]
- public void BufferWithTimeOrCount_Disposed()
- {
- var scheduler = new TestScheduler();
- var xs = scheduler.CreateHotObservable(
- OnNext(205, 1),
- OnNext(210, 2),
- OnNext(240, 3),
- OnNext(280, 4),
- OnNext(320, 5),
- OnNext(350, 6),
- OnNext(370, 7),
- OnNext(420, 8),
- OnNext(470, 9),
- OnCompleted<int>(600)
- );
- var res = scheduler.Start(() =>
- xs.Buffer(TimeSpan.FromTicks(70), 3, scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())),
- 370
- );
- res.Messages.AssertEqual(
- OnNext(240, "1,2,3"),
- OnNext(310, "4"),
- OnNext(370, "5,6,7")
- );
- xs.Subscriptions.AssertEqual(
- Subscribe(200, 370)
- );
- }
- [Fact]
- public void BufferWithTimeOrCount_Default()
- {
- Observable.Range(1, 10, DefaultScheduler.Instance).Buffer(TimeSpan.FromDays(1), 3).Skip(1).First().AssertEqual(4, 5, 6);
- }
- [Fact]
- public void BufferWithTime_TickWhileOnCompleted()
- {
- var scheduler = new TestScheduler();
- Observable.Return(1)
- .Buffer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(2), scheduler)
- .Subscribe(v =>
- {
- scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1).Ticks);
- });
- }
- #endregion
- }
- }
|