| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324 | // Licensed to the .NET Foundation under one or more agreements.// The .NET Foundation licenses this file to you under the MIT License.// See the LICENSE file in the project root for more information. using System;using System.Collections.Generic;using System.Linq;using System.Reactive.Concurrency;using System.Reactive.Linq;using Microsoft.Reactive.Testing;using ReactiveTests.Dummies;using Xunit;namespace ReactiveTests.Tests{    public class BufferTest : ReactiveTest    {        #region + Boundary +        [Fact]        public void Buffer_ArgumentChecking()        {            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), DummyFunc<IObservable<int>>.Instance));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, default(Func<IObservable<int>>)));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), DummyObservable<int>.Instance, DummyFunc<int, IObservable<int>>.Instance));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, default, DummyFunc<int, IObservable<int>>.Instance));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, DummyObservable<int>.Instance, default(Func<int, IObservable<int>>)));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), DummyObservable<int>.Instance));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, default(IObservable<int>)));        }        [Fact]        public void Buffer_Closings_Basic()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(90, 1),                OnNext(180, 2),                OnNext(250, 3),                OnNext(260, 4),                OnNext(310, 5),                OnNext(340, 6),                OnNext(410, 7),                OnNext(420, 8),                OnNext(470, 9),                OnNext(550, 10),                OnCompleted<int>(590)            );            var window = 1;            var res = scheduler.Start(() =>                xs.Buffer(() => Observable.Timer(TimeSpan.FromTicks((window++) * 100), scheduler))            );            res.Messages.AssertEqual(                OnNext<IList<int>>(300, b => b.SequenceEqual(new int[] { 3, 4 })),                OnNext<IList<int>>(500, b => b.SequenceEqual(new int[] { 5, 6, 7, 8, 9 })),                OnNext<IList<int>>(590, b => b.SequenceEqual(new int[] { 10 })),                OnCompleted<IList<int>>(590)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 590)            );        }        [Fact]        public void Buffer_Closings_InnerSubscriptions()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(90, 1),                OnNext(180, 2),                OnNext(250, 3),                OnNext(260, 4),                OnNext(310, 5),                OnNext(340, 6),                OnNext(410, 7),                OnNext(420, 8),                OnNext(470, 9),                OnNext(550, 10),                OnCompleted<int>(590)            );            var closings = new ITestableObservable<bool>[] {                scheduler.CreateHotObservable(                    OnNext(300, true),                    OnNext(350, false),                    OnCompleted<bool>(380)                ),                scheduler.CreateHotObservable(                    OnNext(400, true),                    OnNext(510, false),                    OnNext(620, false)                ),                scheduler.CreateHotObservable(                    OnCompleted<bool>(500)                ),                scheduler.CreateHotObservable(                    OnNext(600, true)                )            };            var window = 0;            var res = scheduler.Start(() =>                xs.Buffer(() => closings[window++])            );            res.Messages.AssertEqual(                OnNext<IList<int>>(300, b => b.SequenceEqual(new int[] { 3, 4 })),                OnNext<IList<int>>(400, b => b.SequenceEqual(new int[] { 5, 6 })),                OnNext<IList<int>>(500, b => b.SequenceEqual(new int[] { 7, 8, 9 })),                OnNext<IList<int>>(590, b => b.SequenceEqual(new int[] { 10 })),                OnCompleted<IList<int>>(590)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 590)            );            closings[0].Subscriptions.AssertEqual(                Subscribe(200, 300)            );            closings[1].Subscriptions.AssertEqual(                Subscribe(300, 400)            );            closings[2].Subscriptions.AssertEqual(                Subscribe(400, 500)            );            closings[3].Subscriptions.AssertEqual(                Subscribe(500, 590)            );        }        [Fact]        public void Buffer_Closings_Empty()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(90, 1),                OnNext(180, 2),                OnNext(250, 3),                OnNext(260, 4),                OnNext(310, 5),                OnNext(340, 6),                OnNext(410, 7),                OnNext(420, 8),                OnNext(470, 9),                OnNext(550, 10),                OnCompleted<int>(590)            );            var window = 1;            var res = scheduler.Start(() =>                xs.Buffer(() => Observable.Empty<int>().Delay(TimeSpan.FromTicks((window++) * 100), scheduler))            );            res.Messages.AssertEqual(                OnNext<IList<int>>(300, l => l.SequenceEqual(new int[] { 3, 4 })),                OnNext<IList<int>>(500, l => l.SequenceEqual(new int[] { 5, 6, 7, 8, 9 })),                OnNext<IList<int>>(590, l => l.SequenceEqual(new int[] { 10 })),                OnCompleted<IList<int>>(590)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 590)            );        }        [Fact]        public void Buffer_Closings_Dispose()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(90, 1),                OnNext(180, 2),                OnNext(250, 3),                OnNext(260, 4),                OnNext(310, 5),                OnNext(340, 6),                OnNext(410, 7),                OnNext(420, 8),                OnNext(470, 9),                OnNext(550, 10),                OnCompleted<int>(590)            );            var window = 1;            var res = scheduler.Start(() =>                xs.Buffer(() => Observable.Timer(TimeSpan.FromTicks((window++) * 100), scheduler)),                400            );            res.Messages.AssertEqual(                OnNext<IList<int>>(300, l => l.SequenceEqual(new int[] { 3, 4 }))            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 400)            );        }        [Fact]        public void Buffer_Closings_Error()        {            var scheduler = new TestScheduler();            var ex = new Exception();            var xs = scheduler.CreateHotObservable(                OnNext(90, 1),                OnNext(180, 2),                OnNext(250, 3),                OnNext(260, 4),                OnNext(310, 5),                OnNext(340, 6),                OnNext(410, 7),                OnNext(420, 8),                OnNext(470, 9),                OnNext(550, 10),                OnError<int>(590, ex)            );            var window = 1;            var res = scheduler.Start(() =>                xs.Buffer(() => Observable.Timer(TimeSpan.FromTicks((window++) * 100), scheduler))            );            res.Messages.AssertEqual(                OnNext<IList<int>>(300, l => l.SequenceEqual(new int[] { 3, 4 })),                OnNext<IList<int>>(500, l => l.SequenceEqual(new int[] { 5, 6, 7, 8, 9 })),                OnError<IList<int>>(590, ex)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 590)            );        }        [Fact]        public void Buffer_Closings_Throw()        {            var scheduler = new TestScheduler();            var ex = new Exception();            var xs = scheduler.CreateHotObservable(                OnNext(90, 1),                OnNext(180, 2),                OnNext(250, 3),                OnNext(260, 4),                OnNext(310, 5),                OnNext(340, 6),                OnNext(410, 7),                OnNext(420, 8),                OnNext(470, 9),                OnNext(550, 10),                OnError<int>(590, new Exception())            );            var res = scheduler.Start(() =>                xs.Buffer<int, int>(() => { throw ex; })            );            res.Messages.AssertEqual(                OnError<IList<int>>(200, ex)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 200)            );        }        [Fact]        public void Buffer_Closings_WindowClose_Error()        {            var scheduler = new TestScheduler();            var ex = new Exception();            var xs = scheduler.CreateHotObservable(                OnNext(90, 1),                OnNext(180, 2),                OnNext(250, 3),                OnNext(260, 4),                OnNext(310, 5),                OnNext(340, 6),                OnNext(410, 7),                OnNext(420, 8),                OnNext(470, 9),                OnNext(550, 10),                OnError<int>(590, new Exception())            );            var res = scheduler.Start(() =>                xs.Buffer(() => Observable.Throw<int>(ex, scheduler))            );            res.Messages.AssertEqual(                OnError<IList<int>>(201, ex)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 201)            );        }        [Fact]        public void Buffer_OpeningClosings_Basic()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(90, 1),                OnNext(180, 2),                OnNext(250, 3),                OnNext(260, 4),                OnNext(310, 5),                OnNext(340, 6),                OnNext(410, 7),                OnNext(420, 8),                OnNext(470, 9),                OnNext(550, 10),                OnCompleted<int>(590)            );            var ys = scheduler.CreateHotObservable(                OnNext(255, 50),                OnNext(330, 100),                OnNext(350, 50),                OnNext(400, 90),                OnCompleted<int>(900)            );            var res = scheduler.Start(() =>                xs.Buffer(ys, x => Observable.Timer(TimeSpan.FromTicks(x), scheduler))            );            res.Messages.AssertEqual(                OnNext<IList<int>>(305, b => b.SequenceEqual(new int[] { 4 })),                OnNext<IList<int>>(400, b => b.SequenceEqual(new int[] { })),                OnNext<IList<int>>(430, b => b.SequenceEqual(new int[] { 6, 7, 8 })),                OnNext<IList<int>>(490, b => b.SequenceEqual(new int[] { 7, 8, 9 })),                OnCompleted<IList<int>>(900)            );#if !NO_PERF // BREAKING CHANGE v2 > v1.x -> More aggressive disposal behavior            xs.Subscriptions.AssertEqual(                Subscribe(200, 590)            );#else            xs.Subscriptions.AssertEqual(                Subscribe(200, 900)            );#endif            ys.Subscriptions.AssertEqual(                Subscribe(200, 900)            );        }        [Fact]        public void Buffer_Boundaries_Simple()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(90, 1),                OnNext(180, 2),                OnNext(250, 3),                OnNext(260, 4),                OnNext(310, 5),                OnNext(340, 6),                OnNext(410, 7),                OnNext(420, 8),                OnNext(470, 9),                OnNext(550, 10),                OnCompleted<int>(590)            );            var ys = scheduler.CreateHotObservable(                OnNext(255, true),                OnNext(330, true),                OnNext(350, true),                OnNext(400, true),                OnNext(500, true),                OnCompleted<bool>(900)            );            var res = scheduler.Start(() =>                xs.Buffer(ys)            );            res.Messages.AssertEqual(                OnNext<IList<int>>(255, b => b.SequenceEqual(new int[] { 3 })),                OnNext<IList<int>>(330, b => b.SequenceEqual(new int[] { 4, 5 })),                OnNext<IList<int>>(350, b => b.SequenceEqual(new int[] { 6 })),                OnNext<IList<int>>(400, b => b.SequenceEqual(new int[] { })),                OnNext<IList<int>>(500, b => b.SequenceEqual(new int[] { 7, 8, 9 })),                OnNext<IList<int>>(590, b => b.SequenceEqual(new int[] { 10 })),                OnCompleted<IList<int>>(590)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 590)            );            ys.Subscriptions.AssertEqual(                Subscribe(200, 590)            );        }        [Fact]        public void Buffer_Boundaries_OnCompletedBoundaries()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(90, 1),                OnNext(180, 2),                OnNext(250, 3),                OnNext(260, 4),                OnNext(310, 5),                OnNext(340, 6),                OnNext(410, 7),                OnNext(420, 8),                OnNext(470, 9),                OnNext(550, 10),                OnCompleted<int>(590)            );            var ys = scheduler.CreateHotObservable(                OnNext(255, true),                OnNext(330, true),                OnNext(350, true),                OnCompleted<bool>(400)            );            var res = scheduler.Start(() =>                xs.Buffer(ys)            );            res.Messages.AssertEqual(                OnNext<IList<int>>(255, b => b.SequenceEqual(new int[] { 3 })),                OnNext<IList<int>>(330, b => b.SequenceEqual(new int[] { 4, 5 })),                OnNext<IList<int>>(350, b => b.SequenceEqual(new int[] { 6 })),                OnNext<IList<int>>(400, b => b.SequenceEqual(new int[] { })),                OnCompleted<IList<int>>(400)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 400)            );            ys.Subscriptions.AssertEqual(                Subscribe(200, 400)            );        }        [Fact]        public void Buffer_Boundaries_OnErrorSource()        {            var ex = new Exception();            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(90, 1),                OnNext(180, 2),                OnNext(250, 3),                OnNext(260, 4),                OnNext(310, 5),                OnNext(340, 6),                OnNext(380, 7),                OnError<int>(400, ex)            );            var ys = scheduler.CreateHotObservable(                OnNext(255, true),                OnNext(330, true),                OnNext(350, true),                OnCompleted<bool>(500)            );            var res = scheduler.Start(() =>                xs.Buffer(ys)            );            res.Messages.AssertEqual(                OnNext<IList<int>>(255, b => b.SequenceEqual(new int[] { 3 })),                OnNext<IList<int>>(330, b => b.SequenceEqual(new int[] { 4, 5 })),                OnNext<IList<int>>(350, b => b.SequenceEqual(new int[] { 6 })),                OnError<IList<int>>(400, ex)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 400)            );            ys.Subscriptions.AssertEqual(                Subscribe(200, 400)            );        }        [Fact]        public void Buffer_Boundaries_OnErrorBoundaries()        {            var ex = new Exception();            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(90, 1),                OnNext(180, 2),                OnNext(250, 3),                OnNext(260, 4),                OnNext(310, 5),                OnNext(340, 6),                OnNext(410, 7),                OnNext(420, 8),                OnNext(470, 9),                OnNext(550, 10),                OnCompleted<int>(590)            );            var ys = scheduler.CreateHotObservable(                OnNext(255, true),                OnNext(330, true),                OnNext(350, true),                OnError<bool>(400, ex)            );            var res = scheduler.Start(() =>                xs.Buffer(ys)            );            res.Messages.AssertEqual(                OnNext<IList<int>>(255, b => b.SequenceEqual(new int[] { 3 })),                OnNext<IList<int>>(330, b => b.SequenceEqual(new int[] { 4, 5 })),                OnNext<IList<int>>(350, b => b.SequenceEqual(new int[] { 6 })),                OnError<IList<int>>(400, ex)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 400)            );            ys.Subscriptions.AssertEqual(                Subscribe(200, 400)            );        }        #endregion        #region + Count +        [Fact]        public void Buffer_Single_ArgumentChecking()        {            var someObservable = Observable.Empty<int>();            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, 0));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, -1));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1, 1));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, 1, 0));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(someObservable, 0, 1));        }        [Fact]        public void Buffer_Count_PartialWindow()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(150, 1),                OnNext(210, 2),                OnNext(220, 3),                OnNext(230, 4),                OnNext(240, 5),                OnCompleted<int>(250)            );            var res = scheduler.Start(() =>                xs.Buffer(5)            );            res.Messages.AssertEqual(                OnNext<IList<int>>(250, l => l.SequenceEqual(new[] { 2, 3, 4, 5 })),                OnCompleted<IList<int>>(250)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 250)            );        }        [Fact]        public void Buffer_Count_FullWindows()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(150, 1),                OnNext(210, 2),                OnNext(220, 3),                OnNext(230, 4),                OnNext(240, 5),                OnCompleted<int>(250)            );            var res = scheduler.Start(() =>                xs.Buffer(2)            );            res.Messages.AssertEqual(                OnNext<IList<int>>(220, l => l.SequenceEqual(new[] { 2, 3 })),                OnNext<IList<int>>(240, l => l.SequenceEqual(new[] { 4, 5 })),                OnCompleted<IList<int>>(250)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 250)            );        }        [Fact]        public void Buffer_Count_FullAndPartialWindows()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(150, 1),                OnNext(210, 2),                OnNext(220, 3),                OnNext(230, 4),                OnNext(240, 5),                OnCompleted<int>(250)            );            var res = scheduler.Start(() =>                xs.Buffer(3)            );            res.Messages.AssertEqual(                OnNext<IList<int>>(230, l => l.SequenceEqual(new int[] { 2, 3, 4 })),                OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 5 })),                OnCompleted<IList<int>>(250)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 250)            );        }        [Fact]        public void Buffer_Count_Error()        {            var scheduler = new TestScheduler();            var ex = new Exception();            var xs = scheduler.CreateHotObservable(                OnNext(150, 1),                OnNext(210, 2),                OnNext(220, 3),                OnNext(230, 4),                OnNext(240, 5),                OnError<int>(250, ex)            );            var res = scheduler.Start(() =>                xs.Buffer(5)            );            res.Messages.AssertEqual(                OnError<IList<int>>(250, ex)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 250)            );        }        [Fact]        public void Buffer_Count_Skip_Less()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(150, 1),                OnNext(210, 2),                OnNext(220, 3),                OnNext(230, 4),                OnNext(240, 5),                OnCompleted<int>(250)            );            var res = scheduler.Start(() =>                xs.Buffer(3, 1)            );            res.Messages.AssertEqual(                OnNext<IList<int>>(230, l => l.SequenceEqual(new int[] { 2, 3, 4 })),                OnNext<IList<int>>(240, l => l.SequenceEqual(new int[] { 3, 4, 5 })),                OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 4, 5 })),                OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 5 })),                OnCompleted<IList<int>>(250)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 250)            );        }        [Fact]        public void Buffer_Count_Skip_More()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(150, 1),                OnNext(210, 2),                OnNext(220, 3),                OnNext(230, 4),                OnNext(240, 5),                OnCompleted<int>(250)            );            var res = scheduler.Start(() =>                xs.Buffer(2, 3)            );            res.Messages.AssertEqual(                OnNext<IList<int>>(220, l => l.SequenceEqual(new int[] { 2, 3 })),                OnNext<IList<int>>(250, l => l.SequenceEqual(new int[] { 5 })),                OnCompleted<IList<int>>(250)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 250)            );        }        [Fact]        public void BufferWithCount_ArgumentChecking()        {            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1, 1));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 0, 1));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 1, 0));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), 1));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, 0));        }        [Fact]        public void BufferWithCount_Basic()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(100, 1),                OnNext(210, 2),                OnNext(240, 3),                OnNext(280, 4),                OnNext(320, 5),                OnNext(350, 6),                OnNext(380, 7),                OnNext(420, 8),                OnNext(470, 9),                OnCompleted<int>(600)            );            var res = scheduler.Start(() =>                xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))            );            res.Messages.AssertEqual(                OnNext(280, "2,3,4"),                OnNext(350, "4,5,6"),                OnNext(420, "6,7,8"),                OnNext(600, "8,9"),                OnCompleted<string>(600)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 600)            );        }        [Fact]        public void BufferWithCount_Disposed()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(100, 1),                OnNext(210, 2),                OnNext(240, 3),                OnNext(280, 4),                OnNext(320, 5),                OnNext(350, 6),                OnNext(380, 7),                OnNext(420, 8),                OnNext(470, 9),                OnCompleted<int>(600)            );            var res = scheduler.Start(() =>                xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())), 370            );            res.Messages.AssertEqual(                OnNext(280, "2,3,4"),                OnNext(350, "4,5,6")            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 370)            );        }        [Fact]        public void BufferWithCount_Error()        {            var scheduler = new TestScheduler();            var ex = new Exception();            var xs = scheduler.CreateHotObservable(                OnNext(100, 1),                OnNext(210, 2),                OnNext(240, 3),                OnNext(280, 4),                OnNext(320, 5),                OnNext(350, 6),                OnNext(380, 7),                OnNext(420, 8),                OnNext(470, 9),                OnError<int>(600, ex)            );            var res = scheduler.Start(() =>                xs.Buffer(3, 2).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))            );            res.Messages.AssertEqual(                OnNext(280, "2,3,4"),                OnNext(350, "4,5,6"),                OnNext(420, "6,7,8"),                OnError<string>(600, ex)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 600)            );        }        [Fact]        public void BufferWithCount_Default()        {            Observable.Range(1, 10).Buffer(3).Skip(1).First().AssertEqual(4, 5, 6);            Observable.Range(1, 10).Buffer(3, 2).Skip(1).First().AssertEqual(3, 4, 5);        }        #endregion        #region + Time +        [Fact]        public void Buffer_Time_ArgumentChecking()        {            var scheduler = new TestScheduler();            var someObservable = Observable.Empty<int>();            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.Zero));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(someObservable, TimeSpan.Zero, null));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.Zero, scheduler));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.Zero, TimeSpan.Zero));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(someObservable, TimeSpan.Zero, TimeSpan.Zero, null));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.Zero, TimeSpan.Zero, scheduler));        }        [Fact]        public void BufferWithTime_ArgumentChecking()        {            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.FromTicks(1), TimeSpan.FromTicks(1), DummyScheduler.Instance));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), TimeSpan.FromTicks(1), DummyScheduler.Instance));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(-1), DummyScheduler.Instance));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(1), null));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.FromTicks(1), TimeSpan.FromTicks(1)));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), TimeSpan.FromTicks(1)));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), TimeSpan.FromTicks(-1)));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.FromTicks(1), DummyScheduler.Instance));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), DummyScheduler.Instance));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), null));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.FromTicks(1)));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1)));        }        [Fact]        public void BufferWithTime_Basic1()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(100, 1),                OnNext(210, 2),                OnNext(240, 3),                OnNext(280, 4),                OnNext(320, 5),                OnNext(350, 6),                OnNext(380, 7),                OnNext(420, 8),                OnNext(470, 9),                OnCompleted<int>(600)            );            var res = scheduler.Start(() =>                xs.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))            );            res.Messages.AssertEqual(                OnNext(300, "2,3,4"),                OnNext(370, "4,5,6"),                OnNext(440, "6,7,8"),                OnNext(510, "8,9"),                OnNext(580, ""),                OnNext(600, ""),                OnCompleted<string>(600)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 600)            );        }        [Fact]        public void BufferWithTime_Basic2()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(100, 1),                OnNext(210, 2),                OnNext(240, 3),                OnNext(280, 4),                OnNext(320, 5),                OnNext(350, 6),                OnNext(380, 7),                OnNext(420, 8),                OnNext(470, 9),                OnCompleted<int>(600)            );            var res = scheduler.Start(() =>                xs.Buffer(TimeSpan.FromTicks(70), TimeSpan.FromTicks(100), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))            );            res.Messages.AssertEqual(                OnNext(270, "2,3"),                OnNext(370, "5,6"),                OnNext(470, "8,9"),                OnNext(570, ""),                OnCompleted<string>(600)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 600)            );        }        [Fact]        public void BufferWithTime_Error()        {            var scheduler = new TestScheduler();            var ex = new Exception();            var xs = scheduler.CreateHotObservable(                OnNext(100, 1),                OnNext(210, 2),                OnNext(240, 3),                OnNext(280, 4),                OnNext(320, 5),                OnNext(350, 6),                OnNext(380, 7),                OnNext(420, 8),                OnNext(470, 9),                OnError<int>(600, ex)            );            var res = scheduler.Start(() =>                xs.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))            );            res.Messages.AssertEqual(                OnNext(300, "2,3,4"),                OnNext(370, "4,5,6"),                OnNext(440, "6,7,8"),                OnNext(510, "8,9"),                OnNext(580, ""),                OnError<string>(600, ex)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 600)            );        }        [Fact]        public void BufferWithTime_Disposed()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(100, 1),                OnNext(210, 2),                OnNext(240, 3),                OnNext(280, 4),                OnNext(320, 5),                OnNext(350, 6),                OnNext(380, 7),                OnNext(420, 8),                OnNext(470, 9),                OnCompleted<int>(600)            );            var res = scheduler.Start(() =>                xs.Buffer(TimeSpan.FromTicks(100), TimeSpan.FromTicks(70), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())),                370            );            res.Messages.AssertEqual(                OnNext(300, "2,3,4")            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 370)            );        }        [Fact]        public void BufferWithTime_Basic_Same()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(100, 1),                OnNext(210, 2),                OnNext(240, 3),                OnNext(280, 4),                OnNext(320, 5),                OnNext(350, 6),                OnNext(380, 7),                OnNext(420, 8),                OnNext(470, 9),                OnCompleted<int>(600)            );            var res = scheduler.Start(() =>                xs.Buffer(TimeSpan.FromTicks(100), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))            );            res.Messages.AssertEqual(                OnNext(300, "2,3,4"),                OnNext(400, "5,6,7"),                OnNext(500, "8,9"),                OnNext(600, ""),                OnCompleted<string>(600)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 600)            );        }        [Fact]        public void BufferWithTime_Basic_Same_Periodic()        {            var scheduler = new PeriodicTestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(100, 1),                OnNext(210, 2),                OnNext(240, 3),                OnNext(280, 4),                OnNext(320, 5),                OnNext(350, 6),                OnNext(380, 7),                OnNext(420, 8),                OnNext(470, 9),                OnCompleted<int>(600)            );            var res = scheduler.Start(() =>                xs.Buffer(TimeSpan.FromTicks(100), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))            );            res.Messages.AssertEqual(                OnNext(300, "2,3,4"),                OnNext(400, "5,6,7"),                OnNext(500, "8,9"),                OnNext(600, ""),                OnCompleted<string>(600)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 600)            );#if !WINDOWS            scheduler.Timers.AssertEqual(                new TimerRun(200, 600) { 300, 400, 500 }            );#endif        }        [Fact]        public void BufferWithTime_Basic_Same_Periodic_Error()        {            var ex = new Exception();            var scheduler = new PeriodicTestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(100, 1),                OnNext(210, 2),                OnNext(240, 3),                OnNext(280, 4),                OnNext(320, 5),                OnNext(350, 6),                OnNext(380, 7),                OnNext(420, 8),                OnNext(470, 9),                OnError<int>(480, ex)            );            var res = scheduler.Start(() =>                xs.Buffer(TimeSpan.FromTicks(100), scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))            );            res.Messages.AssertEqual(                OnNext(300, "2,3,4"),                OnNext(400, "5,6,7"),                OnError<string>(480, ex)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 480)            );#if !WINDOWS            scheduler.Timers.AssertEqual(                new TimerRun(200, 480) { 300, 400 }            );#endif        }        [Fact]        public void BufferWithTime_Default()        {            Observable.Range(0, 10).Buffer(TimeSpan.FromDays(1), TimeSpan.FromDays(1)).First().AssertEqual(Enumerable.Range(0, 10));            Observable.Range(0, 10).Buffer(TimeSpan.FromDays(1)).First().AssertEqual(Enumerable.Range(0, 10));        }        [Fact]        public void BufferWithTimeOrCount_ArgumentChecking()        {            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.FromTicks(1), 1, DummyScheduler.Instance));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), 1, DummyScheduler.Instance));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), 0, DummyScheduler.Instance));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), 1, null));            ReactiveAssert.Throws<ArgumentNullException>(() => Observable.Buffer(default(IObservable<int>), TimeSpan.FromTicks(1), 1));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(-1), 1));            ReactiveAssert.Throws<ArgumentOutOfRangeException>(() => Observable.Buffer(DummyObservable<int>.Instance, TimeSpan.FromTicks(1), 0));        }        [Fact]        public void BufferWithTimeOrCount_Basic()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(205, 1),                OnNext(210, 2),                OnNext(240, 3),                OnNext(280, 4),                OnNext(320, 5),                OnNext(350, 6),                OnNext(370, 7),                OnNext(420, 8),                OnNext(470, 9),                OnCompleted<int>(600)            );            var res = scheduler.Start(() =>                xs.Buffer(TimeSpan.FromTicks(70), 3, scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))            );            res.Messages.AssertEqual(                OnNext(240, "1,2,3"),                OnNext(310, "4"),                OnNext(370, "5,6,7"),                OnNext(440, "8"),                OnNext(510, "9"),                OnNext(580, ""),                OnNext(600, ""),                OnCompleted<string>(600)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 600)            );        }        [Fact]        public void BufferWithTimeOrCount_Error()        {            var scheduler = new TestScheduler();            var ex = new Exception();            var xs = scheduler.CreateHotObservable(                OnNext(205, 1),                OnNext(210, 2),                OnNext(240, 3),                OnNext(280, 4),                OnNext(320, 5),                OnNext(350, 6),                OnNext(370, 7),                OnNext(420, 8),                OnNext(470, 9),                OnError<int>(600, ex)            );            var res = scheduler.Start(() =>                xs.Buffer(TimeSpan.FromTicks(70), 3, scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray()))            );            res.Messages.AssertEqual(                OnNext(240, "1,2,3"),                OnNext(310, "4"),                OnNext(370, "5,6,7"),                OnNext(440, "8"),                OnNext(510, "9"),                OnNext(580, ""),                OnError<string>(600, ex)            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 600)            );        }        [Fact]        public void BufferWithTimeOrCount_Disposed()        {            var scheduler = new TestScheduler();            var xs = scheduler.CreateHotObservable(                OnNext(205, 1),                OnNext(210, 2),                OnNext(240, 3),                OnNext(280, 4),                OnNext(320, 5),                OnNext(350, 6),                OnNext(370, 7),                OnNext(420, 8),                OnNext(470, 9),                OnCompleted<int>(600)            );            var res = scheduler.Start(() =>                xs.Buffer(TimeSpan.FromTicks(70), 3, scheduler).Select(x => string.Join(",", x.Select(xx => xx.ToString()).ToArray())),                370            );            res.Messages.AssertEqual(                OnNext(240, "1,2,3"),                OnNext(310, "4"),                OnNext(370, "5,6,7")            );            xs.Subscriptions.AssertEqual(                Subscribe(200, 370)            );        }        [Fact]        public void BufferWithTimeOrCount_Default()        {            Observable.Range(1, 10, DefaultScheduler.Instance).Buffer(TimeSpan.FromDays(1), 3).Skip(1).First().AssertEqual(4, 5, 6);        }        [Fact]        public void BufferWithTime_TickWhileOnCompleted()        {            var scheduler = new TestScheduler();            Observable.Return(1)                .Buffer(TimeSpan.FromMilliseconds(1), TimeSpan.FromMilliseconds(2), scheduler)                .Subscribe(v =>                {                    scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1).Ticks);                });        }        #endregion    }}
 |