// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT License. // See the LICENSE file in the project root for more information. using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Linq; using Microsoft.Reactive.Testing; using Microsoft.VisualStudio.TestTools.UnitTesting; using Assert = Xunit.Assert; namespace ReactiveTests.Tests { [TestClass] public class ChunkifyTest : ReactiveTest { [TestMethod] public void Chunkify_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.Chunkify(default(IObservable))); } [TestMethod] public void Chunkify_Regular1() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(200, 2), OnNext(300, 3), OnNext(400, 4), OnNext(500, 5), OnNext(600, 6), OnNext(700, 7), OnNext(800, 8), OnCompleted(900) ); var ys = xs.Chunkify(); var e = default(IEnumerator>); var res = new List>(); var log = new Action(() => { Assert.True(e.MoveNext()); res.Add(e.Current); }); scheduler.ScheduleAbsolute(250, () => { e = ys.GetEnumerator(); }); scheduler.ScheduleAbsolute(270, log); scheduler.ScheduleAbsolute(310, log); scheduler.ScheduleAbsolute(450, log); scheduler.ScheduleAbsolute(470, log); scheduler.ScheduleAbsolute(750, log); scheduler.ScheduleAbsolute(850, log); scheduler.ScheduleAbsolute(950, log); scheduler.ScheduleAbsolute(980, () => Assert.False(e.MoveNext())); scheduler.Start(); xs.Subscriptions.AssertEqual( Subscribe(250, 900) ); Assert.Equal(7, res.Count); Assert.True(res[0].SequenceEqual([])); Assert.True(res[1].SequenceEqual([3])); Assert.True(res[2].SequenceEqual([4])); Assert.True(res[3].SequenceEqual([])); Assert.True(res[4].SequenceEqual([5, 6, 7])); Assert.True(res[5].SequenceEqual([8])); Assert.True(res[6].SequenceEqual([])); } [TestMethod] public void Chunkify_Regular2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(200, 2), OnNext(300, 3), OnNext(400, 4), OnNext(500, 5), OnNext(600, 6), OnNext(700, 7), OnNext(800, 8), OnCompleted(900) ); var ys = xs.Chunkify(); var e = default(IEnumerator>); var res = new List>(); var log = new Action(() => { Assert.True(e.MoveNext()); res.Add(e.Current); }); scheduler.ScheduleAbsolute(250, () => { e = ys.GetEnumerator(); }); scheduler.ScheduleAbsolute(550, log); scheduler.ScheduleAbsolute(950, log); scheduler.ScheduleAbsolute(980, () => Assert.False(e.MoveNext())); scheduler.Start(); xs.Subscriptions.AssertEqual( Subscribe(250, 900) ); Assert.Equal(2, res.Count); Assert.True(res[0].SequenceEqual([3, 4, 5])); Assert.True(res[1].SequenceEqual([6, 7, 8])); } [TestMethod] public void Chunkify_Error() { var ex = new Exception(); var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(100, 1), OnNext(200, 2), OnNext(300, 3), OnNext(400, 4), OnNext(500, 5), OnNext(600, 6), OnError(700, ex) ); var ys = xs.Chunkify(); var e = default(IEnumerator>); var res = new List>(); var log = new Action(() => { Assert.True(e.MoveNext()); res.Add(e.Current); }); scheduler.ScheduleAbsolute(250, () => { e = ys.GetEnumerator(); }); scheduler.ScheduleAbsolute(270, log); scheduler.ScheduleAbsolute(310, log); scheduler.ScheduleAbsolute(450, log); scheduler.ScheduleAbsolute(470, log); scheduler.ScheduleAbsolute(750, () => { try { e.MoveNext(); Assert.True(false); } catch (Exception error) { Assert.Same(ex, error); } }); scheduler.Start(); xs.Subscriptions.AssertEqual( Subscribe(250, 700) ); Assert.Equal(4, res.Count); Assert.True(res[0].SequenceEqual([])); Assert.True(res[1].SequenceEqual([3])); Assert.True(res[2].SequenceEqual([4])); Assert.True(res[3].SequenceEqual([])); } } }