// Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information. using System; using System.Collections.Generic; using System.Linq; using System.Reactive.Concurrency; using System.Reactive.Linq; using System.Threading; using Microsoft.Reactive.Testing; using ReactiveTests.Dummies; using Xunit; namespace ReactiveTests.Tests { public class TakeLastBufferTest : ReactiveTest { #region + Count + [Fact] public void TakeLastBuffer_ArgumentChecking() { ReactiveAssert.Throws(() => Observable.TakeLastBuffer(null, 0)); ReactiveAssert.Throws(() => Observable.TakeLastBuffer(DummyObservable.Instance, -1)); } [Fact] public void TakeLastBuffer_Zero_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(650) ); var res = scheduler.Start(() => xs.TakeLastBuffer(0) ); res.Messages.AssertEqual( OnNext>(650, lst => lst.Count == 0), OnCompleted>(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [Fact] public void TakeLastBuffer_Zero_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(650, ex) ); var res = scheduler.Start(() => xs.TakeLastBuffer(0) ); res.Messages.AssertEqual( OnError>(650, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [Fact] public void TakeLastBuffer_Zero_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); var res = scheduler.Start(() => xs.TakeLastBuffer(0) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [Fact] public void TakeLastBuffer_One_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(650) ); var res = scheduler.Start(() => xs.TakeLastBuffer(1) ); res.Messages.AssertEqual( OnNext>(650, lst => lst.SequenceEqual(new[] { 9 })), OnCompleted>(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [Fact] public void TakeLastBuffer_One_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(650, ex) ); var res = scheduler.Start(() => xs.TakeLastBuffer(1) ); res.Messages.AssertEqual( OnError>(650, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [Fact] public void TakeLastBuffer_One_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); var res = scheduler.Start(() => xs.TakeLastBuffer(1) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [Fact] public void TakeLastBuffer_Three_Completed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnCompleted(650) ); var res = scheduler.Start(() => xs.TakeLastBuffer(3) ); res.Messages.AssertEqual( OnNext>(650, lst => lst.SequenceEqual(new[] { 7, 8, 9 })), OnCompleted>(650) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [Fact] public void TakeLastBuffer_Three_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9), OnError(650, ex) ); var res = scheduler.Start(() => xs.TakeLastBuffer(3) ); res.Messages.AssertEqual( OnError>(650, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 650) ); } [Fact] public void TakeLastBuffer_Three_Disposed() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(180, 1), OnNext(210, 2), OnNext(250, 3), OnNext(270, 4), OnNext(310, 5), OnNext(360, 6), OnNext(380, 7), OnNext(410, 8), OnNext(590, 9) ); var res = scheduler.Start(() => xs.TakeLastBuffer(3) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } #endregion #region + Timed + [Fact] public void TakeLastBuffer_Timed_ArgumentChecking() { var xs = Observable.Return(42); ReactiveAssert.Throws(() => Observable.TakeLastBuffer(default(IObservable), TimeSpan.FromSeconds(1))); ReactiveAssert.Throws(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(-1))); ReactiveAssert.Throws(() => Observable.TakeLastBuffer(default(IObservable), TimeSpan.FromSeconds(1), Scheduler.Default)); ReactiveAssert.Throws(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(1), default)); ReactiveAssert.Throws(() => Observable.TakeLastBuffer(xs, TimeSpan.FromSeconds(-1), Scheduler.Default)); } [Fact] public void TakeLastBuffer_Zero1() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnCompleted(230) ); var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.Zero, scheduler) ); res.Messages.AssertEqual( OnNext>(230, lst => lst.Count == 0), OnCompleted>(230) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } [Fact] public void TakeLastBuffer_Zero2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted(230) ); var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.Zero, scheduler) ); res.Messages.AssertEqual( OnNext>(230, lst => lst.Count == 0), OnCompleted>(230) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } [Fact] public void TakeLastBuffer_Some1() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted(240) ); var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler) ); res.Messages.AssertEqual( OnNext>(240, lst => lst.SequenceEqual(new[] { 2, 3 })), OnCompleted>(240) ); xs.Subscriptions.AssertEqual( Subscribe(200, 240) ); } [Fact] public void TakeLastBuffer_Some2() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnCompleted(300) ); var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler) ); res.Messages.AssertEqual( OnNext>(300, lst => lst.Count == 0), OnCompleted>(300) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); } [Fact] public void TakeLastBuffer_Some3() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnNext(230, 3), OnNext(240, 4), OnNext(250, 5), OnNext(260, 6), OnNext(270, 7), OnNext(280, 8), OnNext(290, 9), OnCompleted(300) ); var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(45), scheduler) ); res.Messages.AssertEqual( OnNext>(300, lst => lst.SequenceEqual(new[] { 6, 7, 8, 9 })), OnCompleted>(300) ); xs.Subscriptions.AssertEqual( Subscribe(200, 300) ); } [Fact] public void TakeLastBuffer_Some4() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(240, 2), OnNext(250, 3), OnNext(280, 4), OnNext(290, 5), OnNext(300, 6), OnCompleted(350) ); var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(25), scheduler) ); res.Messages.AssertEqual( OnNext>(350, lst => lst.Count == 0), OnCompleted>(350) ); xs.Subscriptions.AssertEqual( Subscribe(200, 350) ); } [Fact] public void TakeLastBuffer_All() { var scheduler = new TestScheduler(); var xs = scheduler.CreateHotObservable( OnNext(210, 1), OnNext(220, 2), OnCompleted(230) ); var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler) ); res.Messages.AssertEqual( OnNext>(230, lst => lst.SequenceEqual(new[] { 1, 2 })), OnCompleted>(230) ); xs.Subscriptions.AssertEqual( Subscribe(200, 230) ); } [Fact] public void TakeLastBuffer_Error() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( OnError(210, ex) ); var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler) ); res.Messages.AssertEqual( OnError>(210, ex) ); xs.Subscriptions.AssertEqual( Subscribe(200, 210) ); } [Fact] public void TakeLastBuffer_Never() { var scheduler = new TestScheduler(); var ex = new Exception(); var xs = scheduler.CreateHotObservable( ); var res = scheduler.Start(() => xs.TakeLastBuffer(TimeSpan.FromTicks(50), scheduler) ); res.Messages.AssertEqual( ); xs.Subscriptions.AssertEqual( Subscribe(200, 1000) ); } [Fact] public void TakeLastBuffer_Default1() { var xs = Observable.Range(0, 10, Scheduler.Default); var res = xs.TakeLastBuffer(TimeSpan.FromSeconds(60)).SingleAsync(); var e = new ManualResetEvent(false); var lst = default(IList); res.Subscribe( x => lst = x, () => e.Set() ); e.WaitOne(); Assert.True(lst.SequenceEqual(Enumerable.Range(0, 10))); } [Fact] public void TakeLastBuffer_Default2() { var xs = Observable.Range(0, 10, Scheduler.Default); var res = xs.TakeLastBuffer(TimeSpan.FromSeconds(60), Scheduler.Default.DisableOptimizations()).SingleAsync(); var e = new ManualResetEvent(false); var lst = default(IList); res.Subscribe( x => lst = x, () => e.Set() ); e.WaitOne(); Assert.True(lst.SequenceEqual(Enumerable.Range(0, 10))); } [Fact] public void TakeLastBuffer_Default3() { var xs = Observable.Range(0, 10, Scheduler.Default); var res = xs.TakeLastBuffer(TimeSpan.Zero).SingleAsync(); var e = new ManualResetEvent(false); var lst = default(IList); res.Subscribe( x => lst = x, () => e.Set() ); e.WaitOne(); Assert.True(lst.Count == 0); } [Fact] public void TakeLastBuffer_Default4() { var xs = Observable.Range(0, 10, Scheduler.Default); var res = xs.TakeLastBuffer(TimeSpan.Zero, Scheduler.Default.DisableOptimizations()).SingleAsync(); var e = new ManualResetEvent(false); var lst = default(IList); res.Subscribe( x => lst = x, () => e.Set() ); e.WaitOne(); Assert.True(lst.Count == 0); } #endregion } }